"""Worker exposing the text-processing pipeline through the nlp_worker interface."""

from hydra import compose, initialize
from hydra.utils import instantiate
class Worker:
    """Worker class compatible with the nlp_worker interface.

    This class is responsible for loading the pipeline and running it on the
    given text.

    It's supposed to be used in the nlp_worker but it can be used as a standalone
    for easier debugging.
    """

    # Values accepted for the "language" and "method" task options.
    _SUPPORTED_LANGUAGES = ("pl",)
    _SUPPORTED_REPLACERS = ("delete", "tag", "pseudo")

    def __init__(
        self, configuration="ccl", default_language="pl", default_replacer="tag"
    ) -> None:
        """Initializes the worker.

        Args:
            configuration (str, optional): Hydra configuration of the pipeline.
                Defaults to "ccl".
            default_language (str, optional): Default language of the text.
                Defaults to "pl".
            default_replacer (str, optional): Default method of replacing
                tokens. Defaults to "tag".

        """
        # Cache of the most recently built pipeline and the overrides it was
        # built from; filled lazily by _prepare_pipeline().
        self._last_config = None
        self._pipeline = None

        self._configuration = configuration
        self._default_language = default_language
        self._default_replacer = default_replacer

        super().__init__()

    def _prepare_pipeline(self, task_options):
        """Returns the pipeline for the given task options.

        The pipeline is rebuilt only when the resulting Hydra overrides differ
        from the ones used for the previously built pipeline; otherwise the
        cached instance is returned.

        Args:
            task_options (Dict[str, str]): task options; the "language" and
                "method" keys are read, falling back to the defaults given to
                the constructor. None is treated as an empty dict.

        Returns:
            The pipeline object instantiated from the "pipeline" node of the
            composed Hydra config.

        Raises:
            ValueError: if the language or the replace method is unsupported.

        """
        options = task_options or {}
        language = options.get("language", self._default_language)
        replace_method = options.get("method", self._default_replacer)

        # Validate with explicit exceptions (asserts vanish under `python -O`)
        # and do it before the values are spliced into the overrides.
        if language not in self._SUPPORTED_LANGUAGES:
            raise ValueError(
                f"Unsupported language {language!r}; "
                f"expected one of {self._SUPPORTED_LANGUAGES}"
            )
        if replace_method not in self._SUPPORTED_REPLACERS:
            raise ValueError(
                f"Unsupported replace method {replace_method!r}; "
                f"expected one of {self._SUPPORTED_REPLACERS}"
            )

        overrides = [
            "language=" + language,
            "replacers=" + replace_method,
            "configuration=" + self._configuration,
        ]

        config_key = tuple(overrides)
        if self._last_config != config_key:
            with initialize(config_path="../config", version_base="1.1"):
                cfg = compose(config_name="config", overrides=overrides)
                self._pipeline = instantiate(cfg["pipeline"])
            # Remember the key only after a successful build so that a failed
            # build is retried on the next call.
            self._last_config = config_key

        return self._pipeline

    def process(self, input_file, task_options, output_file) -> None:
        """Anonymizes input text.

        It is assumed input_file is encoded in UTF-8.

        Args:
            input_file (str): path to the input file
            task_options (Dict[str, str]): task options. Can contain following
                keys:
                method - 'delete'/'tag'/'pseudo' - 'delete' deletes selected
                        tokens, 'tag' replaces selected tokens with arbitrary
                        tags, 'pseudo' replaces selected tokens with a random
                        token that has the same POS tag.
                language - 'pl' - language of the input text. As of now only
                    Polish is supported.
            output_file (str): path to the output file

        Raises:
            ValueError: if task_options holds an unsupported language or
                method.

        """
        pipeline = self._prepare_pipeline(task_options)

        # Run the pipeline before opening the output file: opening with "w"
        # truncates it, which would leave an empty file behind if the run
        # fails and would wipe the input when both paths are the same.
        result = pipeline.run(input_file)

        with open(output_file, "w", encoding="utf-8") as f:
            f.write(result)