diff --git a/config.ini b/config.ini
index ad54b320c1b016ea1e7802efffa082edb8c83cba..56e4d5750bc7a1eee093f32351c9653b7573cae1 100644
--- a/config.ini
+++ b/config.ini
@@ -13,6 +13,7 @@ lpmn = lpmn
 [tool]
 config = pos_tagger.yaml
 workers_number = 20
+parallel_subtasks = 10
 chunking_limit = 50000

 [logging]
diff --git a/src/tagger.py b/src/tagger.py
index 04b2d7c5907732f05a29f2d55cd5a6955eab3074..e0af179a52aa08ddf44f9cfd007c9add5dfc7668 100644
--- a/src/tagger.py
+++ b/src/tagger.py
@@ -18,6 +18,7 @@ SubTask.turn_on()
 DEFAULT_TYPE = "default"
 OUTPUT = "output"
 JSON = "json"
+LEMMAS = "lemmas"
 TAGSET = "tagset"
 TAGGER = "tagger"
 NER = "ner"
@@ -39,7 +40,10 @@ class TaggerWorker(nlp_ws.NLPWorker):
         _log.info(f"Config taggers from yaml: {cls._taggers}")
         _log.info(f"Config ners from yaml: {cls._ners}")

+        cls._parallel_subtasks = config.get('tool').get('parallel_subtasks', 10)
         cls._chunking_limit = config.get('tool').get('chunking_limit', 50000)
+        if not isinstance(cls._parallel_subtasks, int):
+            cls._parallel_subtasks = int(cls._parallel_subtasks)
         if not isinstance(cls._chunking_limit, int):
             cls._chunking_limit = int(cls._chunking_limit)
         _log.info(f"Chunk size: {cls._chunking_limit}")
@@ -75,6 +79,7 @@ class TaggerWorker(nlp_ws.NLPWorker):
             json_text: bool if json output should contain original
                 text (default = True)
             method: method of processing (default = 'tagger',
                 values: tagger, ner)
+            parallel_subtasks: number of parallel subtasks (default = 10)
         :type task_options: dict
         :param output_path: Path to directory where the
@@ -102,6 +107,11 @@ class TaggerWorker(nlp_ws.NLPWorker):

         json_text = task_options.get("json_text", True)

+        parallel_subtasks = task_options.get(
+            "parallel_subtasks",
+            self._parallel_subtasks
+        )
+
         tagger_opt = self._taggers[lang][DEFAULT_TYPE]
         ner_opt = self._ners[lang][DEFAULT_TYPE]
         convert_lpmn = self.get_converter_directive(
@@ -122,7 +132,7 @@ class TaggerWorker(nlp_ws.NLPWorker):
             _dir_style = True
             json_lpmn = [json_lpmn]
         _log.debug(f"Running LPMN: {json_lpmn}")
-        if output == JSON and not _dir_style:
+        if output in [JSON, LEMMAS] and not _dir_style:
             # split file into chunks
             chunk_size = int(self._chunking_limit * 0.5)
             destination_path = os.path.join(
@@ -149,7 +159,8 @@ class TaggerWorker(nlp_ws.NLPWorker):
                 destination_path,
                 splitted_corpus,
                 json_lpmn,
-                _log
+                _log,
+                parallel_subtasks
             )
             # remove tmp directory
             if os.path.exists(destination_path):
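Review note: taken together, the config.ini and src/tagger.py changes give `parallel_subtasks` the same two-level resolution already used for `chunking_limit` — a worker-wide default read from the `[tool]` section (coerced with `int()`, since INI values arrive as strings) that an individual request can override through `task_options`. A minimal sketch of that precedence, using stand-in names rather than the worker's real internals:

```python
# Sketch of the default-then-override lookup; `config` mimics the parsed INI
# and `task_options` the per-request options. Names are illustrative only.
def resolve_parallel_subtasks(config: dict, task_options: dict) -> int:
    default = config.get('tool', {}).get('parallel_subtasks', 10)
    if not isinstance(default, int):
        default = int(default)  # INI parsers hand values back as strings
    # A value supplied with the request wins over the configured default.
    return int(task_options.get('parallel_subtasks', default))


assert resolve_parallel_subtasks({'tool': {'parallel_subtasks': '10'}}, {}) == 10
assert resolve_parallel_subtasks({'tool': {}}, {'parallel_subtasks': 4}) == 4
```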
diff --git a/src/utils.py b/src/utils.py
index 05227d747d415d1096c2f6095d93a2e792fd77d9..d098fe5c392328b710c06ce23ac752fe9211a66b 100644
--- a/src/utils.py
+++ b/src/utils.py
@@ -39,7 +39,7 @@ def _update_last_chunk(tail_data: str, chunk_file_name: str,

 def merge_splits(output_path: str, destination_path: str,
                  splitted_corpus: List[str], json_lpmn: str,
-                 _log: logging.Logger):
+                 _log: logging.Logger, parallel_subtasks: int = 1):
     """Merges the split corpus back into one file.

     :param output_path: path to output file
@@ -52,28 +52,47 @@ def merge_splits(output_path: str, destination_path: str,
     :type json_lpmn: str
     :param _log: logger
     :type _log: logging.Logger
+    :param parallel_subtasks: number of parallel subtasks (default: 1)
+    :type parallel_subtasks: int
     """
     # remove output file if exists
     if os.path.isfile(output_path):
         os.remove(output_path)
     # create output file
+    _log.debug(f"Creating output file: {output_path}")
     with open(output_path, "a") as f2:
+        _log.debug(f"Created output file: {output_path}")
         # run tagger on each chunk
+        subtask_args_queue_awaiting = []
         for dbg_i, chunk in enumerate(splitted_corpus):
             _log.info(
-                f"Running chunk {dbg_i}: {chunk}"
+                f"Spawning task {dbg_i}: {chunk}"
             )
-            subtask = SubTask(
-                os.path.join(destination_path, chunk),
-                json_lpmn
+            subtask_args_queue_awaiting.append(
+                (
+                    os.path.join(destination_path, chunk),
+                    json_lpmn
+                )
             )
-            subtask.run(blocking=False)
-            l_result = subtask.get_output_path()
-            _log.debug(f"Result of chunk: {l_result}")
-            # merge results
-            with open(l_result, "r") as f:
-                f2.write(f"{f.read()}\n")
+        _log.debug(f"Subtask args queue: {subtask_args_queue_awaiting}")
+        while len(subtask_args_queue_awaiting) > 0:
+            args = subtask_args_queue_awaiting[:parallel_subtasks]
+            _log.debug(f"Subtask args: {args}")
+            subtasks = [
+                SubTask(arg_tuple[0], arg_tuple[1]) for arg_tuple in args
+            ]
+            for subtask in subtasks:
+                subtask.run(blocking=False)
+
+            l_results = [subtask.get_output_path() for subtask in subtasks]
+            _log.debug(f"Multiple results: {l_results}")
+
+            for l_result in l_results:
+                _log.debug(f"Result of chunk: {l_result}")
+                with open(l_result, "r") as f:
+                    f2.write(f"{f.read()}\n")
+            del subtask_args_queue_awaiting[:parallel_subtasks]


 def split_corpus(source_path: str, destination_path: str, file_name: str,
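Review note: the rewritten merge loop is fixed-size batch fan-out — queue up the (chunk, LPMN) argument tuples, launch up to `parallel_subtasks` subtasks without blocking, collect their outputs in submission order so the merged file preserves corpus order, then drop the processed slice with `del`. A self-contained sketch of the same scheme, with a thread pool and a `run_chunk` callable standing in for the project's `SubTask` machinery (both stand-ins are assumptions, not the real API):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def merge_in_batches(chunks: List[str], run_chunk: Callable[[str], str],
                     output_path: str, parallel_subtasks: int = 1) -> None:
    """Process chunks in slices of `parallel_subtasks`, appending results
    to `output_path` in the original corpus order."""
    pending = list(chunks)
    with open(output_path, "a") as merged, \
            ThreadPoolExecutor(max_workers=parallel_subtasks) as pool:
        while pending:
            batch = pending[:parallel_subtasks]
            # Fan out the whole slice, like SubTask.run(blocking=False) ...
            futures = [pool.submit(run_chunk, chunk) for chunk in batch]
            # ... then join in submission order, like get_output_path(),
            # which keeps the merged output deterministic.
            for future in futures:
                merged.write(f"{future.result()}\n")
            del pending[:parallel_subtasks]
```

One property worth noting: a slice only completes when its slowest subtask finishes, so stragglers idle the remaining slots; the upside is that the in-order merge stays trivial and memory is bounded by one batch of results.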
diff --git a/tests/test.py b/tests/test.py
index 6edb0253e09783e027ba876a7fac8bbe923f6559..b120603461303768f12c07769a5f737955284a7e 100644
--- a/tests/test.py
+++ b/tests/test.py
@@ -3,6 +3,7 @@ import shutil
 from filecmp import cmp

 import aioprocessing as ap
+import queue
 from nlp_ws import SubTask

 from src.tagger import TaggerWorker
@@ -29,13 +30,22 @@ def get_output_path(self, timeout=0):
     return tmp_subtask_result_file[dict_key]


+def subtaskbase_init_gen(pid):
+    def subtaskbase_init(self):
+        self.process_id = pid
+        self.q_in = queue.Queue()
+        self.q_out = self._processes[self.process_id]["q_out"]
+        self.idx = None
+
+    return subtaskbase_init
+
+
 def test_init():
     worker = TaggerWorker()
     assert type(worker).__name__ == 'TaggerWorker'


-def test_base_process_file_en(mocker, worker, input_dir, input_file1,
-                              output_dir, expected_dir):
+@pytest.fixture(autouse=True)
+def run_around_tests(mocker):
     mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
     mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
     mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
     mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
     SubTask.prepare_subtask(
@@ -44,6 +54,10 @@ def test_base_process_file_en(mocker, worker, input_dir, input_file1,
         {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
         os.getpid()
     )
+
+
+def test_base_process_file_en(mocker, worker, input_dir, input_file1,
+                              output_dir, expected_dir):
     worker.process(
         os.path.join(input_dir, input_file1),
         {"lang": "en"}, os.path.join(output_dir, input_file1)
     )
     assert cmp(os.path.join(output_dir, input_file1),
                os.path.join(expected_dir, input_file1))
     os.remove(os.path.join(output_dir, input_file1))
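Review note: the hunks above move the eight identical mock/setup lines out of every test body and into a single autouse fixture, so each test shrinks to the call under test plus its assertions. One thing to double-check: the `@pytest.fixture` decorator relies on `pytest` being imported at the top of tests/test.py, which the hunk context does not show. A minimal sketch of the autouse pattern (the stub name is illustrative; the patch target string comes from the diff itself):

```python
import pytest


@pytest.fixture(autouse=True)
def stub_subtasks(mocker):
    # `mocker` is provided by pytest-mock; autouse=True applies the patch
    # to every test in this module with no per-test setup lines.
    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)


def test_subtask_run_is_stubbed():
    from nlp_ws import SubTask
    # The patch is already active: run() returns the stubbed None.
    assert SubTask.run(None) is None
```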
@@ -55,14 +69,6 @@ def test_base_process_file_en(mocker, worker, input_dir, input_file1,

 def test_base_process_file_small_limit_en(mocker, worker_small, input_dir, input_file_small,
                                           output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
-    )
     worker_small.process(
         os.path.join(input_dir, input_file_small),
         {"lang": "en"}, os.path.join(output_dir, input_file_small)
@@ -72,16 +78,30 @@ def test_base_process_file_small_limit_en(mocker, worker_small, input_dir, input
     os.remove(os.path.join(output_dir, input_file_small))


-def test_base_process_file_pl(mocker, worker, input_dir, input_file1_pl,
+def test_base_process_file_en_parallel(mocker, worker, input_dir, input_file1,
                               output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
+    worker.process(
+        os.path.join(input_dir, input_file1),
+        {"lang": "en", "parallel_subtasks": 4}, os.path.join(output_dir, input_file1)
+    )
+    assert cmp(os.path.join(output_dir, input_file1),
+               os.path.join(expected_dir, input_file1))
+    os.remove(os.path.join(output_dir, input_file1))
+
+
+def test_base_process_file_small_limit_en_parallel(mocker, worker_small, input_dir, input_file_small,
+                                                   output_dir, expected_dir):
+    worker_small.process(
+        os.path.join(input_dir, input_file_small),
+        {"lang": "en", "parallel_subtasks": 4}, os.path.join(output_dir, input_file_small)
     )
+    assert cmp(os.path.join(output_dir, input_file_small),
+               os.path.join(expected_dir, input_file_small))
+    os.remove(os.path.join(output_dir, input_file_small))
+
+
+def test_base_process_file_pl(mocker, worker, input_dir, input_file1_pl,
+                              output_dir, expected_dir):
     worker.process(
         os.path.join(input_dir, input_file1_pl),
         {"lang": "pl"}, os.path.join(output_dir, input_file1_pl)
@@ -93,14 +113,6 @@ def test_base_process_file_pl(mocker, worker, input_dir, input_file1_pl,

 def test_base_process_file_small_limit_pl(mocker, worker_small, input_dir, input_file_small_pl,
                                           output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
-    )
     worker_small.process(
         os.path.join(input_dir, input_file_small_pl),
         {"lang": "pl"}, os.path.join(output_dir, input_file_small_pl)
@@ -110,16 +122,30 @@ def test_base_process_file_small_limit_pl(mocker, worker_small, input_dir, input
-def test_base_process_file_de(mocker, worker, input_dir, input_file1_de,
+def test_base_process_file_pl_parallel(mocker, worker, input_dir, input_file1_pl,
                               output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
+    worker.process(
+        os.path.join(input_dir, input_file1_pl),
+        {"lang": "pl", "parallel_subtasks": 3}, os.path.join(output_dir, input_file1_pl)
     )
+    assert cmp(os.path.join(output_dir, input_file1_pl),
+               os.path.join(expected_dir, input_file1_pl))
+    os.remove(os.path.join(output_dir, input_file1_pl))
+
+
+def test_base_process_file_small_limit_pl_parallel(mocker, worker_small, input_dir, input_file_small_pl,
+                                                   output_dir, expected_dir):
+    worker_small.process(
+        os.path.join(input_dir, input_file_small_pl),
+        {"lang": "pl", "parallel_subtasks": 3}, os.path.join(output_dir, input_file_small_pl)
+    )
+    assert cmp(os.path.join(output_dir, input_file_small_pl),
+               os.path.join(expected_dir, input_file_small_pl))
+    os.remove(os.path.join(output_dir, input_file_small_pl))
+
+
+def test_base_process_file_de(mocker, worker, input_dir, input_file1_de,
+                              output_dir, expected_dir):
     worker.process(
         os.path.join(input_dir, input_file1_de),
         {"lang": "de"}, os.path.join(output_dir, input_file1_de)
@@ -131,14 +157,6 @@ def test_base_process_file_de(mocker, worker, input_dir, input_file1_de,

 def test_base_process_file_small_limit_de(mocker, worker_small, input_dir, input_file_small_de,
                                           output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
-    )
     worker_small.process(
         os.path.join(input_dir, input_file_small_de),
         {"lang": "de"}, os.path.join(output_dir, input_file_small_de)
@@ -148,16 +166,30 @@ def test_base_process_file_small_limit_de(mocker, worker_small, input_dir, input
-def test_base_process_file_es(mocker, worker, input_dir, input_file1_es,
+def test_base_process_file_de_parallel(mocker, worker, input_dir, input_file1_de,
                               output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
+    worker.process(
+        os.path.join(input_dir, input_file1_de),
+        {"lang": "de", "parallel_subtasks": 8}, os.path.join(output_dir, input_file1_de)
     )
+    assert cmp(os.path.join(output_dir, input_file1_de),
+               os.path.join(expected_dir, input_file1_de))
+    os.remove(os.path.join(output_dir, input_file1_de))
+
+
+def test_base_process_file_small_limit_de_parallel(mocker, worker_small, input_dir, input_file_small_de,
+                                                   output_dir, expected_dir):
+    worker_small.process(
+        os.path.join(input_dir, input_file_small_de),
+        {"lang": "de", "parallel_subtasks": 8}, os.path.join(output_dir, input_file_small_de)
+    )
+    assert cmp(os.path.join(output_dir, input_file_small_de),
+               os.path.join(expected_dir, input_file_small_de))
+    os.remove(os.path.join(output_dir, input_file_small_de))
+
+
+def test_base_process_file_es(mocker, worker, input_dir, input_file1_es,
+                              output_dir, expected_dir):
     worker.process(
         os.path.join(input_dir, input_file1_es),
         {"lang": "es"}, os.path.join(output_dir, input_file1_es)
@@ -169,14 +201,6 @@ def test_base_process_file_es(mocker, worker, input_dir, input_file1_es,

 def test_base_process_file_small_limit_es(mocker, worker_small, input_dir, input_file_small_es,
                                           output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
-    )
     worker_small.process(
         os.path.join(input_dir, input_file_small_es),
         {"lang": "es"}, os.path.join(output_dir, input_file_small_es)
@@ -186,16 +210,30 @@ def test_base_process_file_small_limit_es(mocker, worker_small, input_dir, input
-def test_base_process_file_pt(mocker, worker, input_dir, input_file1_pt,
+def test_base_process_file_es_parallel(mocker, worker, input_dir, input_file1_es,
                               output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
+    worker.process(
+        os.path.join(input_dir, input_file1_es),
+        {"lang": "es", "parallel_subtasks": 2}, os.path.join(output_dir, input_file1_es)
+    )
+    assert cmp(os.path.join(output_dir, input_file1_es),
+               os.path.join(expected_dir, input_file1_es))
+    os.remove(os.path.join(output_dir, input_file1_es))
+
+
+def test_base_process_file_small_limit_es_parallel(mocker, worker_small, input_dir, input_file_small_es,
+                                                   output_dir, expected_dir):
+    worker_small.process(
+        os.path.join(input_dir, input_file_small_es),
+        {"lang": "es", "parallel_subtasks": 2}, os.path.join(output_dir, input_file_small_es)
     )
+    assert cmp(os.path.join(output_dir, input_file_small_es),
+               os.path.join(expected_dir, input_file_small_es))
+    os.remove(os.path.join(output_dir, input_file_small_es))
+
+
+def test_base_process_file_pt(mocker, worker, input_dir, input_file1_pt,
+                              output_dir, expected_dir):
     worker.process(
         os.path.join(input_dir, input_file1_pt),
         {"lang": "pt"}, os.path.join(output_dir, input_file1_pt)
@@ -207,14 +245,6 @@ def test_base_process_file_pt(mocker, worker, input_dir, input_file1_pt,

 def test_base_process_file_small_limit_pt(mocker, worker_small, input_dir, input_file_small_pt,
                                           output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
-    )
     worker_small.process(
         os.path.join(input_dir, input_file_small_pt),
         {"lang": "pt"}, os.path.join(output_dir, input_file_small_pt)
@@ -224,16 +254,30 @@ def test_base_process_file_small_limit_pt(mocker, worker_small, input_dir, input
-def test_base_process_file_fr(mocker, worker, input_dir, input_file1_fr,
+def test_base_process_file_pt_parallel(mocker, worker, input_dir, input_file1_pt,
                               output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
+    worker.process(
+        os.path.join(input_dir, input_file1_pt),
+        {"lang": "pt", "parallel_subtasks": 4}, os.path.join(output_dir, input_file1_pt)
+    )
+    assert cmp(os.path.join(output_dir, input_file1_pt),
+               os.path.join(expected_dir, input_file1_pt))
+    os.remove(os.path.join(output_dir, input_file1_pt))
+
+
+def test_base_process_file_small_limit_pt_parallel(mocker, worker_small, input_dir, input_file_small_pt,
+                                                   output_dir, expected_dir):
+    worker_small.process(
+        os.path.join(input_dir, input_file_small_pt),
+        {"lang": "pt", "parallel_subtasks": 4}, os.path.join(output_dir, input_file_small_pt)
     )
+    assert cmp(os.path.join(output_dir, input_file_small_pt),
+               os.path.join(expected_dir, input_file_small_pt))
+    os.remove(os.path.join(output_dir, input_file_small_pt))
+
+
+def test_base_process_file_fr(mocker, worker, input_dir, input_file1_fr,
+                              output_dir, expected_dir):
     worker.process(
         os.path.join(input_dir, input_file1_fr),
         {"lang": "fr"}, os.path.join(output_dir, input_file1_fr)
@@ -245,14 +289,6 @@ def test_base_process_file_fr(mocker, worker, input_dir, input_file1_fr,

 def test_base_process_file_small_limit_fr(mocker, worker_small, input_dir, input_file_small_fr,
                                           output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
-    )
     worker_small.process(
         os.path.join(input_dir, input_file_small_fr),
         {"lang": "fr"}, os.path.join(output_dir, input_file_small_fr)
@@ -262,16 +298,30 @@ def test_base_process_file_small_limit_fr(mocker, worker_small, input_dir, input
-def test_base_process_file_ru(mocker, worker, input_dir, input_file1_ru,
+def test_base_process_file_fr_parallel(mocker, worker, input_dir, input_file1_fr,
                               output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
+    worker.process(
+        os.path.join(input_dir, input_file1_fr),
+        {"lang": "fr", "parallel_subtasks": 4}, os.path.join(output_dir, input_file1_fr)
     )
+    assert cmp(os.path.join(output_dir, input_file1_fr),
+               os.path.join(expected_dir, input_file1_fr))
+    os.remove(os.path.join(output_dir, input_file1_fr))
+
+
+def test_base_process_file_small_limit_fr_parallel(mocker, worker_small, input_dir, input_file_small_fr,
+                                                   output_dir, expected_dir):
+    worker_small.process(
+        os.path.join(input_dir, input_file_small_fr),
+        {"lang": "fr", "parallel_subtasks": 4}, os.path.join(output_dir, input_file_small_fr)
+    )
+    assert cmp(os.path.join(output_dir, input_file_small_fr),
+               os.path.join(expected_dir, input_file_small_fr))
+    os.remove(os.path.join(output_dir, input_file_small_fr))
+
+
+def test_base_process_file_ru(mocker, worker, input_dir, input_file1_ru,
+                              output_dir, expected_dir):
     worker.process(
         os.path.join(input_dir, input_file1_ru),
         {"lang": "ru"}, os.path.join(output_dir, input_file1_ru)
@@ -283,14 +333,6 @@ def test_base_process_file_ru(mocker, worker, input_dir, input_file1_ru,

 def test_base_process_file_small_limit_ru(mocker, worker_small, input_dir, input_file_small_ru,
                                           output_dir, expected_dir):
-    mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
-    mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
-    mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
-    mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
-    SubTask.prepare_subtask(
-        {"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
-        os.getpid()
-    )
     worker_small.process(
         os.path.join(input_dir, input_file_small_ru),
         {"lang": "ru"}, os.path.join(output_dir, input_file_small_ru)
@@ -298,3 +340,25 @@ def test_base_process_file_small_limit_ru(mocker, worker_small, input_dir, input
     assert cmp(os.path.join(output_dir, input_file_small_ru),
                os.path.join(expected_dir, input_file_small_ru))
     os.remove(os.path.join(output_dir, input_file_small_ru))
+
+
+def test_base_process_file_ru_parallel(mocker, worker, input_dir, input_file1_ru,
+                                       output_dir, expected_dir):
+    worker.process(
+        os.path.join(input_dir, input_file1_ru),
+        {"lang": "ru", "parallel_subtasks": 6}, os.path.join(output_dir, input_file1_ru)
+    )
+    assert cmp(os.path.join(output_dir, input_file1_ru),
+               os.path.join(expected_dir, input_file1_ru))
+    os.remove(os.path.join(output_dir, input_file1_ru))
+
+
+def test_base_process_file_small_limit_ru_parallel(mocker, worker_small, input_dir, input_file_small_ru,
+                                                   output_dir, expected_dir):
+    worker_small.process(
+        os.path.join(input_dir, input_file_small_ru),
+        {"lang": "ru", "parallel_subtasks": 6}, os.path.join(output_dir, input_file_small_ru)
+    )
+    assert cmp(os.path.join(output_dir, input_file_small_ru),
+               os.path.join(expected_dir, input_file_small_ru))
+    os.remove(os.path.join(output_dir, input_file_small_ru))
diff --git a/tests/testdata/expected/post_postagger_input_lemmas b/tests/testdata/expected/post_postagger_input_lemmas
index deac1e9775047d2c17d106dda1423711fad86ea6..c6ee9fcbb0e274f384c60c113b1cd30e612d4468 100644
--- a/tests/testdata/expected/post_postagger_input_lemmas
+++ b/tests/testdata/expected/post_postagger_input_lemmas
@@ -11,3 +11,4 @@ woda występować w przyroda być roztwór sól i gaz .
 najwięcej sól mineralny zawierać woda morski i woda mineralny ; najmniej woda z opad atmosferyczny .
 woda o mały zawartość składnik mineralny nazywać woda miękki , natomiast zawierać znaczny ilość sól wapń i magnez – woda twardy .
 oprócz to woda naturalny zawierać rozpuścić substancja pochodzenie organiczny , na przykład . mocznik , kwas humusowy i tym podobne .
+
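Closing review note on the test suite rather than the patch itself: the fourteen new `*_parallel` tests differ only in language code, input fixture, and `parallel_subtasks` value. If the repetition ever becomes a maintenance burden, `pytest.mark.parametrize` could express the same coverage once. The sketch below is hypothetical — in particular, `request.getfixturevalue` is an assumption about how the per-language input fixtures are exposed:

```python
import os
from filecmp import cmp

import pytest


@pytest.mark.parametrize("lang, input_fixture, subtasks", [
    ("en", "input_file1", 4),
    ("pl", "input_file1_pl", 3),
    ("de", "input_file1_de", 8),
    ("ru", "input_file1_ru", 6),
])
def test_process_file_parallel(request, worker, input_dir, output_dir,
                               expected_dir, lang, input_fixture, subtasks):
    # Resolve the per-language input fixture by name (assumed mechanism).
    input_file = request.getfixturevalue(input_fixture)
    worker.process(
        os.path.join(input_dir, input_file),
        {"lang": lang, "parallel_subtasks": subtasks},
        os.path.join(output_dir, input_file)
    )
    assert cmp(os.path.join(output_dir, input_file),
               os.path.join(expected_dir, input_file))
    os.remove(os.path.join(output_dir, input_file))
```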