Select Git revision
main.py 1.78 KiB
"""Implementation of anonymizer service."""
import logging
import nlp_ws
from typing import Dict
from src.worker import Worker
_log = logging.getLogger(__name__)
class AnonymizerWorker(nlp_ws.NLPWorker):
"""NLP WS worker for anonymizer."""
@classmethod
def static_init(cls, config):
"""Initialize the class with configuration.
Args:
config (dict): Configuration dictionary. (It's not strictly dict,
but it supports dict-like operations.)
"""
cls._configuration = config.get("tool").get("configuration", "ccl")
cls._default_language = config.get("tool").get("default_language", "pl")
cls._default_replacer = config.get("tool").get("default_replacer", "tag")
_log.info(
"AnonymizerWorker initialized with configuration:"
"%s, default language: %s, default replacer: %s",
cls._configuration,
cls._default_language,
cls._default_replacer,
)
def __init__(self):
"""Initialize the worker instance."""
self._worker = Worker(
configuration=self._configuration,
default_language=self._default_language,
default_replacer=self._default_replacer,
)
def process(
self, input_file: str, task_options: Dict[str, str], output_file: str
) -> None:
"""Process the input and save the result to the output path.
Args:
input_file (str): Path to the input.
task_options (Dict[str, str]): Runtime configuration of the task.
output_file (str): Path to the output.
"""
self._worker.process(input_file, task_options, output_file)
if __name__ == "__main__":
nlp_ws.NLPService.main(AnonymizerWorker, pause_at_exit=False)