"""Sequential pipeline that runs anonymization process on jsonl-splitted input.""" import json from typing import Dict from src.detectors.interface import Detector from src.input_parsers.interface import InputParser from src.pipeline.interface import Pipeline from src.replacers.interface import ReplacerInterface from src.suppressors.interface import Suppressor import clarin_json class SequentialJSONLPipeline(Pipeline): """Pipeline that runs the whole anonymization process on jsonl-splitted input. This pipeline supports cases where the input is splitted into multiple parts and each part is processed separately and then concatenated into single text output. """ def __init__( self, input_parser: InputParser, detectors: Dict[str, Detector], suppressor: Suppressor, replacers: Dict[str, ReplacerInterface], concat_to_txt: bool = False, ): """Initialize pipeline. Args: input_parser (InputParser): Object that parses input into text and annotations. detectors (Dict[str, Detector]): List of detectors. suppressor (Suppressor): List of suppressors. replacers (Dict[str, ReplacerInterface]): List of replacers. concat_to_txt (bool, optional): If true, concatenates output to single txt file. If false - returns output in jsonl format, splitted in the same way as the input. Defaults to False. """ # TODO: Maybe input parser should be set by default to JSONL parser? self._input_parser = input_parser self._detectors = detectors self._suppressor = suppressor self._replacers = replacers self._concat_to_txt = concat_to_txt def run(self, input_path: str) -> str: """Run the whole anonymization pipeline. Args: input_path (str): Path to the input supported by input parser. Returns: str: Anonymized text. """ result = [] with clarin_json.open(input_path, 'r') as f: for line in f: parsed_input = self._input_parser.parse(line) detected_entities = [] for detector_name, detector in self._detectors.items(): detected_entities += detector.detect( parsed_input[0], parsed_input[1] ) annotaitons_cleaned = self._suppressor.suppress(detected_entities) replaced_input = parsed_input[0] annotations_left = annotaitons_cleaned for replacer_name, replacer in self._replacers.items(): replaced_input, annotations_left = replacer.replace( replaced_input, annotations_left ) result.append({"text": replaced_input}) if self._concat_to_txt: result_text = "" for item in result: text = item["text"] if ( result_text != "" and result_text.rstrip() == result_text and text.lstrip() == text ): result_text += " " + text else: result_text += text return result_text else: return "\n".join([json.dumps(item, ensure_ascii=False) for item in result])