Skip to content
Snippets Groups Projects
main.py 1.78 KiB
Newer Older
Bartosz Walkowiak's avatar
Bartosz Walkowiak committed
"""Implementation of anonymizer service."""
import logging
Bartosz Walkowiak's avatar
Bartosz Walkowiak committed
import nlp_ws
Michał Pogoda's avatar
Michał Pogoda committed
from typing import Dict
Bartosz Walkowiak's avatar
Bartosz Walkowiak committed

from src.worker import Worker

Bartosz Walkowiak's avatar
Bartosz Walkowiak committed
_log = logging.getLogger(__name__)


class AnonymizerWorker(nlp_ws.NLPWorker):
Michał Pogoda's avatar
Michał Pogoda committed
    """NLP WS worker for anonymizer."""
Bartosz Walkowiak's avatar
Bartosz Walkowiak committed

    @classmethod
    def static_init(cls, config):
Michał Pogoda's avatar
Michał Pogoda committed
        """Initialize the class with configuration.

        Args:
            config (dict): Configuration dictionary. (It's not strictly dict,
                but it supports dict-like operations.)

        """
Bartosz Walkowiak's avatar
Bartosz Walkowiak committed
        cls._configuration = config.get("tool").get("configuration", "ccl")
        cls._default_language = config.get("tool").get("default_language", "pl")
        cls._default_replacer = config.get("tool").get("default_replacer", "tag")

        _log.info(
Michał Pogoda's avatar
Michał Pogoda committed
            "AnonymizerWorker initialized with configuration:"
            "%s, default language: %s, default replacer: %s",
Bartosz Walkowiak's avatar
Bartosz Walkowiak committed
            cls._configuration,
            cls._default_language,
            cls._default_replacer,
        )

    def __init__(self):
Michał Pogoda's avatar
Michał Pogoda committed
        """Initialize the worker instance."""
Bartosz Walkowiak's avatar
Bartosz Walkowiak committed
        self._worker = Worker(
            configuration=self._configuration,
            default_language=self._default_language,
            default_replacer=self._default_replacer,
        )

Michał Pogoda's avatar
Michał Pogoda committed
    def process(
        self, input_file: str, task_options: Dict[str, str], output_file: str
    ) -> None:
        """Process the input and save the result to the output path.

        Args:
            input_file (str): Path to the input.
            task_options (Dict[str, str]): Runtime configuration of the task.
            output_file (str): Path to the output.

        """
Bartosz Walkowiak's avatar
Bartosz Walkowiak committed
        self._worker.process(input_file, task_options, output_file)


if __name__ == "__main__":
    nlp_ws.NLPService.main(AnonymizerWorker, pause_at_exit=False)