Commit c23d5e0f authored by Paweł Walkowiak

Merge branch 'parallelize_subtasks' into 'master'

Parallelize subtasks

See merge request !6
parents 9aa4b10f 21f025f6
Pipeline #10350 passed with stages in 1 minute and 33 seconds
@@ -13,6 +13,7 @@ lpmn = lpmn
[tool]
config = pos_tagger.yaml
workers_number = 20
parallel_subtasks = 10
chunking_limit = 50000
[logging]
......
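For orientation: the new parallel_subtasks setting also has a per-request counterpart of the same name in task_options (see the worker docstring below). A minimal sketch of overriding the configured default of 10 for a single request; the paths and values are illustrative, not from the repository:

    # hypothetical call overriding the configured default for one request
    task_options = {"lang": "en", "parallel_subtasks": 4}
    worker.process("input/corpus.txt", task_options, "output/corpus.txt")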
@@ -18,6 +18,7 @@ SubTask.turn_on()
DEFAULT_TYPE = "default"
OUTPUT = "output"
JSON = "json"
LEMMAS = "lemmas"
TAGSET = "tagset"
TAGGER = "tagger"
NER = "ner"
@@ -39,7 +40,10 @@ class TaggerWorker(nlp_ws.NLPWorker):
_log.info(f"Config taggers from yaml: {cls._taggers}")
_log.info(f"Config ners from yaml: {cls._ners}")
cls._parallel_subtasks = config.get('tool').get('parallel_subtasks', 10)
cls._chunking_limit = config.get('tool').get('chunking_limit', 50000)
if not isinstance(cls._parallel_subtasks, int):
cls._parallel_subtasks = int(cls._parallel_subtasks)
if not isinstance(cls._chunking_limit, int):
cls._chunking_limit = int(cls._chunking_limit)
_log.info(f"Chunk size: {cls._chunking_limit}")
@@ -75,6 +79,7 @@ class TaggerWorker(nlp_ws.NLPWorker):
json_text: bool if json output should contain original
text (default = True)
method: method of processing (default = 'tagger', values: tagger, ner)
parallel_subtasks: number of parallel subtasks (default = 10)
:type task_options: dict
:param output_path: Path to directory where the
@@ -102,6 +107,11 @@ class TaggerWorker(nlp_ws.NLPWorker):
json_text = task_options.get("json_text", True)
parallel_subtasks = task_options.get(
"parallel_subtasks",
self._parallel_subtasks
)
tagger_opt = self._taggers[lang][DEFAULT_TYPE]
ner_opt = self._ners[lang][DEFAULT_TYPE]
convert_lpmn = self.get_converter_directive(
@@ -122,7 +132,7 @@ class TaggerWorker(nlp_ws.NLPWorker):
_dir_style = True
json_lpmn = [json_lpmn]
_log.debug(f"Running LPMN: {json_lpmn}")
if output == JSON and not _dir_style:
if output in [JSON, LEMMAS] and not _dir_style:
# split file into chunks
chunk_size = int(self._chunking_limit * 0.5)
destination_path = os.path.join(
@@ -149,7 +159,8 @@ class TaggerWorker(nlp_ws.NLPWorker):
destination_path,
splitted_corpus,
json_lpmn,
_log
_log,
parallel_subtasks
)
# remove tmp directory
if os.path.exists(destination_path):
......
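Two sizing details from the hunks above are worth spelling out: chunks are sized at half the configured chunking_limit (chunk_size = int(self._chunking_limit * 0.5)), and merge_splits later consumes the chunks in batches of parallel_subtasks. A worked example with the defaults; the corpus size, and the assumption that the limit counts characters, are illustrative:

    chunking_limit = 50000                    # [tool] default
    chunk_size = int(chunking_limit * 0.5)    # 25000, as in the worker
    corpus_len = 1_000_000                    # hypothetical input size
    n_chunks = -(-corpus_len // chunk_size)   # ceil division -> 40
    n_batches = -(-n_chunks // 10)            # parallel_subtasks = 10 -> 4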
@@ -39,7 +39,7 @@ def _update_last_chunk(tail_data: str, chunk_file_name: str,
def merge_splits(output_path: str, destination_path: str,
splitted_corpus: List[str], json_lpmn: str,
_log: logging.Logger):
_log: logging.Logger, parallel_subtasks: int = 1):
"""Merges splitted corpus into one file.
:param output_path: path to output file
@@ -52,28 +52,47 @@ def merge_splits(output_path: str, destination_path: str,
:type json_lpmn: str
:param _log: logger
:type _log: logging.Logger
:param parallel_subtasks: number of parallel subtasks (default: 1)
:type parallel_subtasks: int
"""
# remove output file if exists
if os.path.isfile(output_path):
os.remove(output_path)
# create output file
logging.debug(f"Creating output file: {output_path}")
with open(output_path, "a") as f2:
logging.debug(f"Created output file: {output_path}")
# run tagger on each chunk
subtask_args_queue_awaiting = []
for dbg_i, chunk in enumerate(splitted_corpus):
_log.info(
f"Running chunk {dbg_i}: {chunk}"
f"Spawning task {dbg_i}: {chunk}"
)
subtask = SubTask(
os.path.join(destination_path, chunk),
json_lpmn
subtask_args_queue_awaiting.append(
(
os.path.join(destination_path, chunk),
json_lpmn
)
)
subtask.run(blocking=False)
l_result = subtask.get_output_path()
_log.debug(f"Result of chunk: {l_result}")
# merge results
with open(l_result, "r") as f:
f2.write(f"{f.read()}\n")
logging.debug(f"Subtask args queue: {subtask_args_queue_awaiting}")
while len(subtask_args_queue_awaiting) > 0:
args = subtask_args_queue_awaiting[:parallel_subtasks]
logging.debug(f"Subtask args: {args}")
subtasks = [
SubTask(arg_tuple[0], arg_tuple[1]) for arg_tuple in args
]
for subtask in subtasks:
subtask.run(blocking=False)
l_results = [subtask.get_output_path() for subtask in subtasks]
logging.debug(f"Multiple results: {l_results}")
for l_result in l_results:
_log.debug(f"Result of chunk: {l_result}")
with open(l_result, "r") as f:
f2.write(f"{f.read()}\n")
del subtask_args_queue_awaiting[:parallel_subtasks]
def split_corpus(source_path: str, destination_path: str, file_name: str,
......
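The rewritten merge_splits dispatches chunks in batches: it takes up to parallel_subtasks argument tuples from the queue, starts a SubTask for each with run(blocking=False), then calls get_output_path() on every subtask, which blocks until that subtask's result is ready, so a whole batch finishes before the next slice is dequeued. A self-contained sketch of the same batch-and-collect pattern, using only the standard library in place of the nlp_ws SubTask machinery (all names here are illustrative):

    from concurrent.futures import ThreadPoolExecutor

    def run_in_batches(items, work_fn, batch_size):
        # Process items batch by batch; each batch completes before
        # the next starts, mirroring the while-loop in merge_splits.
        results = []
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            with ThreadPoolExecutor(max_workers=batch_size) as pool:
                # submit() plays the role of SubTask.run(blocking=False);
                # result() plays the role of the blocking get_output_path().
                futures = [pool.submit(work_fn, item) for item in batch]
                results.extend(f.result() for f in futures)
        return results

    print(run_in_batches(list(range(7)), lambda x: x * x, batch_size=3))

One consequence of this design: results are collected in submission order within each batch, so the merged output preserves chunk order regardless of which subtask finishes first.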
@@ -3,6 +3,7 @@ import shutil
from filecmp import cmp
import aioprocessing as ap
import queue
from nlp_ws import SubTask
from src.tagger import TaggerWorker
@@ -29,13 +30,22 @@ def get_output_path(self, timeout=0):
return tmp_subtask_result_file[dict_key]
def subtaskbase_init_gen(pid):
def subtaskbase_init(self):
self.process_id = pid
self.q_in = queue.Queue()
self.q_out = self._processes[self.process_id]["q_out"]
self.idx = None
return subtaskbase_init
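subtaskbase_init_gen is a factory: it captures pid in a closure and returns a replacement __init__, so the patched SubTask binds to the queues registered for that process. A minimal illustration of the pattern, detached from nlp_ws (names hypothetical):

    def init_gen(pid):
        def fake_init(self):
            self.process_id = pid   # value captured by the closure
        return fake_init

    class Dummy:
        pass

    Dummy.__init__ = init_gen(42)
    assert Dummy().process_id == 42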
def test_init():
worker = TaggerWorker()
assert type(worker).__name__ == 'TaggerWorker'
def test_base_process_file_en(mocker, worker, input_dir, input_file1,
output_dir, expected_dir):
@pytest.fixture(autouse=True)
def run_around_tests(mocker):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
@@ -44,6 +54,10 @@ def test_base_process_file_en(mocker, worker, input_dir, input_file1,
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
)
def test_base_process_file_en(mocker, worker, input_dir, input_file1,
output_dir, expected_dir):
worker.process(
os.path.join(input_dir, input_file1),
{"lang": "en"}, os.path.join(output_dir, input_file1)
@@ -55,14 +69,6 @@ def test_base_process_file_en(mocker, worker, input_dir, input_file1,
def test_base_process_file_small_limit_en(mocker, worker_small, input_dir, input_file_small,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
)
worker_small.process(
os.path.join(input_dir, input_file_small),
{"lang": "en"}, os.path.join(output_dir, input_file_small)
@@ -72,16 +78,30 @@ def test_base_process_file_small_limit_en(mocker, worker_small, input_dir, input
os.remove(os.path.join(output_dir, input_file_small))
def test_base_process_file_pl(mocker, worker, input_dir, input_file1_pl,
def test_base_process_file_en_parallel(mocker, worker, input_dir, input_file1,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
worker.process(
os.path.join(input_dir, input_file1),
{"lang": "en", "parallel_subtasks": 4}, os.path.join(output_dir, input_file1)
)
assert cmp(os.path.join(output_dir, input_file1),
os.path.join(expected_dir, input_file1))
os.remove(os.path.join(output_dir, input_file1))
def test_base_process_file_small_limit_en_parallel(mocker, worker_small, input_dir, input_file_small,
output_dir, expected_dir):
worker_small.process(
os.path.join(input_dir, input_file_small),
{"lang": "en", "parallel_subtasks": 4}, os.path.join(output_dir, input_file_small)
)
assert cmp(os.path.join(output_dir, input_file_small),
os.path.join(expected_dir, input_file_small))
os.remove(os.path.join(output_dir, input_file_small))
def test_base_process_file_pl(mocker, worker, input_dir, input_file1_pl,
output_dir, expected_dir):
worker.process(
os.path.join(input_dir, input_file1_pl),
{"lang": "pl"}, os.path.join(output_dir, input_file1_pl)
@@ -93,14 +113,6 @@ def test_base_process_file_pl(mocker, worker, input_dir, input_file1_pl,
def test_base_process_file_small_limit_pl(mocker, worker_small, input_dir, input_file_small_pl,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
)
worker_small.process(
os.path.join(input_dir, input_file_small_pl),
{"lang": "pl"}, os.path.join(output_dir, input_file_small_pl)
@@ -110,16 +122,30 @@ def test_base_process_file_small_limit_pl(mocker, worker_small, input_dir, input
os.remove(os.path.join(output_dir, input_file_small_pl))
def test_base_process_file_de(mocker, worker, input_dir, input_file1_de,
def test_base_process_file_pl_parallel(mocker, worker, input_dir, input_file1_pl,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
worker.process(
os.path.join(input_dir, input_file1_pl),
{"lang": "pl", "parallel_subtasks": 3}, os.path.join(output_dir, input_file1_pl)
)
assert cmp(os.path.join(output_dir, input_file1_pl),
os.path.join(expected_dir, input_file1_pl))
os.remove(os.path.join(output_dir, input_file1_pl))
def test_base_process_file_small_limit_pl_parallel(mocker, worker_small, input_dir, input_file_small_pl,
output_dir, expected_dir):
worker_small.process(
os.path.join(input_dir, input_file_small_pl),
{"lang": "pl", "parallel_subtasks": 3}, os.path.join(output_dir, input_file_small_pl)
)
assert cmp(os.path.join(output_dir, input_file_small_pl),
os.path.join(expected_dir, input_file_small_pl))
os.remove(os.path.join(output_dir, input_file_small_pl))
def test_base_process_file_de(mocker, worker, input_dir, input_file1_de,
output_dir, expected_dir):
worker.process(
os.path.join(input_dir, input_file1_de),
{"lang": "de"}, os.path.join(output_dir, input_file1_de)
@@ -131,14 +157,6 @@ def test_base_process_file_de(mocker, worker, input_dir, input_file1_de,
def test_base_process_file_small_limit_de(mocker, worker_small, input_dir, input_file_small_de,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
)
worker_small.process(
os.path.join(input_dir, input_file_small_de),
{"lang": "de"}, os.path.join(output_dir, input_file_small_de)
@@ -148,16 +166,30 @@ def test_base_process_file_small_limit_de(mocker, worker_small, input_dir, input
os.remove(os.path.join(output_dir, input_file_small_de))
def test_base_process_file_es(mocker, worker, input_dir, input_file1_es,
def test_base_process_file_de_parallel(mocker, worker, input_dir, input_file1_de,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
worker.process(
os.path.join(input_dir, input_file1_de),
{"lang": "de", "parallel_subtasks": 8}, os.path.join(output_dir, input_file1_de)
)
assert cmp(os.path.join(output_dir, input_file1_de),
os.path.join(expected_dir, input_file1_de))
os.remove(os.path.join(output_dir, input_file1_de))
def test_base_process_file_small_limit_de_parallel(mocker, worker_small, input_dir, input_file_small_de,
output_dir, expected_dir):
worker_small.process(
os.path.join(input_dir, input_file_small_de),
{"lang": "de", "parallel_subtasks": 8}, os.path.join(output_dir, input_file_small_de)
)
assert cmp(os.path.join(output_dir, input_file_small_de),
os.path.join(expected_dir, input_file_small_de))
os.remove(os.path.join(output_dir, input_file_small_de))
def test_base_process_file_es(mocker, worker, input_dir, input_file1_es,
output_dir, expected_dir):
worker.process(
os.path.join(input_dir, input_file1_es),
{"lang": "es"}, os.path.join(output_dir, input_file1_es)
@@ -169,14 +201,6 @@ def test_base_process_file_es(mocker, worker, input_dir, input_file1_es,
def test_base_process_file_small_limit_es(mocker, worker_small, input_dir, input_file_small_es,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
)
worker_small.process(
os.path.join(input_dir, input_file_small_es),
{"lang": "es"}, os.path.join(output_dir, input_file_small_es)
@@ -186,16 +210,30 @@ def test_base_process_file_small_limit_es(mocker, worker_small, input_dir, input
os.remove(os.path.join(output_dir, input_file_small_es))
def test_base_process_file_pt(mocker, worker, input_dir, input_file1_pt,
def test_base_process_file_es_parallel(mocker, worker, input_dir, input_file1_es,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
worker.process(
os.path.join(input_dir, input_file1_es),
{"lang": "es", "parallel_subtasks": 2}, os.path.join(output_dir, input_file1_es)
)
assert cmp(os.path.join(output_dir, input_file1_es),
os.path.join(expected_dir, input_file1_es))
os.remove(os.path.join(output_dir, input_file1_es))
def test_base_process_file_small_limit_es_parallel(mocker, worker_small, input_dir, input_file_small_es,
output_dir, expected_dir):
worker_small.process(
os.path.join(input_dir, input_file_small_es),
{"lang": "es", "parallel_subtasks": 2}, os.path.join(output_dir, input_file_small_es)
)
assert cmp(os.path.join(output_dir, input_file_small_es),
os.path.join(expected_dir, input_file_small_es))
os.remove(os.path.join(output_dir, input_file_small_es))
def test_base_process_file_pt(mocker, worker, input_dir, input_file1_pt,
output_dir, expected_dir):
worker.process(
os.path.join(input_dir, input_file1_pt),
{"lang": "pt"}, os.path.join(output_dir, input_file1_pt)
@@ -207,14 +245,6 @@ def test_base_process_file_pt(mocker, worker, input_dir, input_file1_pt,
def test_base_process_file_small_limit_pt(mocker, worker_small, input_dir, input_file_small_pt,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
)
worker_small.process(
os.path.join(input_dir, input_file_small_pt),
{"lang": "pt"}, os.path.join(output_dir, input_file_small_pt)
@@ -224,16 +254,30 @@ def test_base_process_file_small_limit_pt(mocker, worker_small, input_dir, input
os.remove(os.path.join(output_dir, input_file_small_pt))
def test_base_process_file_fr(mocker, worker, input_dir, input_file1_fr,
def test_base_process_file_pt_parallel(mocker, worker, input_dir, input_file1_pt,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
worker.process(
os.path.join(input_dir, input_file1_pt),
{"lang": "pt", "parallel_subtasks": 4}, os.path.join(output_dir, input_file1_pt)
)
assert cmp(os.path.join(output_dir, input_file1_pt),
os.path.join(expected_dir, input_file1_pt))
os.remove(os.path.join(output_dir, input_file1_pt))
def test_base_process_file_small_limit_pt_parallel(mocker, worker_small, input_dir, input_file_small_pt,
output_dir, expected_dir):
worker_small.process(
os.path.join(input_dir, input_file_small_pt),
{"lang": "pt", "parallel_subtasks": 4}, os.path.join(output_dir, input_file_small_pt)
)
assert cmp(os.path.join(output_dir, input_file_small_pt),
os.path.join(expected_dir, input_file_small_pt))
os.remove(os.path.join(output_dir, input_file_small_pt))
def test_base_process_file_fr(mocker, worker, input_dir, input_file1_fr,
output_dir, expected_dir):
worker.process(
os.path.join(input_dir, input_file1_fr),
{"lang": "fr"}, os.path.join(output_dir, input_file1_fr)
@@ -245,14 +289,6 @@ def test_base_process_file_fr(mocker, worker, input_dir, input_file1_fr,
def test_base_process_file_small_limit_fr(mocker, worker_small, input_dir, input_file_small_fr,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
)
worker_small.process(
os.path.join(input_dir, input_file_small_fr),
{"lang": "fr"}, os.path.join(output_dir, input_file_small_fr)
@@ -262,16 +298,30 @@ def test_base_process_file_small_limit_fr(mocker, worker_small, input_dir, input
os.remove(os.path.join(output_dir, input_file_small_fr))
def test_base_process_file_ru(mocker, worker, input_dir, input_file1_ru,
def test_base_process_file_fr_parallel(mocker, worker, input_dir, input_file1_fr,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
worker.process(
os.path.join(input_dir, input_file1_fr),
{"lang": "fr", "parallel_subtasks": 4}, os.path.join(output_dir, input_file1_fr)
)
assert cmp(os.path.join(output_dir, input_file1_fr),
os.path.join(expected_dir, input_file1_fr))
os.remove(os.path.join(output_dir, input_file1_fr))
def test_base_process_file_small_limit_fr_parallel(mocker, worker_small, input_dir, input_file_small_fr,
output_dir, expected_dir):
worker_small.process(
os.path.join(input_dir, input_file_small_fr),
{"lang": "fr", "parallel_subtasks": 4}, os.path.join(output_dir, input_file_small_fr)
)
assert cmp(os.path.join(output_dir, input_file_small_fr),
os.path.join(expected_dir, input_file_small_fr))
os.remove(os.path.join(output_dir, input_file_small_fr))
def test_base_process_file_ru(mocker, worker, input_dir, input_file1_ru,
output_dir, expected_dir):
worker.process(
os.path.join(input_dir, input_file1_ru),
{"lang": "ru"}, os.path.join(output_dir, input_file1_ru)
@@ -283,14 +333,6 @@ def test_base_process_file_ru(mocker, worker, input_dir, input_file1_ru,
def test_base_process_file_small_limit_ru(mocker, worker_small, input_dir, input_file_small_ru,
output_dir, expected_dir):
mocker.patch('nlp_ws._subtask.SubTask.run', return_value=None)
mocker.patch('nlp_ws._subtask.SubTask.get_output_path', get_output_path)
mocker.patch('nlp_ws._subtask.SubTask.prepare_subtask', prepare_subtask)
mocker.patch('nlp_ws._worker.NLPWorker.update_progress')
SubTask.prepare_subtask(
{"q_in": ap.AioQueue(), "q_out": ap.AioQueue()},
os.getpid()
)
worker_small.process(
os.path.join(input_dir, input_file_small_ru),
{"lang": "ru"}, os.path.join(output_dir, input_file_small_ru)
@@ -298,3 +340,25 @@ def test_base_process_file_small_limit_ru(mocker, worker_small, input_dir, input
assert cmp(os.path.join(output_dir, input_file_small_ru),
os.path.join(expected_dir, input_file_small_ru))
os.remove(os.path.join(output_dir, input_file_small_ru))
def test_base_process_file_ru_parallel(mocker, worker, input_dir, input_file1_ru,
output_dir, expected_dir):
worker.process(
os.path.join(input_dir, input_file1_ru),
{"lang": "ru", "parallel_subtasks": 6}, os.path.join(output_dir, input_file1_ru)
)
assert cmp(os.path.join(output_dir, input_file1_ru),
os.path.join(expected_dir, input_file1_ru))
os.remove(os.path.join(output_dir, input_file1_ru))
def test_base_process_file_small_limit_ru_parallel(mocker, worker_small, input_dir, input_file_small_ru,
output_dir, expected_dir):
worker_small.process(
os.path.join(input_dir, input_file_small_ru),
{"lang": "ru", "parallel_subtasks": 6}, os.path.join(output_dir, input_file_small_ru)
)
assert cmp(os.path.join(output_dir, input_file_small_ru),
os.path.join(expected_dir, input_file_small_ru))
os.remove(os.path.join(output_dir, input_file_small_ru))
@@ -11,3 +11,4 @@ woda występować w przyroda być roztwór sól i gaz .
najwięcej sól mineralny zawierać woda morski i woda mineralny ; najmniej woda z opad atmosferyczny .
woda o mały zawartość składnik mineralny nazywać woda miękki , natomiast zawierać znaczny ilość sól wapń i magnez – woda twardy .
oprócz to woda naturalny zawierać rozpuścić substancja pochodzenie organiczny , na przykład . mocznik , kwas humusowy i tym podobne .