From 2d8783a5a5dcd247a01c8a24ec40c7ab6dc90fca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pogoda?= <mipo57@e-science.pl> Date: Thu, 19 Jan 2023 08:39:07 +0100 Subject: [PATCH] Working MVP of pipeline --- config/config.yaml | 9 + config/detectors/all.yaml | 7 + config/detectors/date.yaml | 3 + config/detectors/email.yaml | 2 + config/detectors/ner.yaml | 3 + config/detectors/phone.yaml | 2 + config/detectors/url.yaml | 3 + config/detectors/user.yaml | 2 + config/input_parser/ccl.yaml | 1 + config/paths/default.yaml | 1 + config/pipeline/default.yaml | 5 + config/replacers/date.yaml | 2 + config/replacers/delete.yaml | 2 + config/replacers/email.yaml | 2 + config/replacers/ner.yaml | 5 + config/replacers/pseudo.yaml | 6 + config/replacers/tag.yaml | 2 + config/replacers/user.yaml | 2 + config/suppressor/order_based.yaml | 1 + main.py | 2 +- requirements.txt | 5 +- scripts/cli.py | 2 +- src/annotation_mapping.py | 54 +++ src/annotations/__init__.py | 2 + src/annotations/annotation.py | 67 ++++ src/annotations/date.py | 27 ++ src/base_anonymizer.py | 167 ---------- src/ccl_handler.py | 20 -- src/ccl_parser.py | 70 ---- src/detectors/date/__init__.py | 2 +- src/detectors/date/date.py | 31 +- src/detectors/date/en.py | 13 +- src/detectors/date/pl.py | 14 +- src/detectors/date/ru.py | 14 +- src/detectors/date/utils.py | 111 +++++++ src/detectors/email/__init__.py | 2 +- src/detectors/email/email.py | 33 +- src/detectors/interface.py | 9 + src/detectors/ner/__init__.py | 2 +- src/detectors/ner/ner.py | 23 +- src/detectors/ner/pl_liner_n5.py | 18 +- src/detectors/phone/__init__.py | 2 +- src/detectors/phone/phone.py | 28 +- src/detectors/url/__init__.py | 2 +- src/detectors/url/url.py | 28 +- src/detectors/user/__init__.py | 2 +- src/detectors/user/user.py | 27 +- src/dictionaries/morphosyntactic/__init__.py | 1 + src/dictionaries/morphosyntactic/interface.py | 10 + src/dictionaries/morphosyntactic/pl_ner.py | 110 ++++++ src/dictionaries/pl_ner_replacements.py | 
46 --- src/entity_types.py | 14 - src/generators.py | 314 ------------------ src/input_parsers/__init__.py | 0 src/input_parsers/ccl.py | 75 +++++ src/input_parsers/interface.py | 17 + src/mappings/__init__.py | 0 src/mappings/ner_pl_n5_mapping.py | 15 + src/pipeline/__init__.py | 0 src/pipeline/default.py | 38 +++ src/pipeline/interface.py | 3 + src/replacers/__init__.py | 2 + src/replacers/date_replacer.py | 88 +++++ src/replacers/delete_replacer.py | 20 ++ src/replacers/email_replacer.py | 38 +++ src/replacers/interface.py | 22 ++ src/replacers/ner_replacer.py | 36 ++ src/replacers/tag_replacer.py | 47 +++ src/replacers/user_replacer.py | 32 ++ src/string_replacements.py | 75 ++++- src/suppressors/interface.py | 15 + src/suppressors/order_based.py | 19 +- src/tag_anonimization.py | 40 --- src/utils/ner_pl_n5_mapping.py | 9 - src/worker.py | 41 ++- tests/detectors/date/test_en.py | 29 +- tests/detectors/date/test_pl.py | 27 +- tests/detectors/date/test_ru.py | 28 +- tests/detectors/email/test_email.py | 10 +- tests/detectors/ner/test_pl_liner_n5.py | 22 +- tests/detectors/phone/test_phone.py | 10 +- tests/detectors/url/test_url.py | 21 +- tests/detectors/user/test_user.py | 10 +- .../dictionaries/morphosyntactic/__init__.py | 0 .../morphosyntactic/test_pl_ner.py | 22 ++ .../dictionaries/test_pl_ner_replacements.py | 38 --- tests/input_parsers/__init__.py | 0 .../test_ccl.py} | 29 +- tests/pipeline/__init__.py | 0 tests/pipeline/test_default.py | 33 ++ tests/replacers/__init__.py | 0 tests/replacers/test_date_replacer.py | 43 +++ tests/replacers/test_email_replacer.py | 43 +++ tests/replacers/test_ner_replacer.py | 32 ++ tests/replacers/test_tag_replacer.py | 21 ++ tests/replacers/test_user_replacer.py | 43 +++ tests/test_annotation_mapping.py | 19 ++ tests/test_string_replacements.py | 18 +- tests/test_tag_anonimization.py | 17 - 99 files changed, 1585 insertions(+), 894 deletions(-) create mode 100644 config/config.yaml create mode 100644 
config/detectors/all.yaml create mode 100644 config/detectors/date.yaml create mode 100644 config/detectors/email.yaml create mode 100644 config/detectors/ner.yaml create mode 100644 config/detectors/phone.yaml create mode 100644 config/detectors/url.yaml create mode 100644 config/detectors/user.yaml create mode 100644 config/input_parser/ccl.yaml create mode 100644 config/paths/default.yaml create mode 100644 config/pipeline/default.yaml create mode 100644 config/replacers/date.yaml create mode 100644 config/replacers/delete.yaml create mode 100644 config/replacers/email.yaml create mode 100644 config/replacers/ner.yaml create mode 100644 config/replacers/pseudo.yaml create mode 100644 config/replacers/tag.yaml create mode 100644 config/replacers/user.yaml create mode 100644 config/suppressor/order_based.yaml create mode 100644 src/annotation_mapping.py create mode 100644 src/annotations/__init__.py create mode 100644 src/annotations/annotation.py create mode 100644 src/annotations/date.py delete mode 100644 src/base_anonymizer.py delete mode 100644 src/ccl_handler.py delete mode 100644 src/ccl_parser.py create mode 100644 src/detectors/date/utils.py create mode 100644 src/detectors/interface.py create mode 100644 src/dictionaries/morphosyntactic/__init__.py create mode 100644 src/dictionaries/morphosyntactic/interface.py create mode 100644 src/dictionaries/morphosyntactic/pl_ner.py delete mode 100644 src/dictionaries/pl_ner_replacements.py delete mode 100644 src/entity_types.py delete mode 100644 src/generators.py create mode 100644 src/input_parsers/__init__.py create mode 100644 src/input_parsers/ccl.py create mode 100644 src/input_parsers/interface.py create mode 100644 src/mappings/__init__.py create mode 100644 src/mappings/ner_pl_n5_mapping.py create mode 100644 src/pipeline/__init__.py create mode 100644 src/pipeline/default.py create mode 100644 src/pipeline/interface.py create mode 100644 src/replacers/__init__.py create mode 100644 
src/replacers/date_replacer.py create mode 100644 src/replacers/delete_replacer.py create mode 100644 src/replacers/email_replacer.py create mode 100644 src/replacers/interface.py create mode 100644 src/replacers/ner_replacer.py create mode 100644 src/replacers/tag_replacer.py create mode 100644 src/replacers/user_replacer.py create mode 100644 src/suppressors/interface.py delete mode 100644 src/tag_anonimization.py delete mode 100644 src/utils/ner_pl_n5_mapping.py create mode 100644 tests/dictionaries/morphosyntactic/__init__.py create mode 100644 tests/dictionaries/morphosyntactic/test_pl_ner.py delete mode 100644 tests/dictionaries/test_pl_ner_replacements.py create mode 100644 tests/input_parsers/__init__.py rename tests/{test_ccl_parser.py => input_parsers/test_ccl.py} (67%) create mode 100644 tests/pipeline/__init__.py create mode 100644 tests/pipeline/test_default.py create mode 100644 tests/replacers/__init__.py create mode 100644 tests/replacers/test_date_replacer.py create mode 100644 tests/replacers/test_email_replacer.py create mode 100644 tests/replacers/test_ner_replacer.py create mode 100644 tests/replacers/test_tag_replacer.py create mode 100644 tests/replacers/test_user_replacer.py create mode 100644 tests/test_annotation_mapping.py delete mode 100644 tests/test_tag_anonimization.py diff --git a/config/config.yaml b/config/config.yaml new file mode 100644 index 0000000..d4c077e --- /dev/null +++ b/config/config.yaml @@ -0,0 +1,9 @@ +defaults: + - detectors: all + - replacers: tag + - suppressor: order_based + - input_parser: ccl + - pipeline: default + - _self_ + +language: "pl" \ No newline at end of file diff --git a/config/detectors/all.yaml b/config/detectors/all.yaml new file mode 100644 index 0000000..c7be5fb --- /dev/null +++ b/config/detectors/all.yaml @@ -0,0 +1,7 @@ +defaults: + - date + - email + - ner + - phone + - url + - user \ No newline at end of file diff --git a/config/detectors/date.yaml b/config/detectors/date.yaml new file mode 
100644 index 0000000..8882b16 --- /dev/null +++ b/config/detectors/date.yaml @@ -0,0 +1,3 @@ +date: + _target_: src.detectors.date.DateDetector + language: $language \ No newline at end of file diff --git a/config/detectors/email.yaml b/config/detectors/email.yaml new file mode 100644 index 0000000..b6ba478 --- /dev/null +++ b/config/detectors/email.yaml @@ -0,0 +1,2 @@ +email: + _target_: src.detectors.email.EmailDetector \ No newline at end of file diff --git a/config/detectors/ner.yaml b/config/detectors/ner.yaml new file mode 100644 index 0000000..b11623b --- /dev/null +++ b/config/detectors/ner.yaml @@ -0,0 +1,3 @@ +ner: + _target_: src.detectors.ner.NerDetector + language: ${language} \ No newline at end of file diff --git a/config/detectors/phone.yaml b/config/detectors/phone.yaml new file mode 100644 index 0000000..7a0ea47 --- /dev/null +++ b/config/detectors/phone.yaml @@ -0,0 +1,2 @@ +phone: + _target_: src.detectors.phone.PhoneNumberDetector \ No newline at end of file diff --git a/config/detectors/url.yaml b/config/detectors/url.yaml new file mode 100644 index 0000000..c2ff600 --- /dev/null +++ b/config/detectors/url.yaml @@ -0,0 +1,3 @@ +url: + _target_: src.detectors.url.UrlDetector + language: $language \ No newline at end of file diff --git a/config/detectors/user.yaml b/config/detectors/user.yaml new file mode 100644 index 0000000..274a991 --- /dev/null +++ b/config/detectors/user.yaml @@ -0,0 +1,2 @@ +user: + _target_: src.detectors.user.UserDetector \ No newline at end of file diff --git a/config/input_parser/ccl.yaml b/config/input_parser/ccl.yaml new file mode 100644 index 0000000..a707d09 --- /dev/null +++ b/config/input_parser/ccl.yaml @@ -0,0 +1 @@ +_target_: src.input_parsers.ccl.CCLInputParser \ No newline at end of file diff --git a/config/paths/default.yaml b/config/paths/default.yaml new file mode 100644 index 0000000..657a167 --- /dev/null +++ b/config/paths/default.yaml @@ -0,0 +1 @@ +dictionaries_path: dictionaries \ No newline at 
end of file diff --git a/config/pipeline/default.yaml b/config/pipeline/default.yaml new file mode 100644 index 0000000..494b4dd --- /dev/null +++ b/config/pipeline/default.yaml @@ -0,0 +1,5 @@ +_target_: src.pipeline.default.DefaultPipeline +input_parser: ${input_parser} +detectors: ${detectors} +suppressor: ${suppressor} +replacers: ${replacers} \ No newline at end of file diff --git a/config/replacers/date.yaml b/config/replacers/date.yaml new file mode 100644 index 0000000..dd6076c --- /dev/null +++ b/config/replacers/date.yaml @@ -0,0 +1,2 @@ +date: + _target_: src.replacers.date_replacer.DateReplacer \ No newline at end of file diff --git a/config/replacers/delete.yaml b/config/replacers/delete.yaml new file mode 100644 index 0000000..cd043b0 --- /dev/null +++ b/config/replacers/delete.yaml @@ -0,0 +1,2 @@ +delete: + _target_: src.replacers.delete_replacer.DeleteReplacer \ No newline at end of file diff --git a/config/replacers/email.yaml b/config/replacers/email.yaml new file mode 100644 index 0000000..0b93661 --- /dev/null +++ b/config/replacers/email.yaml @@ -0,0 +1,2 @@ +email: + _target_: src.replacers.email_replacer.EmailReplacer \ No newline at end of file diff --git a/config/replacers/ner.yaml b/config/replacers/ner.yaml new file mode 100644 index 0000000..8d20018 --- /dev/null +++ b/config/replacers/ner.yaml @@ -0,0 +1,5 @@ +ner: + _target_: src.replacers.ner_replacer.NERReplacer + dictionary: + _target_: src.dictionaries.morphosyntactic.pl_ner.PlNERMorphosyntacticDictionary + dictionary_path: ${paths.dictionaries_path}/pl_dict.txt \ No newline at end of file diff --git a/config/replacers/pseudo.yaml b/config/replacers/pseudo.yaml new file mode 100644 index 0000000..5c4a301 --- /dev/null +++ b/config/replacers/pseudo.yaml @@ -0,0 +1,6 @@ +defaults: + - date + - email + - ner + - user + - tag # Fallback to tag replacement if no other replacement is found \ No newline at end of file diff --git a/config/replacers/tag.yaml b/config/replacers/tag.yaml new 
file mode 100644 index 0000000..86f2da9 --- /dev/null +++ b/config/replacers/tag.yaml @@ -0,0 +1,2 @@ +tag: + _target_: src.replacers.tag_replacer.TagReplacer \ No newline at end of file diff --git a/config/replacers/user.yaml b/config/replacers/user.yaml new file mode 100644 index 0000000..bac540f --- /dev/null +++ b/config/replacers/user.yaml @@ -0,0 +1,2 @@ +user: + _target_: src.replacers.user_replacer.UserReplacer \ No newline at end of file diff --git a/config/suppressor/order_based.yaml b/config/suppressor/order_based.yaml new file mode 100644 index 0000000..668896c --- /dev/null +++ b/config/suppressor/order_based.yaml @@ -0,0 +1 @@ +_target_: src.suppressors.order_based.OrderBasedSuppressor \ No newline at end of file diff --git a/main.py b/main.py index 597f4ba..8869ebe 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,7 @@ """Implementation of anonymizer service.""" import argparse import nlp_ws -from src.worker import Worker +from src.worker_old import Worker def get_args(): diff --git a/requirements.txt b/requirements.txt index 3923df9..abceaaa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,7 @@ nlp-ws regex==2020.10.28 Babel==2.8.0 -bitarray==2.6.1 \ No newline at end of file +bitarray==2.6.1 +random-username==1.0.2 +randominfo==2.0.2 +hydra-core==1.3.1 \ No newline at end of file diff --git a/scripts/cli.py b/scripts/cli.py index 9ee5bea..f8986e1 100644 --- a/scripts/cli.py +++ b/scripts/cli.py @@ -1,6 +1,6 @@ """Implementation of anonymizer service.""" import argparse -from src.worker import Worker +from src.worker_old import Worker from src.anonymizers.polish_anonymizer import PolishAnonymizer diff --git a/src/annotation_mapping.py b/src/annotation_mapping.py new file mode 100644 index 0000000..ef73962 --- /dev/null +++ b/src/annotation_mapping.py @@ -0,0 +1,54 @@ +from typing import Dict, List, Tuple, TypeVar + +T1 = TypeVar("T1") +T2 = TypeVar("T2") + +def map_annotatios( + ref_annotations: List[Tuple[int, int, T1]], + 
all_annotations: Dict[str, List[Tuple[int, int, T2]]], + target_columns: List[str], +) -> Dict[Tuple[int, int, T1], Dict[str, Tuple[int, int, T2]]]: + """Map annotations from target columns to reference annotations. + + Example: + >> ref_annotations = [(0, 3, "Andrzej"), (7, 11, "psa")] + >> all_annotations = { + >> "A": [(0, 3, "Andrzej"), (7, 11, "psa")], + >> "B": [(0, 3, "AndrzejB"), (7, 11, "psaA")], + >> "C": [(0, 3, "AndrzejC"), (8, 9, "psaC")], + >> } + >> target_columns = ["B", "C"] + >> map_annotatios(ref_annotations, all_annotations, target_columns) + { + (0, 3, "Andrzej"): {"B": (0, 3, "AndrzejB"), "C": (0, 3, "AndrzejC")}, + (7, 11, "psa"): { + "B": (7, 11, "psaA"), + }, + } + + Args: + ref_annotations (List[Tuple[int, int, T1]]): Reference annotations. + all_annotations (Dict[str, List[Tuple[int, int, T2]]]): All annotations. + target_columns (List[str]): Target columns. + + Returns: + Dict[Tuple[int, int, T1], Dict[str, Tuple[int, int, T2]]]: Mapped annotations. + """ + + result = dict() + index_map = dict() + + for s_start, s_end, s_anno in ref_annotations: + result[(s_start, s_end, s_anno)] = dict() + index_map[(s_start, s_end)] = (s_start, s_end, s_anno) + + for target_column in target_columns: + for t_start, t_end, t_anno in all_annotations[target_column]: + if (t_start, t_end) in index_map: + result[index_map[(t_start, t_end)]][target_column] = ( + t_start, + t_end, + t_anno, + ) + + return result diff --git a/src/annotations/__init__.py b/src/annotations/__init__.py new file mode 100644 index 0000000..cf65f5a --- /dev/null +++ b/src/annotations/__init__.py @@ -0,0 +1,2 @@ +from src.annotations.annotation import * +from src.annotations.date import * \ No newline at end of file diff --git a/src/annotations/annotation.py b/src/annotations/annotation.py new file mode 100644 index 0000000..69e6f50 --- /dev/null +++ b/src/annotations/annotation.py @@ -0,0 +1,67 @@ +from dataclasses import dataclass +from typing import Optional + +@dataclass +class 
Annotation: + def __init__(self, type_name: str) -> None: + self._type_name = type_name + + def __hash__(self) -> int: + return tuple(self.__dict__.values()).__hash__() + +class MorphosyntacticInfoMixin: + def __init__(self, morpho_tag: str, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._morpho_tag = morpho_tag + + @property + def morpho_tag(self) -> str: + return self._morpho_tag + +class NameAnnotation(MorphosyntacticInfoMixin, Annotation): + def __init__(self, morpho_tag: Optional[str] = None) -> None: + super().__init__(morpho_tag=morpho_tag, type_name="name") + +class SurnameAnnotation(MorphosyntacticInfoMixin, Annotation): + def __init__(self, morpho_tag: Optional[str] = None) -> None: + super().__init__(morpho_tag=morpho_tag, type_name="surname") + +class StreetNameAnnotation(MorphosyntacticInfoMixin, Annotation): + def __init__(self, morpho_tag: Optional[str] = None) -> None: + super().__init__(morpho_tag=morpho_tag, type_name="street_name") + +class CityAnnotation(MorphosyntacticInfoMixin, Annotation): + def __init__(self, morpho_tag: Optional[str] = None) -> None: + super().__init__(morpho_tag=morpho_tag, type_name="city") + +class CountryAnnotation(MorphosyntacticInfoMixin, Annotation): + def __init__(self, morpho_tag: Optional[str] = None) -> None: + super().__init__(morpho_tag=morpho_tag, type_name="country") + +class PhoneNumberAnnotation(Annotation): + def __init__(self) -> None: + super().__init__("phone_number") + +class UrlAnnotation(Annotation): + def __init__(self) -> None: + super().__init__("url") + +class UserAnnotation(Annotation): + def __init__(self) -> None: + super().__init__("user") + +class EmailAnnotation(Annotation): + def __init__(self) -> None: + super().__init__("email") + +class TINAnnotation(Annotation): # Tax Identification Number + def __init__(self) -> None: + super().__init__("tin") + +class KRSAnnotation(Annotation): # National Court Register + def __init__(self) -> None: + super().__init__("krs") + 
+class OtherAnnotation(Annotation): # Non standard entity + def __init__(self) -> None: + super().__init__("other") \ No newline at end of file diff --git a/src/annotations/date.py b/src/annotations/date.py new file mode 100644 index 0000000..502004c --- /dev/null +++ b/src/annotations/date.py @@ -0,0 +1,27 @@ +from src.annotations.annotation import Annotation +from typing import List, Tuple, Optional + +class DateAnnotation(Annotation): + class AnnotationPart: + TWO_DIGITS_DAY = "DD" + ONE_DIGIT_DAY = "D" + TWO_DIGIT_MONTH = "MM" + ONE_DIGIT_MONTH = "M" + FOUR_DIGIT_YEAR = "YYYY" + TWO_DIGIT_YEAR = "YY" + TEXT_MONTH = "MMM" + OTHER = "OTHER" + + def __init__(self, format: Optional[List[Tuple[AnnotationPart, str]]] = None) -> None: + """ + The annotation representing a date value. + :param format: the format of the date, e.g. [(AnnotationPart.TWO_DIGITS_DAY, "01"), (AnnotationPart.OTHER, ".") ...] + :type format: Optional[List[Tuple[str, str]]] + """ + + super().__init__("date") + + self.format = format + + def __eq__(self, other) -> bool: + return self.format == other.format and super().__eq__(other) \ No newline at end of file diff --git a/src/base_anonymizer.py b/src/base_anonymizer.py deleted file mode 100644 index a863322..0000000 --- a/src/base_anonymizer.py +++ /dev/null @@ -1,167 +0,0 @@ -"""Abstract description of anonymizer including base regexes.""" -import regex -from abc import ABC, abstractmethod -from src.generators import generate_phone_number_tag - - -regex.compile(r'\B(?P<username>\@[\w\-]+)') -# This regex detects the following - -class BaseAnonymizer(ABC): - """Base abstract class for anonymization.""" - - email_regex = regex.compile( - r'(?P<local_part>[a-z0-9!#$%&\'*+/=?^_`{|}~-]+' - r'(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*@)' - r'(?P<domain>(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+)' - r'(?P<tld>[a-z0-9](?:[a-z0-9-]*[a-z0-9])?)', regex.I - ) - user_regex = regex.compile(r'\B(?P<username>\@[\w\-]+)') - _website_exceptions = ['m.in'] - website_regex 
= regex.compile( - r'\b(?:{})\b(*SKIP)(*FAIL)|'.format('|'.join(_website_exceptions)) + - r'(?:(?P<protocol>(?:(?:https?|ftp):)?\/\/)?' - r'(?P<auth>\S+(?::\S*)?@)?' - r'(?P<host>(?!(?:10|127)(?:\.\d{1,3}){3})' - r'(?!(?:169\.254|192\.168)(?:\.\d{1,3}){2})' - r'(?!172\.(?:1[6-9]|2\d|3[0-1])(?:\.\d{1,3}){2})' - r'(?:[1-9]\d?|1\d\d|2[01]\d|22[0-3])' - r'(?:\.(?:1?\d{1,2}|2[0-4]\d|25[0-5])){2}' - r'(?:\.(?:[1-9]\d?|1\d\d|2[0-4]\d|25[0-4]))' - r'|' - r'((?:[a-z0-9\u00a1-\uffff][a-z0-9\u00a1-\uffff_-]{0,62})?' - r'[a-z0-9\u00a1-\uffff]\.)+)' - r'(?P<tld>[a-z\u00a1-\uffff]{2,}\.??)' - r'(?P<port>:\d{2,5})?' - r'(?P<path>[/?#]\S*)?)', - regex.UNICODE | regex.I - ) - phone_number_regex = regex.compile( - r'(?P<country_code>(00[1-9]\d?)|(\(?([+\d]{2,3})\)?)[- ]??)?' - r'(?P<number>(\d[- ]??){9,10})' - ) - - def __init__(self, task_options): - """Initialize anonymizer with chosen method and default tokens.""" - self._mail_token = '[MAIL]' - self._user_token = '@[USER]' - self._website_token = '[WWW]' - self._digits_token = '[DIGITS]' - self._date_token = '[DATE]' - self._default_token = '[INNE]' - - self._method = task_options.get('method', 'delete') - - self._category_anonymisation = {} - self._form_dict = {} - self._pseudo_ann_list = [] - - def _process_lex(self, lex_subtree): - tag = '' - for elem in lex_subtree: - if elem.tag == 'ctag': - tag = elem.text - elif elem.tag != 'base': - raise Exception('Unrecognized tag inside lex: ' + elem.tag) - if tag == '': - raise Exception('Lex tag had no ctag inside!') - return tag - - def _tagging(self, sentence): - for category in self._category_anonymisation: - pattern, token, _, _ = self._category_anonymisation[category] - - if category == 'phone_number': - matches = [m for m in pattern.finditer(sentence)] - for match in matches: - tag = generate_phone_number_tag(match.groupdict(''), token) - replace_match = match.group(0) - sentence = regex.sub(regex.escape(replace_match), - tag, sentence) - else: - sentence = regex.sub(pattern, 
token, sentence) - return sentence - - def _pseudonymization(self, sentence): - sentence_after_regex = sentence - to_replace = [] - for category in self._category_anonymisation: - pattern, _, generator, args = self._category_anonymisation[category] - for match in pattern.finditer(sentence_after_regex): - if not match: - continue - to_replace.append((match, generator, args)) - sentence_after_regex = regex.sub(regex.escape(match.group(0)), - '', sentence_after_regex) - - for match, generator, args in to_replace: - replace_match = match.group(0) - pseudo_string = generator(match.groupdict(''), **args) - sentence = regex.sub( - regex.escape(replace_match), - pseudo_string, - sentence - ) - return sentence - - def _process_ann(self, ann_subtree): - value = int(ann_subtree.text) - chan = ann_subtree.attrib["chan"] - return chan, value - - def _process_single_tok(self, id, tok_subtree): - text = '' - tag = '' - ann = [] - for elem in tok_subtree: - if elem.tag == 'orth': - text = elem.text - elif elem.tag == 'lex': - tag = self._process_lex(elem) - elif elem.tag == 'ann': - ann.append(self._process_ann(elem)) - word = self._process_word(id, text, tag, ann) - return word - - def _process_word(self, id, text, tag, ann): - for chan, value in ann: - if value != 0: - text = self._handle_annotated(id, text, tag, chan) - break - return text - - def _process_sent_tree(self, sentence_subtree): - string_builder = [] - id = 0 - for elem in sentence_subtree: - if elem.tag == 'tok': - tok = self._process_single_tok(id, elem) - string_builder.append(tok) - string_builder.append(' ') - id += 2 - elif elem.tag == 'ns': - id -= 1 - string_builder.pop() - else: - raise Exception('Unrecognized tag inside sentence: ' + elem.tag) - return self._process_sentence(string_builder) - - @abstractmethod - def _handle_annotated(self, id, text, tag, ann): - pass - - @abstractmethod - def _process_sentence(self, string_builder): - pass - - @abstractmethod - def process(self, input_filename, 
output_filename): - """Anonymize the text in a file input_filename and save the anonymized \ - output text to a file output_filename. - - Args: - input_filename ([type]): [description] - output_filename ([type]): [description] - - """ - pass diff --git a/src/ccl_handler.py b/src/ccl_handler.py deleted file mode 100644 index 99664b1..0000000 --- a/src/ccl_handler.py +++ /dev/null @@ -1,20 +0,0 @@ -"""Implementation of ccl reading functionality.""" -from xml.etree.ElementTree import iterparse - - -class CCLHandler: - """Implements reading ccl for anonymizer service.""" - - def __init__(self, ccl_file_name): - """Initialize CCLHandler with a filename.""" - self._file_name = ccl_file_name - - def process(self, output_filename, unmarshallers): - """Process xml tags using unmarshallers and save in output_file.""" - with open(self._file_name, 'r', encoding='utf-8') as input_file, \ - open(output_filename, 'w', encoding='utf-8') as output_file: - for event, elem in iterparse(input_file): - unmarshal = unmarshallers.get(elem.tag, None) - if unmarshal: - output_file.write(unmarshal(elem)) - elem.clear() diff --git a/src/ccl_parser.py b/src/ccl_parser.py deleted file mode 100644 index 41e6971..0000000 --- a/src/ccl_parser.py +++ /dev/null @@ -1,70 +0,0 @@ -from typing import Dict, Any, List, Tuple -from lxml import etree -from collections import defaultdict - -def parse_ccl(ccl: str) -> Tuple[str, Dict[str, List[Tuple[int, int, str]]]]: - """ - Parses CCL XML format and returns original text and annotations. 
- - Annotations are returned as a dictionary with keys being annotation channels - and values being a list of tuples (start, end, word) where: - * start is an index of the first character in the word - * end is an index of the last character in the word - * word is a word or a group of words (in case of multiword tokens) - - :param ccl: CCL XML - :return: (text, annotations) - """ - ccl_tree = etree.fromstring(ccl.strip().encode('utf-8')) - - results = defaultdict(list) - text = "" - - # First token is assumed to not have space before it - last_was_ns = True - - tokens = ccl_tree.xpath("//ns | //tok") - for token in tokens: - if token.tag == 'tok': - if not last_was_ns: - text += " " - - word = token.xpath('./orth')[0].text - start = len(text) - end = start + len(word) - - for lex in token.xpath('./lex'): - if lex.attrib['disamb'] == "1": - ctag = lex.xpath('./ctag')[0] - results["ctag"].append((start, end, ctag.text)) - - break - - for ann in token.xpath('./ann'): - is_present = int(ann.text) == 1 - if not is_present: - continue - - channel = ann.attrib['chan'] - is_head = "head" in ann.attrib and ann.attrib['head'] == "1" - - if is_head: - results[channel].append((start, end, word)) - else: - if last_was_ns: - new_word = results[channel][-1][2] + word - else: - new_word = results[channel][-1][2] + " " + word - - old_start = results[channel][-1][0] - - results[channel][-1] = (old_start, end, new_word) - - last_was_ns = False - text += word - elif token.tag == 'ns': - last_was_ns = True - - return text, results - - \ No newline at end of file diff --git a/src/detectors/date/__init__.py b/src/detectors/date/__init__.py index 2c5b35b..45c2819 100644 --- a/src/detectors/date/__init__.py +++ b/src/detectors/date/__init__.py @@ -1 +1 @@ -from src.detectors.date.date import find_dates \ No newline at end of file +from src.detectors.date.date import DateDetector \ No newline at end of file diff --git a/src/detectors/date/date.py b/src/detectors/date/date.py index 
2f1f132..b8a8ac2 100644 --- a/src/detectors/date/date.py +++ b/src/detectors/date/date.py @@ -1,23 +1,38 @@ -from typing import List, Tuple +from typing import List, Dict, Any, Tuple from .en import detect_dates_en from .pl import detect_dates_pl from .ru import detect_dates_ru +from src.annotations import Annotation, DateAnnotation +from src.detectors.interface import Detector -def find_dates(text: str, language: str = "en") -> List[Tuple[int, int, str]]: + +class DateDetector(Detector): + def __init__(self, language: str = "pl") -> None: + self._language = language + + def detect( + self, text: str, annotations: Dict[str, List[Tuple[int, int, Any]]] + ) -> List[Tuple[int, int, DateAnnotation]]: + return find_dates(text, self._language) + + +def find_dates( + text: str, language: str = "pl" +) -> List[Tuple[int, int, DateAnnotation]]: """ Finds dates in the text. :param text: the text to be searched :type text: str :param language: the language of the text :type language: str - :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :return: a list of tuples containing (start, end, annotation) + :rtype: List[Tuple[int, int, Annotation]] """ - + language_processors = { "en": detect_dates_en, "pl": detect_dates_pl, - "ru": detect_dates_ru + "ru": detect_dates_ru, } - - return language_processors.get(language, detect_dates_en)(text) \ No newline at end of file + + return language_processors.get(language, detect_dates_en)(text) diff --git a/src/detectors/date/en.py b/src/detectors/date/en.py index a716bc1..5c8467e 100644 --- a/src/detectors/date/en.py +++ b/src/detectors/date/en.py @@ -1,6 +1,8 @@ import regex as re from typing import List, Tuple -from src.entity_types import EntityTypes +from src.annotations import DateAnnotation + +from src.detectors.date.utils import _parse_date_to_format EN_DATES_REGEX = re.compile( r'\b(?P<day_or_month_year>' @@ -22,16 +24,17 @@ EN_DATES_REGEX = re.compile( 
r'(?<!\b(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\b))', re.I ) -def detect_dates_en(text: str) -> List[Tuple[int, int, str]]: +def detect_dates_en(text: str) -> List[Tuple[int, int, DateAnnotation]]: """ Detects English dates in the text. :param text: the text to be searched :type text: str - :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :return: a list of tuples containing (start, end, annotation) + :rtype: List[Tuple[int, int, DateAnnotation]] """ matches = EN_DATES_REGEX.finditer(text) dates = [] for match in matches: - dates.append((match.start(), match.end(), EntityTypes.DATE)) + format = _parse_date_to_format(match.groupdict()) + dates.append((match.start(), match.end(), DateAnnotation(format))) return dates \ No newline at end of file diff --git a/src/detectors/date/pl.py b/src/detectors/date/pl.py index 02abfdd..a16ac47 100644 --- a/src/detectors/date/pl.py +++ b/src/detectors/date/pl.py @@ -1,6 +1,8 @@ import regex as re from typing import List, Tuple -from src.entity_types import EntityTypes +from src.annotations import DateAnnotation + +from src.detectors.date.utils import _parse_date_to_format PL_DATES_REGEX = re.compile( r'\b(?P<day_or_month_year>' @@ -25,16 +27,18 @@ PL_DATES_REGEX = re.compile( r'(?:(?P<punct6>[ \t\-\./,]{0,2})(?P<year3>\d{4}|\d{2})))?)', re.I ) -def detect_dates_pl(text: str) -> List[Tuple[int, int, str]]: +def detect_dates_pl(text: str) -> List[Tuple[int, int, DateAnnotation]]: """ Detects Polish dates in the text. 
:param text: the text to be searched :type text: str - :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :return: a list of tuples containing (start, end, annotation) + :rtype: List[Tuple[int, int, DateAnnotation]] """ + matches = PL_DATES_REGEX.finditer(text) dates = [] for match in matches: - dates.append((match.start(), match.end(), EntityTypes.DATE)) + format = _parse_date_to_format(match.groupdict()) + dates.append((match.start(), match.end(), DateAnnotation(format))) return dates \ No newline at end of file diff --git a/src/detectors/date/ru.py b/src/detectors/date/ru.py index 4100717..aacdf2f 100644 --- a/src/detectors/date/ru.py +++ b/src/detectors/date/ru.py @@ -1,6 +1,8 @@ import regex as re from typing import List, Tuple -from src.entity_types import EntityTypes +from src.annotations import DateAnnotation + +from src.detectors.date.utils import _parse_date_to_format RU_DATES_REGEX = re.compile( r'\b(?P<day_or_month_year>' @@ -25,16 +27,18 @@ RU_DATES_REGEX = re.compile( r'(?<!\b(Янв|Фев|Мар|Ðпр|Май|Июн|Июл|Ðвг|Сен|Окт|ÐоÑ|Дек)\b))', re.I ) -def detect_dates_ru(text: str) -> List[Tuple[int, int, str]]: +def detect_dates_ru(text: str) -> List[Tuple[int, int, DateAnnotation]]: """ Detects Russian dates in the text. 
:param text: the text to be searched :type text: str - :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :return: a list of tuples containing (start, end, annotation) + :rtype: List[Tuple[int, int, DateAnnotation]] """ matches = RU_DATES_REGEX.finditer(text) dates = [] for match in matches: - dates.append((match.start(), match.end(), EntityTypes.DATE)) + format = _parse_date_to_format(match.groupdict()) + dates.append((match.start(), match.end(), DateAnnotation(format))) + return dates \ No newline at end of file diff --git a/src/detectors/date/utils.py b/src/detectors/date/utils.py new file mode 100644 index 0000000..9d7bab2 --- /dev/null +++ b/src/detectors/date/utils.py @@ -0,0 +1,111 @@ +from typing import List, Tuple +from src.annotations import DateAnnotation, Optional + +def _parse_day_or_month(re_entry) -> List[Tuple[int, int, DateAnnotation]]: + assert re_entry["day_or_month_year"] is not None + result = [] + + if re_entry["day_month1"] is not None: + if len(re_entry["day_month1"]) == 1: + result.append((DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "0" + re_entry["day_month1"])) + else: + result.append((DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, re_entry["day_month1"])) + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct1"])) + + if len(re_entry["day_month2"]) == 1: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "0" + re_entry["day_month2"])) + else: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, re_entry["day_month2"])) + + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct1"])) + elif "day_month2" in re_entry: + if len(re_entry["day_month2"]) == 1: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "0" + re_entry["day_month2"])) + else: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, re_entry["day_month2"])) + + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct1"])) + + if "year1" in 
re_entry: + if len(re_entry["year1"]) == 2: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_YEAR, re_entry["year1"])) + else: + result.append((DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, re_entry["year1"])) + + return result + +def _parse_year_month_or_day(re_entry) -> List[Tuple[int, int, DateAnnotation]]: + assert re_entry["year_month_or_day"] is not None + result = [] + + if "year2" in re_entry: + if len(re_entry["year2"]) == 2: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_YEAR, re_entry["year2"])) + else: + result.append((DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, re_entry["year2"])) + + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct3"])) + + if "day_month3" in re_entry: + if len(re_entry["day_month3"]) == 1: + result.append((DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "0" + re_entry["day_month3"])) + else: + result.append((DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, re_entry["day_month3"])) + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct4"])) + + if "day_month4" in re_entry: + if len(re_entry["day_month4"]) == 1: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "0" + re_entry["day_month4"])) + else: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, re_entry["day_month4"])) + + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct4"])) + + return result + +def _parse_month_in_words(re_entry) -> List[Tuple[DateAnnotation.AnnotationPart, str]]: + assert re_entry["month_in_words"] is not None + result = [] + + if re_entry["day1"] is not None: + if len(re_entry["day1"]) == 1: + result.append((DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "0" + re_entry["day1"])) + else: + result.append((DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, re_entry["day1"])) + + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct5"])) + + if re_entry["month"] is not None: + result.append((DateAnnotation.AnnotationPart.TEXT_MONTH, re_entry["month"])) + + if 
re_entry["day1"] is None: + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct7"])) + else: + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct6"])) + + if re_entry["day2"] is not None: + if len(re_entry["day2"]) == 1: + result.append((DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "0" + re_entry["day2"])) + else: + result.append((DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, re_entry["day2"])) + result.append((DateAnnotation.AnnotationPart.OTHER, re_entry["punct6"])) + + if re_entry["year3"] is not None: + if len(re_entry["year3"]) == 2: + result.append((DateAnnotation.AnnotationPart.TWO_DIGIT_YEAR, re_entry["year3"])) + else: + result.append((DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, re_entry["year3"])) + + return result + +def _parse_date_to_format(re_entry) -> Optional[List[Tuple[DateAnnotation.AnnotationPart, str]]]: + if re_entry["day_or_month_year"] is not None: + result = _parse_day_or_month(re_entry) + elif re_entry["year_month_or_day"] is not None: + result = _parse_year_month_or_day(re_entry) + elif re_entry["month_in_words"] is not None: + result = _parse_month_in_words(re_entry) + else: + result = None + + return result \ No newline at end of file diff --git a/src/detectors/email/__init__.py b/src/detectors/email/__init__.py index 58050bc..524f295 100644 --- a/src/detectors/email/__init__.py +++ b/src/detectors/email/__init__.py @@ -1 +1 @@ -from src.detectors.email.email import detect_emails \ No newline at end of file +from src.detectors.email.email import EmailDetector \ No newline at end of file diff --git a/src/detectors/email/email.py b/src/detectors/email/email.py index 82e1756..2e0075d 100644 --- a/src/detectors/email/email.py +++ b/src/detectors/email/email.py @@ -1,16 +1,29 @@ import regex as re -from typing import List, Tuple -from src.entity_types import EntityTypes +from typing import List, Dict, Any, Tuple +from src.annotations import EmailAnnotation +from src.detectors.interface import Detector + + 
+class EmailDetector(Detector): + def __init__(self) -> None: + super().__init__() + + def detect( + self, text: str, annotations: Dict[str, List[Tuple[int, int, Any]]] + ) -> List[Tuple[int, int, EmailAnnotation]]: + return detect_emails(text) + EMAIL_REGEX = re.compile( - r'(?P<local_part>[a-z0-9!#$%&\'*+/=?^_`{|}~-]+' - r'(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*@)' - r'(?P<domain>(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+)' - r'(?P<tld>[a-z0-9](?:[a-z0-9-]*[a-z0-9])?)', re.I + r"(?P<local_part>[a-z0-9!#$%&\'*+/=?^_`{|}~-]+" + r"(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*@)" + r"(?P<domain>(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+)" + r"(?P<tld>[a-z0-9](?:[a-z0-9-]*[a-z0-9])?)", + re.I, ) -def detect_emails(text: str, language: str) -> List[Tuple[int, int, str]]: +def detect_emails(text: str) -> List[Tuple[int, int, EmailAnnotation]]: """ Detects emails in the text. :param text: the text to be searched @@ -18,10 +31,10 @@ def detect_emails(text: str, language: str) -> List[Tuple[int, int, str]]: :param language: the language of the text :type language: str :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :rtype: List[Tuple[int, int, EmailAnnotation]] """ matches = EMAIL_REGEX.finditer(text) emails = [] for match in matches: - emails.append((match.start(), match.end(), EntityTypes.EMAIL)) - return emails \ No newline at end of file + emails.append((match.start(), match.end(), EmailAnnotation())) + return emails diff --git a/src/detectors/interface.py b/src/detectors/interface.py new file mode 100644 index 0000000..b32cd06 --- /dev/null +++ b/src/detectors/interface.py @@ -0,0 +1,9 @@ +from typing import List, Dict, Any, Tuple +from src.annotations import Annotation + + +class Detector: + def detect( + self, text: str, annotations: Dict[str, List[Tuple[int, int, Any]]] + ) -> List[Tuple[int, int, Annotation]]: + raise NotImplementedError diff --git a/src/detectors/ner/__init__.py b/src/detectors/ner/__init__.py index 
9f8aefd..aeb2747 100644 --- a/src/detectors/ner/__init__.py +++ b/src/detectors/ner/__init__.py @@ -1 +1 @@ -from src.detectors.ner.ner import detect_ner \ No newline at end of file +from src.detectors.ner.ner import NerDetector \ No newline at end of file diff --git a/src/detectors/ner/ner.py b/src/detectors/ner/ner.py index 18c5622..e5611c2 100644 --- a/src/detectors/ner/ner.py +++ b/src/detectors/ner/ner.py @@ -1,8 +1,23 @@ -from typing import List, Tuple +from typing import List, Dict, Any, Tuple from src.detectors.ner.pl_liner_n5 import detect_ner_pl_liner_n5 +from src.detectors.interface import Detector +from src.annotations import Annotation -def detect_ner(ccl_annotations, language) -> List[Tuple[int, int, str]]: - if language == 'pl': + +class NerDetector(Detector): + def __init__(self, language: str = "pl") -> None: + self._language = language + + def detect( + self, text: str, annotations: Dict[str, List[Tuple[int, int, Any]]] + ) -> List[Tuple[int, int, str]]: + return detect_ner(annotations, self._language) + + +def detect_ner( + ccl_annotations: Dict[str, List[Tuple[int, int, Annotation]]], language: str +) -> List[Tuple[int, int, str]]: + if language == "pl": return detect_ner_pl_liner_n5(ccl_annotations) else: - raise NotImplementedError \ No newline at end of file + raise NotImplementedError(f"Language {language} is not supported.") diff --git a/src/detectors/ner/pl_liner_n5.py b/src/detectors/ner/pl_liner_n5.py index c494d13..0ea13a9 100644 --- a/src/detectors/ner/pl_liner_n5.py +++ b/src/detectors/ner/pl_liner_n5.py @@ -1,10 +1,10 @@ from typing import List, Tuple, Dict from src.utils.utils import subdict -from src.entity_types import EntityTypes -from src.utils.ner_pl_n5_mapping import NER_PL_N5_MAPPING +from src.annotations import OtherAnnotation, Annotation +from src.mappings.ner_pl_n5_mapping import NER_PL_N5_MAPPING def detect_ner_pl_liner_n5( - ccl_annotations: Dict[str, List[Tuple[int, int, str]]] + ccl_annotations: Dict[str, 
List[Tuple[int, int, Annotation]]] ) -> List[Tuple[int, int, str]]: """ Detects ner entities in the text based on liner_n5 NER ontology. @@ -12,22 +12,16 @@ def detect_ner_pl_liner_n5( :param ner_annotations: a dictionary of NER annotations :type ner_annotations: Dict[str, List[Tuple[int, int, str]]] :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :rtype: List[Tuple[int, int, Annotation]] """ names = subdict( ccl_annotations, - [ - "nam_liv_person", - "nam_liv_person_last", - "nam_fac_road", - "nam_loc_gpe_city", - "nam_org_group_team", - ], + list(NER_PL_N5_MAPPING.keys()), all_must_be_present=False, ) return [ - (start, end, NER_PL_N5_MAPPING.get(entity_type, EntityTypes.OTHER)) + (start, end, NER_PL_N5_MAPPING.get(entity_type, OtherAnnotation)()) for entity_type, entity in names.items() for start, end, _ in entity ] diff --git a/src/detectors/phone/__init__.py b/src/detectors/phone/__init__.py index e30518d..4de9ace 100644 --- a/src/detectors/phone/__init__.py +++ b/src/detectors/phone/__init__.py @@ -1 +1 @@ -from src.detectors.phone.phone import detect_phone_numbers \ No newline at end of file +from src.detectors.phone.phone import PhoneNumberDetector \ No newline at end of file diff --git a/src/detectors/phone/phone.py b/src/detectors/phone/phone.py index 8ab3d65..ca88264 100644 --- a/src/detectors/phone/phone.py +++ b/src/detectors/phone/phone.py @@ -1,14 +1,26 @@ import regex as re -from typing import List, Tuple -from src.entity_types import EntityTypes +from typing import List, Dict, Any, Tuple +from src.annotations import PhoneNumberAnnotation +from src.detectors.interface import Detector + + +class PhoneNumberDetector(Detector): + def __init__(self) -> None: + super().__init__() + + def detect( + self, text: str, annotations: Dict[str, List[Tuple[int, int, Any]]] + ) -> List[Tuple[int, int, PhoneNumberAnnotation]]: + return detect_phone_numbers(text) + PHONE_NUMBER_REGEX = re.compile( - 
r'(?P<country_code>(00[1-9]\d?)|(\(?([+\d]{2,3})\)?)[- ]??)?' - r'(?P<number>(\d[- ]??){9,10})' + r"(?P<country_code>(00[1-9]\d?)|(\(?([+\d]{2,3})\)?)[- ]??)?" + r"(?P<number>(\d[- ]??){9,10})" ) -def detect_phone_numbers(text: str, language: str) -> List[Tuple[int, int, str]]: +def detect_phone_numbers(text: str) -> List[Tuple[int, int, PhoneNumberAnnotation]]: """ Detects phone numbers in the text. :param text: the text to be searched @@ -16,10 +28,10 @@ def detect_phone_numbers(text: str, language: str) -> List[Tuple[int, int, str]] :param language: the language of the text :type language: str :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :rtype: List[Tuple[int, int, PhoneNumberAnnotation]] """ matches = PHONE_NUMBER_REGEX.finditer(text) phone_numbers = [] for match in matches: - phone_numbers.append((match.start(), match.end(), EntityTypes.PHONE_NUMBER)) - return phone_numbers \ No newline at end of file + phone_numbers.append((match.start(), match.end(), PhoneNumberAnnotation())) + return phone_numbers diff --git a/src/detectors/url/__init__.py b/src/detectors/url/__init__.py index 72b8dc6..5fa84d2 100644 --- a/src/detectors/url/__init__.py +++ b/src/detectors/url/__init__.py @@ -1 +1 @@ -from src.detectors.url.url import detect_urls +from src.detectors.url.url import UrlDetector diff --git a/src/detectors/url/url.py b/src/detectors/url/url.py index 70b8ba8..63c83db 100644 --- a/src/detectors/url/url.py +++ b/src/detectors/url/url.py @@ -1,10 +1,22 @@ import regex as re -from typing import List, Tuple +from typing import List, Dict, Any, Tuple from .pl import URL_REGEX_PL from .common import generate_url_regex -from src.entity_types import EntityTypes +from src.annotations import UrlAnnotation +from src.detectors.interface import Detector -def detect_urls(text: str, language: str) -> List[Tuple[int, int, str]]: + +class UrlDetector(Detector): + def __init__(self, language: str = "pl") -> None: + 
self._language = language + + def detect( + self, text: str, annotations: Dict[str, List[Tuple[int, int, Any]]] + ) -> List[Tuple[int, int, UrlAnnotation]]: + return detect_urls(text, self._language) + + +def detect_urls(text: str, language: str) -> List[Tuple[int, int, UrlAnnotation]]: """ Detects urls in the text. :param text: the text to be searched @@ -12,16 +24,16 @@ def detect_urls(text: str, language: str) -> List[Tuple[int, int, str]]: :param language: the language of the text :type language: str :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :rtype: List[Tuple[int, int, UrlAnnotation]] """ if language == "pl": url_regex = URL_REGEX_PL else: url_regex = generate_url_regex(language) - + matches = url_regex.finditer(text) urls = [] for match in matches: - urls.append((match.start(), match.end(), EntityTypes.URL)) - - return urls \ No newline at end of file + urls.append((match.start(), match.end(), UrlAnnotation())) + + return urls diff --git a/src/detectors/user/__init__.py b/src/detectors/user/__init__.py index 3ba0c10..62039d3 100644 --- a/src/detectors/user/__init__.py +++ b/src/detectors/user/__init__.py @@ -1 +1 @@ -from src.detectors.user.user import detect_users \ No newline at end of file +from src.detectors.user.user import UserDetector \ No newline at end of file diff --git a/src/detectors/user/user.py b/src/detectors/user/user.py index d588a25..ca8d483 100644 --- a/src/detectors/user/user.py +++ b/src/detectors/user/user.py @@ -1,10 +1,23 @@ import regex as re -from typing import List, Tuple -from src.entity_types import EntityTypes +from typing import List, Dict, Any, Tuple +from src.annotations import UserAnnotation +from src.detectors.interface import Detector -USER_REGEX = re.compile(r'\B(?P<username>\@[\w\-]+)') -def detect_users(text: str, language: str) -> List[Tuple[int, int, str]]: +class UserDetector(Detector): + def __init__(self) -> None: + super().__init__() + + def detect( + 
self, text: str, annotations: Dict[str, List[Tuple[int, int, Any]]] + ) -> List[Tuple[int, int, UserAnnotation]]: + return detect_users(text) + + +USER_REGEX = re.compile(r"\B(?P<username>\@[\w\-]+)") + + +def detect_users(text: str) -> List[Tuple[int, int, UserAnnotation]]: """ Detects users in the text. :param text: the text to be searched @@ -12,10 +25,10 @@ def detect_users(text: str, language: str) -> List[Tuple[int, int, str]]: :param language: the language of the text :type language: str :return: a list of tuples containing (start, end, entity_type) - :rtype: List[Tuple[int, int, str]] + :rtype: List[Tuple[int, int, UserAnnotation]] """ matches = USER_REGEX.finditer(text) users = [] for match in matches: - users.append((match.start(), match.end(), EntityTypes.USER)) - return users \ No newline at end of file + users.append((match.start(), match.end(), UserAnnotation())) + return users diff --git a/src/dictionaries/morphosyntactic/__init__.py b/src/dictionaries/morphosyntactic/__init__.py new file mode 100644 index 0000000..edce674 --- /dev/null +++ b/src/dictionaries/morphosyntactic/__init__.py @@ -0,0 +1 @@ +from src.dictionaries.morphosyntactic.interface import MorphosyntacticDictionary \ No newline at end of file diff --git a/src/dictionaries/morphosyntactic/interface.py b/src/dictionaries/morphosyntactic/interface.py new file mode 100644 index 0000000..fe4a938 --- /dev/null +++ b/src/dictionaries/morphosyntactic/interface.py @@ -0,0 +1,10 @@ +from src.annotations import Annotation +from typing import Optional + +class MorphosyntacticDictionary: + def get_random_replacement(self, original_entry: Annotation) -> Optional[str]: + """ + Returns a random replacement for the original entry + """ + raise NotImplementedError() + \ No newline at end of file diff --git a/src/dictionaries/morphosyntactic/pl_ner.py b/src/dictionaries/morphosyntactic/pl_ner.py new file mode 100644 index 0000000..3f861a6 --- /dev/null +++ b/src/dictionaries/morphosyntactic/pl_ner.py @@ 
-0,0 +1,110 @@ +from typing import Dict, List, Optional, Tuple, Type +from collections import defaultdict +from src.annotations import Annotation, OtherAnnotation, MorphosyntacticInfoMixin +from src.dictionaries.morphosyntactic.interface import MorphosyntacticDictionary +import random + + +class PlNERMorphosyntacticDictionary(MorphosyntacticDictionary): + def __init__( + self, + dictionary_path: Optional[str] = None, + annotation_mapping: Optional[Dict[str, Type[Annotation]]] = None, + list: Optional[List[Tuple[Annotation, str, str, str]]] = None, + always_replace=True, + ) -> None: + super().__init__() + self._dictionary = None + self._always_replace = always_replace + + if dictionary_path is not None: + assert annotation_mapping is not None + self._from_file(dictionary_path, annotation_mapping) + elif list is not None: + self._from_list(list) + else: + raise ValueError("Either dictionary_path or list must be provided.") + + def _from_file( + self, path_to_dictionary: str, annotation_mapping: Dict[str, Type[Annotation]] + ) -> None: + self._dictionary = load_pl_ner_replacements_dictionary( + path_to_dictionary, annotation_mapping + ) + + def _from_list(self, list: List[Tuple[Annotation, str, str, str]]) -> None: + self._dictionary = defaultdict(lambda: defaultdict(dict)) + for annotation, word, lemma, morpho_tag in list: + self._dictionary[annotation][morpho_tag][lemma] = word + + def get_random_replacement(self, original_entry: Annotation) -> Optional[str]: + original_entry_type = type(original_entry) + + result = None + + if issubclass(original_entry_type, MorphosyntacticInfoMixin): + morpho_tag = original_entry.morpho_tag + + if ( + original_entry_type in self._dictionary + and morpho_tag in self._dictionary[original_entry_type] + ): + result = random.choice( + list(self._dictionary[original_entry_type][morpho_tag].values()) + ) + + if result is None and self._always_replace: + random_type = random.choice(list(self._dictionary.keys())) + random_tag = 
random.choice(list(self._dictionary[random_type].keys())) + + result = random.choice( + list(self._dictionary[random_type][random_tag].values()) + ) + + return result + + +def load_pl_ner_replacements_dictionary( + path: str, ner_mapping: Optional[Dict[str, Type[Annotation]]] = None +) -> Dict[str, Dict[str, Dict[str, str]]]: + """ + Loads a dictionary that maps named entity tags to lemmas to part-of-speech tags to words. + + The dictionary is a nested defaultdict, so if a key is not found, an empty defaultdict is returned. + + The dictionary is stored in a tab-separated file, where each line has the following format: + + <ner_tag> <word> <lemma> <pos_tag> + + Example: + + OSOBA Andrzejowi Andrzej subst:sg:dat:m1 + OSOBA Andrzej Andrzej subst:sg:m1:imperf + OSOBA Kasia Kasia subst:sg:f:imperf + MIEJSCE WrocÅ‚aw WrocÅ‚aw subst:sg:m2:imperf + MIEJSCE Warszawa Warszawa subst:sg:f:imperf + MIEJSCE Kraków Kraków subst:sg:m2:imperf + + Parameters + ---------- + path : str + Path to the dictionary file. + + Returns + ------- + Dict[str, Dict[str, Dict[str, str]]] + Nested defaultdict that maps named entity tags to lemmas to part-of-speech tags to words. 
+ """ + + replacement_dictionary = defaultdict(lambda: defaultdict(dict)) + with open(path, "r", encoding="utf-8") as file: + for line in file: + line = line.strip() + ner_tag, word, lemma, morpho_tag = line.split("\t") + + if ner_mapping is not None: + ner_tag = ner_mapping.get(ner_tag, OtherAnnotation)() + + replacement_dictionary[ner_tag][morpho_tag][lemma] = word + + return replacement_dictionary diff --git a/src/dictionaries/pl_ner_replacements.py b/src/dictionaries/pl_ner_replacements.py deleted file mode 100644 index 77e7e87..0000000 --- a/src/dictionaries/pl_ner_replacements.py +++ /dev/null @@ -1,46 +0,0 @@ -from typing import Dict, List, Optional -from collections import defaultdict -from src.entity_types import EntityTypes - -def load_pl_ner_replacements_dictionary(path: str, ner_mapping: Optional[Dict[str, str]] = None) -> Dict[str, Dict[str, Dict[str, str]]]: - """ - Loads a dictionary that maps named entity tags to lemmas to part-of-speech tags to words. - - The dictionary is a nested defaultdict, so if a key is not found, an empty defaultdict is returned. - - The dictionary is stored in a tab-separated file, where each line has the following format: - - <ner_tag> <word> <lemma> <pos_tag> - - Example: - - OSOBA Andrzejowi Andrzej subst:sg:dat:m1 - OSOBA Andrzej Andrzej subst:sg:m1:imperf - OSOBA Kasia Kasia subst:sg:f:imperf - MIEJSCE WrocÅ‚aw WrocÅ‚aw subst:sg:m2:imperf - MIEJSCE Warszawa Warszawa subst:sg:f:imperf - MIEJSCE Kraków Kraków subst:sg:m2:imperf - - Parameters - ---------- - path : str - Path to the dictionary file. - - Returns - ------- - Dict[str, Dict[str, Dict[str, str]]] - Nested defaultdict that maps named entity tags to lemmas to part-of-speech tags to words. 
- """ - - replacement_dictionary = defaultdict(lambda: defaultdict(dict)) - with open(path, "r", encoding="utf-8") as file: - for line in file: - line = line.strip() - ner_tag, word, lemma, pos_tag = line.split("\t") - - if ner_mapping is not None: - ner_tag = ner_mapping.get(ner_tag, EntityTypes.OTHER) - - replacement_dictionary[ner_tag][lemma][pos_tag] = word - - return replacement_dictionary \ No newline at end of file diff --git a/src/entity_types.py b/src/entity_types.py deleted file mode 100644 index ed0496b..0000000 --- a/src/entity_types.py +++ /dev/null @@ -1,14 +0,0 @@ -class EntityTypes: - NAME = "name" - SURNAME = "surname" - STREET_NAME = "street_name" - CITY = "city" - COUNTRY = "country" - PHONE_NUMBER = "phone_number" - URL = "url" - USER = "user" - EMAIL = "email" - DATE = "date" - TIN = "tin" # Tax Identification Number - KRS = "krs" # National Court Register - OTHER = "other" \ No newline at end of file diff --git a/src/generators.py b/src/generators.py deleted file mode 100644 index 606aabc..0000000 --- a/src/generators.py +++ /dev/null @@ -1,314 +0,0 @@ -"""Implementation of pseudonimization for different token categories.""" -import re -import random -import calendar -from string import punctuation, ascii_lowercase, ascii_uppercase, digits -from datetime import datetime -from babel import Locale - - -def get_random_character(digit: bool = False, upper: bool = False): - """Generate random character. - - Args: - digit (bool): Return random single digit. - upper (bool): Return uppercase character. - - """ - return random.choice(digits) if digit \ - else random.choice(ascii_uppercase) \ - if upper else random.choice(ascii_lowercase) - - -def pseudonymize_string(sentence: str, leave_chars: str = ''): - """Change characters in string. - - Uppercase character for uppercase, lowercase for lowercase, digit for digit. - - Args: - sentence (str): Sentence to pseudonimize. - leave_chars (str): Characters that should remain unchanged e.g ' -()'. 
- - """ - if not sentence: - return '' - pseudonymized = '' - for char in sentence: - if char in leave_chars: - pseudonymized += char - else: - pseudonymized += get_random_character( - char.isdigit(), - char.isupper()) - return pseudonymized - - -def generate_pseudo_email(email_match: str): - """Generate pseudonimized email based on matched email in text. - - Args: - email_match: Matched email. - - """ - local_part = email_match['local_part'] - domain = email_match['domain'] - top_level_domain = email_match['tld'] - new_email = pseudonymize_string(local_part, punctuation + '@') - new_email += pseudonymize_string(domain, punctuation) - return new_email + top_level_domain - - -def generate_pseudo_user(user_match): - """Generate pseudonimized user based on matched user in text. - - Args: - user_match: Matched user. - - """ - username = user_match['username'][1:] - new_username = pseudonymize_string(username) - return '@' + new_username - - -def generate_pseudo_website(website_match): - """Generate pseudonimized website based on matched website in text. - - Args: - website_match: Matched website. - - """ - protocol = website_match['protocol'] - auth = website_match['auth'] - host = website_match['host'] - top_level_domain = website_match['tld'] - port = website_match['port'] - path = website_match['path'] - new_website = protocol - new_website += pseudonymize_string(auth, punctuation) - new_website += host - new_website += top_level_domain - new_website += pseudonymize_string(port, punctuation) - new_website += pseudonymize_string(path, punctuation) - return new_website - - -def generate_pseudo_phone_number(number_match): - """Generate pseudonimized phone number based on matched phone number in text. - - Args: - number_match: Matched phone number string. 
- - """ - country_code = number_match['country_code'] - phone_number = number_match['number'] - new_phone_number = country_code + \ - pseudonymize_string(phone_number, [' ', '-']) - return new_phone_number - - -def generate_phone_number_tag(number_match, default_token): - """Generate tag for every splitted set of digits. - - Delimiters in phone number: '-', ' ' - e.g 123 456-789 -> [TOKEN] [TOKEN]-[TOKEN] - - Args: - number_match: Matched phone number string. - default_token (str): Token that should replace digits. - - """ - splitted_number = re.split('([- ])', ''.join(number_match.values())) - new_number = '' - for part in splitted_number: - if part in [' ', '-']: - new_number += part - else: - new_number += default_token - return ''.join(new_number) - - -def random_year(year_match): - """Generate random year. - - Generate random year based on the number of digits in year match. - Prefer years close to an actual year with a fixed probability. - - Args: - year_match: Year in date match. - - """ - if not year_match: - return '' - popular_years_probability = 0.8 - actual_year = datetime.now().year - if len(year_match) == 2: - if random.random() < popular_years_probability: - year = "{:02d}".format(random.randint(actual_year - 40, - actual_year + 5) % 100) - else: - year = "{:02d}".format(random.randint(0, 99)) - else: - if random.random() < popular_years_probability: - year = random.randint(actual_year - 100, - actual_year + 10) - else: - year = random.randint(1000, datetime.now().year + 100) - return str(year) - - -def random_day(month, year): - """Generate random day. - - Generate random day in the month and year previously drawn. - - Args: - month: The month in which the day will be drawn. - year: The year in which the day will be drawn. 
- - """ - if not year: - year = datetime.now().year - month = int(month) - year = int(year) - dates = calendar.Calendar().itermonthdates(year, month) - return random.choice([date.day for date in dates if date.month == month]) - - -def random_date(day_no_digits: int, month_no_digits: int, year_match): - """Generate random date. - - Generate random day based on the number of digits in day and month - and also matched year. - - Args: - day_no_digits (int): The number of digits in day match. - month_no_digits (int): The number of digits in month match. - year_match: Year in date match. - - """ - year = random_year(year_match) - - month = random.randint(1, 12) if month_no_digits == 2 \ - else random.randint(1, 9) - month = f'{month:02}' if month_no_digits == 2 else str(month) - day = random_day(month, year) if day_no_digits == 2 \ - else random.randint(1, 9) - day = f'{day:02}' if day_no_digits == 2 else str(day) - - return day, month, year - - -def month_number2text(month_number: int, abbr: bool, case: str = 'genitive', - lang='pl'): - """Return the name of the month in words. - - Generate the month name from its number. - The method could return the abbreviation form and name in the nominative - or genitive case. - - Args: - month_number (int): Number of the month. - abbr (bool): Return abbreviation form. - case (str): Return the name of the month in the given case. - lang (str): The language which is used to generate text. - - """ - locale = Locale(lang) - if case == 'genitive': - months = locale.months['format'] - elif case == 'nominative': - months = locale.months['stand-alone'] - else: - months = locale.months['format'] - - if abbr: - months = months['abbreviated'] - else: - months = months['wide'] - - return months[month_number] - - -def generate_pseudo_date(date_match, lang='pl'): - """Pseudonymize matched date. - - Generate the pseudonymized based on matched data in text. 
- This method will return the date in the format day-month-year - or year-month-day if the second number in date match is smaller than 13. - Otherwise, the position of the day and month will be swapped. - - Args: - date_match: Matched date. - lang: The language which is used to generate the date. - - """ - date = '' - if date_match['day_or_month_year']: - no_digits = (len(date_match['day_month1']), - len(date_match['day_month2'])) - if int(date_match['day_month2']) > 12: - no_digits = (len(date_match['day_month2']), - len(date_match['day_month1'])) - day, month, year = random_date(no_digits[0], no_digits[1], - date_match['year1']) - - date_order = [day, date_match['punct1'], - month, date_match['punct2'], year] - if int(date_match['day_month2']) > 12: - date_order[0], date_order[2] = date_order[2], date_order[0] - date = ''.join(date_order) - elif date_match['year_month_or_day']: - no_digits = (len(date_match['day_month4']), - len(date_match['day_month3'])) - if int(date_match['day_month3']) > 12: - no_digits = (len(date_match['day_month3']), - len(date_match['day_month4'])) - day, month, year = random_date(no_digits[0], no_digits[1], - date_match['year2']) - - date_order = [year, date_match['punct3'], month, - date_match['punct4'], day] - if int(date_match['day_month3']) > 12: - date_order[2], date_order[4] = date_order[4], date_order[2] - date = ''.join(date_order) - elif date_match['month_in_words']: - if date_match['day1']: - day_len = len(date_match['day1']) - elif date_match['day2']: - day_len = len(date_match['day2']) - else: - day_len = 0 - - if date_match['year3']: - year_match = date_match['year3'] - elif lang != 'en' and date_match['year4']: - year_match = date_match['year4'] - else: - year_match = '' - day, month, year = random_date(day_len, 2, year_match) - - abbr = len(date_match['month']) == 3 - locale = Locale(lang) - if date_match['month'] in locale.months['format']['wide'].values(): - case = 'genitive' - else: - case = 'nominative' - month = 
month_number2text(int(month), abbr, case, lang=lang) - - if date_match['day1']: - date_order = [day, date_match['punct5'], - month, date_match['punct6']] - elif date_match['day2'] and lang == 'en': - date_order = [month, date_match['punct7'], - day, date_match['punct6']] - elif date_match['day2']: - date_order = [month, date_match['punct7'], - day, date_match['punct8']] - else: - date_order = [month] - if date_match['year3'] or (lang != 'en' and date_match['year4']): - date_order += [year] - date = ''.join(date_order) - else: - date = '' - return date diff --git a/src/input_parsers/__init__.py b/src/input_parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/input_parsers/ccl.py b/src/input_parsers/ccl.py new file mode 100644 index 0000000..7b8bb7e --- /dev/null +++ b/src/input_parsers/ccl.py @@ -0,0 +1,75 @@ +from typing import Dict, List, Tuple +from lxml import etree +from collections import defaultdict +# from src.annotation_types_old import +from src.input_parsers.interface import InputParser + +class CCLInputParser(InputParser): + def __init__(self) -> None: + super().__init__() + + def parse(self, path_to_input: str) -> Tuple[str, Dict[str, List[Tuple[int, int, str]]]]: + """Parse CCL string into text and annotations. + + Annotations are returned as a dictionary with channel name as a key and list of tuples. + + Args: + path_to_input (str): Path to file containing CCL. + + Returns: + Tuple[str, Dict[str, List[Tuple[int, int, str]]]]: Text and annotations. 
+ """ + with open(path_to_input, 'r') as f: + ccl = f.read() + + ccl_tree = etree.fromstring(ccl.strip().encode('utf-8')) + + results = defaultdict(list) + text = "" + + # First token is assumed to not have space before it + last_was_ns = True + + tokens = ccl_tree.xpath("//ns | //tok") + for token in tokens: + if token.tag == 'tok': + if not last_was_ns: + text += " " + + word = token.xpath('./orth')[0].text + start = len(text) + end = start + len(word) + + for lex in token.xpath('./lex'): + if lex.attrib['disamb'] == "1": + ctag = lex.xpath('./ctag')[0] + # results[AnnotationTypes.MORPHOSYNTACTIC_TAG].append((start, end, ctag.text)) + + break + + for ann in token.xpath('./ann'): + is_present = int(ann.text) == 1 + if not is_present: + continue + + channel = ann.attrib['chan'] + is_head = "head" in ann.attrib and ann.attrib['head'] == "1" + + if is_head: + results[channel].append((start, end, word)) + else: + if last_was_ns: + new_word = results[channel][-1][2] + word + else: + new_word = results[channel][-1][2] + " " + word + + old_start = results[channel][-1][0] + + results[channel][-1] = (old_start, end, new_word) + + last_was_ns = False + text += word + elif token.tag == 'ns': + last_was_ns = True + + return text, results \ No newline at end of file diff --git a/src/input_parsers/interface.py b/src/input_parsers/interface.py new file mode 100644 index 0000000..e6c2891 --- /dev/null +++ b/src/input_parsers/interface.py @@ -0,0 +1,17 @@ +from typing import Dict, List, Tuple, Any + +class InputParser: + def parse(self, path_to_input: str) -> Tuple[str, Dict[str, List[Tuple[int, int, Any]]]]: + """Parse input string into text and annotations. + + Annotations are returned as a dictionary with channel name as a key and list of tuples. + Eg.: "She has a cat" -> ("She has a cat", {"entities": [(0, 3, "She"), (8, 11, "cat")]}) + + Args: + path_to_input (str): Path to file containing input. 
+ + Returns: + Tuple[str, Dict[str, List[Tuple[int, int, Any]]]]: Text and annotations. + """ + pass + diff --git a/src/mappings/__init__.py b/src/mappings/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mappings/ner_pl_n5_mapping.py b/src/mappings/ner_pl_n5_mapping.py new file mode 100644 index 0000000..a14d9ce --- /dev/null +++ b/src/mappings/ner_pl_n5_mapping.py @@ -0,0 +1,15 @@ +from src.annotations import ( + NameAnnotation, + SurnameAnnotation, + StreetNameAnnotation, + CityAnnotation, + CountryAnnotation, +) + +NER_PL_N5_MAPPING = { + "person_first_nam": NameAnnotation, + "person_last_nam": SurnameAnnotation, + "road_nam": StreetNameAnnotation, + "city_nam": CityAnnotation, + "country_nam": CountryAnnotation, +} diff --git a/src/pipeline/__init__.py b/src/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pipeline/default.py b/src/pipeline/default.py new file mode 100644 index 0000000..59b8d69 --- /dev/null +++ b/src/pipeline/default.py @@ -0,0 +1,38 @@ +from src.pipeline.interface import Pipeline +from typing import Dict +from src.suppressors.interface import Suppressor +from src.detectors.interface import Detector +from src.replacers.interface import ReplacerInterface +from src.input_parsers.interface import InputParser + + +class DefaultPipeline(Pipeline): + def __init__( + self, + input_parser: InputParser, + detectors: Dict[str, Detector], + suppressor: Suppressor, + replacers: Dict[str, ReplacerInterface], + ): + self._input_parser = input_parser + self._detectors = detectors + self._suppressor = suppressor + self._replacers = replacers + + def run(self, input) -> str: + parsed_input = self._input_parser.parse(input) + + detected_entities = [] + for detector_name, detector in self._detectors.items(): + detected_entities += detector.detect(parsed_input[0], parsed_input[1]) + + annotaitons_cleaned = self._suppressor.suppress(detected_entities) + + replaced_input = parsed_input[0] + annotations_left 
= annotaitons_cleaned + for replacer_name, replacer in self._replacers.items(): + replaced_input, annotations_left = replacer.replace( + replaced_input, annotations_left + ) + + return replaced_input diff --git a/src/pipeline/interface.py b/src/pipeline/interface.py new file mode 100644 index 0000000..ebd8fc9 --- /dev/null +++ b/src/pipeline/interface.py @@ -0,0 +1,3 @@ +class Pipeline: + def run(self, input) -> str: + raise NotImplementedError \ No newline at end of file diff --git a/src/replacers/__init__.py b/src/replacers/__init__.py new file mode 100644 index 0000000..e652b43 --- /dev/null +++ b/src/replacers/__init__.py @@ -0,0 +1,2 @@ +from src.replacers.interface import ReplacerInterface +from src.replacers.tag_replacer import TagReplacer \ No newline at end of file diff --git a/src/replacers/date_replacer.py b/src/replacers/date_replacer.py new file mode 100644 index 0000000..7f2d681 --- /dev/null +++ b/src/replacers/date_replacer.py @@ -0,0 +1,88 @@ +from typing import List, Tuple +from src.annotations import ( + Annotation, + DateAnnotation, +) +from src.string_replacements import replace_and_update +from src.replacers.interface import ReplacerInterface +import random + +# TODO: Add support for other languages +months_map = { + 1: "stycznia", + 2: "lutego", + 3: "marca", + 4: "kwietnia", + 5: "maja", + 6: "czerwca", + 7: "lipca", + 8: "sierpnia", + 9: "wrzeÅ›nia", + 10: "października", + 11: "listopada", + 12: "grudnia", +} + +class DateReplacer(ReplacerInterface): + def __init__(self): + pass + + def replace(self, text: str, detections: List[Tuple[int, int, Annotation]]) -> Tuple[str, List[Tuple[int, int, Annotation]]]: + replacements = [] + not_processed = [] + + already_replaced = dict() + + for item in detections: + start, end, detection = item + + if isinstance(detection, DateAnnotation): + replacement = [] + if detection.format is not None: + format = detection.format + else: + format = [ + (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "01"), + 
(DateAnnotation.AnnotationPart.OTHER, "."), + (DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "01"), + (DateAnnotation.AnnotationPart.OTHER, "."), + (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2020"), + ] + + if text[start:end] in already_replaced: + replacement = already_replaced[text[start:end]] + else: + for entry in format: + if entry[0] == DateAnnotation.AnnotationPart.TWO_DIGITS_DAY: + random_day = random.randint(1, 28) + replacement.append(str(random_day).zfill(2)) + elif entry[0] == DateAnnotation.AnnotationPart.ONE_DIGIT_DAY: + random_day = random.randint(1, 28) + replacement.append(str(random_day)) + elif entry[0] == DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH: + random_month = random.randint(1, 12) + replacement.append(str(random_month).zfill(2)) + elif entry[0] == DateAnnotation.AnnotationPart.ONE_DIGIT_MONTH: + random_month = random.randint(1, 12) + replacement.append(str(random_month)) + elif entry[0] == DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR: + random_year = random.randint(1900, 2020) + replacement.append(str(random_year)) + elif entry[0] == DateAnnotation.AnnotationPart.TWO_DIGIT_YEAR: + random_year = random.randint(0, 99) + replacement.append(str(random_year).zfill(2)) + elif entry[0] == DateAnnotation.AnnotationPart.TEXT_MONTH: + random_month = random.randint(1, 12) + month_name = months_map[random_month] + replacement.append(month_name) + elif entry[0] == DateAnnotation.AnnotationPart.OTHER: + replacement.append(entry[1]) + + replacement = "".join(replacement) + already_replaced[text[start:end]] = replacement + + replacements.append((start, end, replacement)) + else: + not_processed.append(item) + + return replace_and_update(text, replacements, not_processed) \ No newline at end of file diff --git a/src/replacers/delete_replacer.py b/src/replacers/delete_replacer.py new file mode 100644 index 0000000..218873b --- /dev/null +++ b/src/replacers/delete_replacer.py @@ -0,0 +1,20 @@ +from typing import List, Tuple +from src.annotations 
import Annotation +from src.string_replacements import replace +from src.replacers.interface import ReplacerInterface + + +class DeleteReplacer(ReplacerInterface): + def __init__(self): + pass + + def replace( + self, text: str, detections: List[Tuple[int, int, Annotation]] + ) -> Tuple[str, List[Tuple[int, int, Annotation]]]: + + result = [ + (start, end, "") + for start, end, _ in detections + ] + + return replace(text, result), [] \ No newline at end of file diff --git a/src/replacers/email_replacer.py b/src/replacers/email_replacer.py new file mode 100644 index 0000000..48bf871 --- /dev/null +++ b/src/replacers/email_replacer.py @@ -0,0 +1,38 @@ +from typing import List, Tuple +from src.annotations import ( + Annotation, + EmailAnnotation, +) +from src.string_replacements import replace_and_update +from src.replacers.interface import ReplacerInterface +import random +import string + +def random_char(char_num): + return ''.join(random.choice(string.ascii_letters) for _ in range(char_num)) + +def random_email(): + return random_char(7)+"@gmail.com" + +class EmailReplacer(ReplacerInterface): + def __init__(self): + pass + + def replace(self, text: str, detections: List[Tuple[int, int, Annotation]]) -> Tuple[str, List[Tuple[int, int, Annotation]]]: + replacements = [] + not_processed = [] + + already_replaced = dict() + + for item in detections: + start, end, detection = item + + if isinstance(detection, EmailAnnotation): + if text[start:end] not in already_replaced: + already_replaced[text[start:end]] = random_email() + + replacements.append((start, end, already_replaced[text[start:end]])) + else: + not_processed.append(item) + + return replace_and_update(text, replacements, not_processed) \ No newline at end of file diff --git a/src/replacers/interface.py b/src/replacers/interface.py new file mode 100644 index 0000000..fcaa21e --- /dev/null +++ b/src/replacers/interface.py @@ -0,0 +1,22 @@ +from abc import ABC, abstractmethod +from typing import List, Tuple +from 
src.annotations import Annotation + + +class ReplacerInterface(ABC): + @abstractmethod + def replace( + self, text: str, detections: List[Tuple[int, int, Annotation]] + ) -> Tuple[str, List[Tuple[int, int, Annotation]]]: + """Replace detected entities in text with anonimized version. + + Args: + text (str): Text to be processed. + detections (List[Tuple[int, int, str]]): List of detections. + + Returns: + Tuple[str, List[Tuple[int, int, str]]]: Text with supported entities + replaced with anonimized version and list of detections that were + not processed by this replacer. + """ + pass diff --git a/src/replacers/ner_replacer.py b/src/replacers/ner_replacer.py new file mode 100644 index 0000000..edb10b6 --- /dev/null +++ b/src/replacers/ner_replacer.py @@ -0,0 +1,36 @@ +from typing import List, Tuple +from src.annotations import ( + Annotation, +) +from src.string_replacements import replace_and_update +from src.replacers.interface import ReplacerInterface +from src.dictionaries.morphosyntactic import MorphosyntacticDictionary + + +class NERReplacer(ReplacerInterface): + def __init__(self, dictionary: MorphosyntacticDictionary): + self._dictionary = dictionary + + def replace( + self, text: str, detections: List[Tuple[int, int, Annotation]] + ) -> Tuple[str, List[Tuple[int, int, Annotation]]]: + replacements = [] + not_processed = [] + + already_replaced = dict() + + for item in detections: + start, end, detection = item + + key = (text[start:end], type(detection)) + + if key not in already_replaced: + replacement = self._dictionary.get_random_replacement(detection) + already_replaced[key] = replacement + + if already_replaced[key] is None: + not_processed.append(item) + else: + replacements.append((start, end, already_replaced[key])) + + return replace_and_update(text, replacements, not_processed) diff --git a/src/replacers/tag_replacer.py b/src/replacers/tag_replacer.py new file mode 100644 index 0000000..366934d --- /dev/null +++ b/src/replacers/tag_replacer.py 
@@ -0,0 +1,47 @@ +from typing import List, Tuple +from src.annotations import ( + Annotation, + NameAnnotation, + SurnameAnnotation, + StreetNameAnnotation, + CityAnnotation, + CountryAnnotation, + PhoneNumberAnnotation, + UrlAnnotation, + UserAnnotation, + EmailAnnotation, + DateAnnotation, + TINAnnotation, + KRSAnnotation, +) +from src.string_replacements import replace +from src.replacers.interface import ReplacerInterface + + +class TagReplacer(ReplacerInterface): + def __init__(self): + self.tags_map = { + NameAnnotation: "[OSOBA]", + SurnameAnnotation: "[OSOBA]", + StreetNameAnnotation: "[MIEJSCE]", + CityAnnotation: "[MIEJSCE]", + CountryAnnotation: "[MIEJSCE]", + PhoneNumberAnnotation: "[DIGITS]", + UrlAnnotation: "[WWW]", + UserAnnotation: "@[USER]", + EmailAnnotation: "[MAIL]", + DateAnnotation: "[DATE]", + TINAnnotation: "[DIGITS]", + KRSAnnotation: "[DIGITS]", + } + + def replace( + self, text: str, detections: List[Tuple[int, int, Annotation]] + ) -> Tuple[str, List[Tuple[int, int, Annotation]]]: + + result = [ + (start, end, self.tags_map.get(type(entity_type), "[OTHER]")) + for start, end, entity_type in detections + ] + + return replace(text, result), [] \ No newline at end of file diff --git a/src/replacers/user_replacer.py b/src/replacers/user_replacer.py new file mode 100644 index 0000000..66aeaf4 --- /dev/null +++ b/src/replacers/user_replacer.py @@ -0,0 +1,32 @@ +from typing import List, Tuple +from src.annotations import ( + Annotation, + UserAnnotation, +) +from src.string_replacements import replace_and_update +from src.replacers.interface import ReplacerInterface +from random_username.generate import generate_username + +class UserReplacer(ReplacerInterface): + def __init__(self): + pass + + def replace(self, text: str, detections: List[Tuple[int, int, Annotation]]) -> Tuple[str, List[Tuple[int, int, Annotation]]]: + replacements = [] + not_processed = [] + + already_replaced = dict() + + for item in detections: + start, end, detection = 
item + + if isinstance(detection, UserAnnotation): + if text[start:end] not in already_replaced: + username = "@" + generate_username(1)[0] + already_replaced[text[start:end]] = username + + replacements.append((start, end, already_replaced[text[start:end]])) + else: + not_processed.append(item) + + return replace_and_update(text, replacements, not_processed) \ No newline at end of file diff --git a/src/string_replacements.py b/src/string_replacements.py index 33c426d..528b5b5 100644 --- a/src/string_replacements.py +++ b/src/string_replacements.py @@ -1,27 +1,88 @@ -from typing import List, Tuple +from typing import List, Tuple, Any, TypeVar -def replace(original_string: str, replacements: List[Tuple[int, int, str]]): + +def replace(original_string: str, replacements: List[Tuple[int, int, str]]) -> str: """ Replaces substrings in a string. + !!! Important: This function assumes that there are no overlapping annotations. + Parameters ---------- original_string : str The original string. replacements : List[Tuple[int, int, str]] A list of tuples containing (start, end, replacement). - + Returns ------- str The string with replacements applied. """ - + replacements = sorted(replacements, key=lambda x: x[0]) - + delta = 0 for replacement in replacements: - original_string = original_string[:replacement[0] + delta] + replacement[2] + original_string[replacement[1] + delta:] + original_string = ( + original_string[: replacement[0] + delta] + + replacement[2] + + original_string[replacement[1] + delta :] + ) delta += len(replacement[2]) - (replacement[1] - replacement[0]) + + return original_string + + +_T = TypeVar("_T") + + +def replace_and_update( + original_string: str, + replacements: List[Tuple[int, int, str]], + other_annotations: List[Tuple[int, int, _T]], +) -> Tuple[str, List[Tuple[int, int, _T]]]: + """ Replaces substrings in a string and updates other annotations to match new string. + + !!! 
Important: This function assumes that there are no overlapping annotations. + + Parameters + ---------- + original_string : str + The original string. + replacements : List[Tuple[int, int, str]] + A list of tuples containing (start, end, replacement). + other_annotations : List[Tuple[int, int, Any]] + A list of other annotations. + + Returns + ------- + Tuple[str, List[Tuple[int, int, Any]]] + The string with replacements applied and other annotations with new positions. + """ + + joined_list = [] + for replacement in replacements: + joined_list.append((replacement[0], replacement[1], replacement[2], True)) + for other_annotation in other_annotations: + joined_list.append((other_annotation[0], other_annotation[1], other_annotation[2], False)) + + annotations = sorted(joined_list, key=lambda x: x[0]) + + new_other_annotations = [] + + delta = 0 + for annotation in annotations: + is_replacement = annotation[3] - return original_string \ No newline at end of file + if is_replacement: + original_string = ( + original_string[: annotation[0] + delta] + + annotation[2] + + original_string[annotation[1] + delta :] + ) + delta += len(annotation[2]) - (annotation[1] - annotation[0]) + else: + new_other_annotations.append((annotation[0] + delta, annotation[1] + delta, annotation[2])) + + return original_string, new_other_annotations \ No newline at end of file diff --git a/src/suppressors/interface.py b/src/suppressors/interface.py new file mode 100644 index 0000000..7fbf543 --- /dev/null +++ b/src/suppressors/interface.py @@ -0,0 +1,15 @@ +from typing import List, Tuple, Any + +class Suppressor: + def suppress(self, annotations: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]: + """Suppresses annotations on overlappment. + + Args: + annotations (List[Tuple[int, int, Any]]): List of annotations. + + Returns: + List[Tuple[int, int, Any]]: List of annotations with overlapping + annotations removed. 
+ + """ + raise NotImplementedError \ No newline at end of file diff --git a/src/suppressors/order_based.py b/src/suppressors/order_based.py index 8488465..2be5569 100644 --- a/src/suppressors/order_based.py +++ b/src/suppressors/order_based.py @@ -1,17 +1,28 @@ -from typing import List, Tuple, Dict +from typing import List, Tuple, Dict, Any from bitarray import bitarray +from src.suppressors.interface import Suppressor -def suppress_order_based(annotations: List[Tuple[int, int, str]]) -> List[Tuple[int, int, str]]: +class OrderBasedSuppressor(Suppressor): + def __init__(self) -> None: + super().__init__() + + def suppress(self, annotations: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]: + return suppress_order_based(annotations) + +def suppress_order_based(annotations: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]: """If two annotations overlap, the first one int the list is kept. Args: - annotations (List[Tuple[int, int, str]]): List of annotations. + annotations (List[Tuple[int, int, Any]]): List of annotations. Returns: - List[Tuple[int, int, str]]: List of annotations with overlapping + List[Tuple[int, int, Any]]: List of annotations with overlapping annotations removed. """ + if len(annotations) == 0: + return annotations + annotations = annotations bitarray_size = max([end for _, end, _ in annotations]) bitarray_ = bitarray(bitarray_size) diff --git a/src/tag_anonimization.py b/src/tag_anonimization.py deleted file mode 100644 index 89e1a10..0000000 --- a/src/tag_anonimization.py +++ /dev/null @@ -1,40 +0,0 @@ -from typing import List, Tuple -from collections import defaultdict -from src.entity_types import EntityTypes -from src.string_replacements import replace - -def replace_with_tags(text: str, detections: List[Tuple[int, int, str]]) -> str: - """Replace entities with tags. - - Args: - text (str): Text to be processed. - detections (List[Tuple[int, int, str]]): List of detections. 
- - Returns: - str: Text with entities replaced with tags. - - """ - - tags_map = { - EntityTypes.NAME: "[OSOBA]", - EntityTypes.SURNAME: "[OSOBA]", - EntityTypes.STREET_NAME: "[MIEJSCE]", - EntityTypes.CITY: "[MIEJSCE]", - EntityTypes.COUNTRY: "[MIEJSCE]", - EntityTypes.PHONE_NUMBER: "[DIGITS]", - EntityTypes.URL: "[WWW]", - EntityTypes.USER: "@[USER]", - EntityTypes.EMAIL: "[MAIL]", - EntityTypes.DATE: "[DATE]", - EntityTypes.TIN: "[DIGITS]", - EntityTypes.KRS: "[DIGITS]", - } - - result = [ - (start, end, tags_map.get(entity_type, "[OTHER]")) - for start, end, entity_type in detections - ] - - return replace(text, result) - - \ No newline at end of file diff --git a/src/utils/ner_pl_n5_mapping.py b/src/utils/ner_pl_n5_mapping.py deleted file mode 100644 index 0b857b5..0000000 --- a/src/utils/ner_pl_n5_mapping.py +++ /dev/null @@ -1,9 +0,0 @@ -from src.entity_types import EntityTypes - -NER_PL_N5_MAPPING = { - "nam_liv_person": EntityTypes.NAME, - "nam_liv_person_last": EntityTypes.SURNAME, - "nam_fac_road": EntityTypes.STREET_NAME, - "nam_loc_gpe_city": EntityTypes.CITY, - "nam_org_group_team": EntityTypes.COUNTRY, -} \ No newline at end of file diff --git a/src/worker.py b/src/worker.py index 6cbc166..d7b27d7 100644 --- a/src/worker.py +++ b/src/worker.py @@ -2,17 +2,35 @@ import logging import nlp_ws - - -from src.anonymizers.polish_anonymizer import PolishAnonymizer -from src.anonymizers.english_anonymizer import EnglishAnonymizer -from src.anonymizers.russian_anonymizer import RussianAnonymizer +from hydra import initialize, compose +from hydra.utils import instantiate _log = logging.getLogger(__name__) class Worker(nlp_ws.NLPWorker): """Implements nlp_worker for anonymizer service.""" + def __init__(self) -> None: + self._last_config = None + self._pipeline = None + super().__init__() + + def _prepare_pipeline(self, task_options): + language = task_options.get('language', 'pl') + replace_method = task_options.get('method', 'tag') + + overrides = [ + 
"language=" + language, + "replacers=" + replace_method, + ] + + config_hash = hash(tuple(overrides)) + if self._last_config != config_hash: + with initialize(config_path="./config"): + cfg = compose(config_name="config", overrides=overrides) + self._pipeline = instantiate(cfg["pipeline"]) + + return self._pipeline def process(self, input_file, task_options, output_file): """Anonymizes input text. @@ -23,11 +41,10 @@ class Worker(nlp_ws.NLPWorker): method - 'delete'/'tag'/'pseudo' - 'delete' deletes selected tokens, 'tag' replaces selected tokens with arbitrary tags, 'pseudo' replaces selected tokens with a random token that + language - 'pl' - language of the input text. As of now only Polish is supported. """ - lang = task_options.get('language', 'pl') - anonymizers = {'pl': PolishAnonymizer, - 'en': EnglishAnonymizer, - 'ru': RussianAnonymizer - } - anon = anonymizers.get(lang, PolishAnonymizer)(task_options) - anon.process(input_file, output_file) + pipeline = self._prepare_pipeline(task_options) + + with open(output_file, 'w', encoding='utf-8') as f: + result = pipeline.run(input_file) + f.write(result) diff --git a/tests/detectors/date/test_en.py b/tests/detectors/date/test_en.py index 8104a83..704ce37 100644 --- a/tests/detectors/date/test_en.py +++ b/tests/detectors/date/test_en.py @@ -1,17 +1,34 @@ -from src.detectors.date.en import detect_dates_en -from src.entity_types import EntityTypes - +from src.annotations import DateAnnotation +from src.detectors.date.date import DateDetector def test_detect_dates_en(): + detector = DateDetector("en") + # Check en-us text = "On 1.01.2022, I sold my cat. On April 5, 2021, I bought a dog." 
- found_dates = detect_dates_en(text) + found_dates = detector.detect(text, dict()) + + format_date1 = [ + (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "01"), # Only supports two digits for now + (DateAnnotation.AnnotationPart.OTHER, "."), + (DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "01"), + (DateAnnotation.AnnotationPart.OTHER, "."), + (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2022") + ] + + format_date2 = [ + (DateAnnotation.AnnotationPart.TEXT_MONTH, "April"), + (DateAnnotation.AnnotationPart.OTHER, " "), + (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "05"), # Only supports two digits for now + (DateAnnotation.AnnotationPart.OTHER, ", "), + (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2021"), + ] - assert found_dates == [(3, 12, EntityTypes.DATE), (32, 45, EntityTypes.DATE)] + assert found_dates == [(3, 12, DateAnnotation(format_date1)), (32, 45, DateAnnotation(format_date2))] # Check en-gb # TODO: Following test fails. Fix it. # text = "On 1.01.2022 I sold the cat. On 5th April 2021 I bought a dog." # found_dates = detect_dates_en(text) - # assert found_dates == [(3,12, EntityTypes.DATE), (32,46, EntityTypes.DATE)] + # assert found_dates == [(3,12, DateAnnotation()), (32,46, DateAnnotation())] diff --git a/tests/detectors/date/test_pl.py b/tests/detectors/date/test_pl.py index 2942163..077240d 100644 --- a/tests/detectors/date/test_pl.py +++ b/tests/detectors/date/test_pl.py @@ -1,9 +1,26 @@ -from src.detectors.date.pl import detect_dates_pl -from src.entity_types import EntityTypes - +from src.annotations import DateAnnotation +from src.detectors.date.date import DateDetector def test_detect_dates_pl(): + detector = DateDetector("pl") + text = "W dniu 1.01.2022 sprzedaÅ‚em kota. 5 kwietnia 2021 roku kupiÅ‚em psa." 
- found_dates = detect_dates_pl(text) + found_dates = detector.detect(text, dict()) - assert found_dates == [(7, 16, EntityTypes.DATE), (34, 49, EntityTypes.DATE)] + format_date1 = [ + (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "01"), # Only supports two digits for now + (DateAnnotation.AnnotationPart.OTHER, "."), + (DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "01"), + (DateAnnotation.AnnotationPart.OTHER, "."), + (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2022") + ] + + format_date2 = [ + (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "05"), # Only supports two digits for now + (DateAnnotation.AnnotationPart.OTHER, " "), + (DateAnnotation.AnnotationPart.TEXT_MONTH, "kwietnia"), + (DateAnnotation.AnnotationPart.OTHER, " "), + (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2021"), + ] + + assert found_dates == [(7, 16, DateAnnotation(format_date1)), (34, 49, DateAnnotation(format_date2))] \ No newline at end of file diff --git a/tests/detectors/date/test_ru.py b/tests/detectors/date/test_ru.py index 5b90d29..5269b94 100644 --- a/tests/detectors/date/test_ru.py +++ b/tests/detectors/date/test_ru.py @@ -1,9 +1,27 @@ -from src.detectors.date.ru import detect_dates_ru -from src.entity_types import EntityTypes +from src.annotations import DateAnnotation +from src.detectors.date.date import DateDetector def test_detect_dates_pl(): + detector = DateDetector("ru") + text = "1.01.2022 Ñ Ð¿Ñ€Ð¾Ð´Ð°Ð» кошку. 5 Ð°Ð¿Ñ€ÐµÐ»Ñ 2021 Ñ ÐºÑƒÐ¿Ð¸Ð» Ñобаку." 
- found_dates = detect_dates_ru(text) - - assert found_dates == [(0, 9, EntityTypes.DATE), (26, 39, EntityTypes.DATE)] + found_dates = detector.detect(text, dict()) + + format_date1 = [ + (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "01"), # Only supports two digits for now + (DateAnnotation.AnnotationPart.OTHER, "."), + (DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "01"), + (DateAnnotation.AnnotationPart.OTHER, "."), + (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2022") + ] + + format_date2 = [ + (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "05"), + (DateAnnotation.AnnotationPart.OTHER, " "), + (DateAnnotation.AnnotationPart.TEXT_MONTH, "апрелÑ"), # Only supports two digits for now + (DateAnnotation.AnnotationPart.OTHER, " "), + (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2021"), + ] + + assert found_dates == [(0, 9, DateAnnotation(format_date1)), (26, 39, DateAnnotation(format_date2))] diff --git a/tests/detectors/email/test_email.py b/tests/detectors/email/test_email.py index 6be224f..95ecb06 100644 --- a/tests/detectors/email/test_email.py +++ b/tests/detectors/email/test_email.py @@ -1,8 +1,10 @@ -from src.detectors.email import detect_emails -from src.entity_types import EntityTypes +from src.annotations import EmailAnnotation +from src.detectors.email import EmailDetector def test_detect_emails(): + detector = EmailDetector() + text = "My email is arkadiusz@borek.pw. 
My friend's email is arkadiusz.dump@pwr.edu.pl" - found_emails = detect_emails(text, "en") + found_emails = detector.detect(text, dict()) - assert found_emails == [(12, 30, EntityTypes.EMAIL), (53, 78, EntityTypes.EMAIL)] \ No newline at end of file + assert found_emails == [(12, 30, EmailAnnotation()), (53, 78, EmailAnnotation())] \ No newline at end of file diff --git a/tests/detectors/ner/test_pl_liner_n5.py b/tests/detectors/ner/test_pl_liner_n5.py index ab14e41..544dc00 100644 --- a/tests/detectors/ner/test_pl_liner_n5.py +++ b/tests/detectors/ner/test_pl_liner_n5.py @@ -1,21 +1,23 @@ -from src.detectors.ner.pl_liner_n5 import detect_ner_pl_liner_n5 -from src.entity_types import EntityTypes +from src.annotations import NameAnnotation, SurnameAnnotation, CityAnnotation +from src.detectors.ner import NerDetector def test_detect_names_pl_liner_n5(): + detector = NerDetector("pl") + ccl_annotations = { - 'nam_liv_person': [(10, 16, 'Marian'), (100, 109, 'Magdalena')], - 'nam_liv_person_last': [(30, 35, 'Nowak')], - 'nam_loc_gpe_city': [(50, 59, 'WrocÅ‚awiu')], + 'person_first_nam': [(10, 16, 'Marian'), (100, 109, 'Magdalena')], + 'person_last_nam': [(30, 35, 'Nowak')], + 'city_nam': [(50, 59, 'WrocÅ‚awiu')], 'some_other_annotation': [(120, 124, 'zowd')], } - result = detect_ner_pl_liner_n5(ccl_annotations) + result = detector.detect("", ccl_annotations) expected = [ - (10, 16, EntityTypes.NAME), - (100, 109, EntityTypes.NAME), - (30, 35, EntityTypes.SURNAME), - (50, 59, EntityTypes.CITY), + (10, 16, NameAnnotation()), + (100, 109, NameAnnotation()), + (30, 35, SurnameAnnotation()), + (50, 59, CityAnnotation()), ] assert set(result) == set(expected) \ No newline at end of file diff --git a/tests/detectors/phone/test_phone.py b/tests/detectors/phone/test_phone.py index 733f263..ad3bc59 100644 --- a/tests/detectors/phone/test_phone.py +++ b/tests/detectors/phone/test_phone.py @@ -1,8 +1,10 @@ -from src.detectors.phone.phone import detect_phone_numbers -from 
src.entity_types import EntityTypes +from src.annotations import PhoneNumberAnnotation +from src.detectors.phone import PhoneNumberDetector def test_detect_phone_numbers(): + detector = PhoneNumberDetector() + text = "My phone number is +48 123 456 789. My friend's number is 123456789." - found_phone_numbers = detect_phone_numbers(text, "en") + found_phone_numbers = detector.detect(text, dict()) - assert found_phone_numbers == [(19, 34, EntityTypes.PHONE_NUMBER), (58, 67, EntityTypes.PHONE_NUMBER)] \ No newline at end of file + assert found_phone_numbers == [(19, 34, PhoneNumberAnnotation()), (58, 67, PhoneNumberAnnotation())] \ No newline at end of file diff --git a/tests/detectors/url/test_url.py b/tests/detectors/url/test_url.py index 3d50e4d..44d14ff 100644 --- a/tests/detectors/url/test_url.py +++ b/tests/detectors/url/test_url.py @@ -1,17 +1,22 @@ -from src.detectors.url import detect_urls -from src.entity_types import EntityTypes +from src.detectors.url import UrlDetector +from src.annotations import UrlAnnotation def test_detect_urls(): + detector = UrlDetector("en") + text = "This is a test for www.google.com. Make sure to go to https://www.google.com" - found_urls = detect_urls(text, "en") + found_urls = detector.detect(text, dict()) - assert found_urls == [(19, 33, EntityTypes.URL), (54, 76, EntityTypes.URL)] + assert found_urls == [(19, 33, UrlAnnotation()), (54, 76, UrlAnnotation())] def test_detect_urls_pl(): + detector_en = UrlDetector("en") + detector_pl = UrlDetector("pl") + text = "m.in. https://www.google.com" - found_urls_pl = detect_urls(text, "pl") - found_urls_en = detect_urls(text, "en") + found_urls_pl = detector_pl.detect(text, dict()) + found_urls_en = detector_en.detect(text, dict()) # m.in is a valid shortcut for miÄ™dzy innymi in Polish. It should not be detected as a URL. 
- assert found_urls_pl == [(6, 28, EntityTypes.URL)] - assert found_urls_en == [(0, 4, EntityTypes.URL), (6, 28, EntityTypes.URL)] \ No newline at end of file + assert found_urls_pl == [(6, 28, UrlAnnotation())] + assert found_urls_en == [(0, 4, UrlAnnotation()), (6, 28, UrlAnnotation())] \ No newline at end of file diff --git a/tests/detectors/user/test_user.py b/tests/detectors/user/test_user.py index 0ae3c9e..028b1f4 100644 --- a/tests/detectors/user/test_user.py +++ b/tests/detectors/user/test_user.py @@ -1,8 +1,10 @@ -from src.detectors.user.user import detect_users -from src.entity_types import EntityTypes +from src.detectors.user import UserDetector +from src.annotations import UserAnnotation def test_detect_users(): + detector = UserDetector() + text = "My username is @john_smith. My friend's username is @jane_doe." - found_users = detect_users(text, "en") + found_users = detector.detect(text, dict()) - assert found_users == [(15, 26, EntityTypes.USER), (52, 61, EntityTypes.USER)] \ No newline at end of file + assert found_users == [(15, 26, UserAnnotation()), (52, 61, UserAnnotation())] \ No newline at end of file diff --git a/tests/dictionaries/morphosyntactic/__init__.py b/tests/dictionaries/morphosyntactic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/dictionaries/morphosyntactic/test_pl_ner.py b/tests/dictionaries/morphosyntactic/test_pl_ner.py new file mode 100644 index 0000000..7d9e229 --- /dev/null +++ b/tests/dictionaries/morphosyntactic/test_pl_ner.py @@ -0,0 +1,22 @@ +from src.dictionaries.morphosyntactic.pl_ner import PlNERMorphosyntacticDictionary +from src.annotations import NameAnnotation, CityAnnotation, SurnameAnnotation + +def test_pl_ner_morphosyntactic_dictionary(): + dictionary = PlNERMorphosyntacticDictionary(list=[ + (NameAnnotation, "Andrzejowi", "Andrzej", "subst:sg:dat:m1"), + (NameAnnotation, "Andrzej", "Andrzej", "subst:sg:m1:imperf"), + (NameAnnotation, "Kasia", "Kasia", "subst:sg:f:imperf"), + 
(CityAnnotation, "WrocÅ‚aw", "WrocÅ‚aw", "subst:sg:m2:imperf"), + (CityAnnotation, "Warszawa", "Warszawa", "subst:sg:f:imperf"), + (CityAnnotation, "Kraków", "Kraków", "subst:sg:m2:imperf") + ]) + + example_name_1 = NameAnnotation(morpho_tag="subst:sg:dat:m1") + example_name_2 = NameAnnotation(morpho_tag="subst:sg:m1:imperf") + example_other = SurnameAnnotation(morpho_tag="subst:sg:m1:imperf") + + assert dictionary.get_random_replacement(example_name_1) == "Andrzejowi" + assert dictionary.get_random_replacement(example_name_2) in ["Andrzej", "Kasia"] + + # If no good replacement is present, it should choose a random one + assert dictionary.get_random_replacement(example_other) in ["Andrzejowi" ,"Andrzej", "Kasia", "WrocÅ‚aw", "Warszawa", "Kraków"] \ No newline at end of file diff --git a/tests/dictionaries/test_pl_ner_replacements.py b/tests/dictionaries/test_pl_ner_replacements.py deleted file mode 100644 index a694d2e..0000000 --- a/tests/dictionaries/test_pl_ner_replacements.py +++ /dev/null @@ -1,38 +0,0 @@ -from src.dictionaries.pl_ner_replacements import load_pl_ner_replacements_dictionary -from tempfile import NamedTemporaryFile -def test_load_pl_ner_replacements_dictionary(): - with NamedTemporaryFile(mode="w", encoding="utf-8", delete=False) as file: - file.write("OSOBA\tAndrzejowi\tAndrzej\tsubst:sg:dat:m1\n") - file.write("OSOBA\tAndrzej\tAndrzej\tsubst:sg:m1:imperf\n") - file.write("OSOBA\tKasia\tKasia\tsubst:sg:f:imperf\n") - file.write("MIEJSCE\tWrocÅ‚aw\tWrocÅ‚aw\tsubst:sg:m2:imperf\n") - file.write("MIEJSCE\tWarszawa\tWarszawa\tsubst:sg:f:imperf\n") - file.write("MIEJSCE\tKraków\tKraków\tsubst:sg:m2:imperf\n") - - path = file.name - - dictionary = load_pl_ner_replacements_dictionary(path) - - assert dictionary == { - "OSOBA": { - "Andrzej": { - "subst:sg:dat:m1": "Andrzejowi", - "subst:sg:m1:imperf": "Andrzej" - }, - "Kasia": { - "subst:sg:f:imperf": "Kasia" - } - }, - "MIEJSCE": { - "WrocÅ‚aw": { - "subst:sg:m2:imperf": "WrocÅ‚aw" - }, - 
"Warszawa": { - "subst:sg:f:imperf": "Warszawa" - }, - "Kraków": { - "subst:sg:m2:imperf": "Kraków" - } - } - } \ No newline at end of file diff --git a/tests/input_parsers/__init__.py b/tests/input_parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_ccl_parser.py b/tests/input_parsers/test_ccl.py similarity index 67% rename from tests/test_ccl_parser.py rename to tests/input_parsers/test_ccl.py index e140edc..ec78647 100644 --- a/tests/test_ccl_parser.py +++ b/tests/input_parsers/test_ccl.py @@ -1,4 +1,6 @@ -from src.ccl_parser import parse_ccl +# from src.annotation_types_old import AnnotationTypes +from src.input_parsers.ccl import CCLInputParser +from tempfile import NamedTemporaryFile example_ccl = """<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE chunkList SYSTEM "ccl.dtd"> @@ -48,13 +50,26 @@ example_ccl = """<?xml version="1.0" encoding="UTF-8"?> </chunkList> """ -def test_parse_ccl(): - text, annotations = parse_ccl(example_ccl) + +def test_ccl_input_parser(): + parser = CCLInputParser() + with NamedTemporaryFile() as f: + f.write(example_ccl.encode("utf-8")) + f.flush() + text, annotations = parser.parse(f.name) + assert text == "Marek Kowalski pojechaÅ‚ do WrocÅ‚awia." 
- - assert set(annotations.keys()) == set(["nam_liv", "nam_loc", "ctag"]) - + + # assert set(annotations.keys()) == set(["nam_liv", "nam_loc", AnnotationTypes.MORPHOSYNTACTIC_TAG]) + assert annotations["nam_liv"] == [(0, 14, "Marek Kowalski")] assert annotations["nam_loc"] == [(27, 36, "WrocÅ‚awia")] - assert annotations["ctag"] == [(0, 5, "subst:sg:nom:m1"), (6, 14, "subst:sg:nom:m1"), (15, 23, "praet:sg:m1:perf"), (24, 26, "prep:gen"), (27, 36, "subst:sg:gen:m3"), (36, 37, "interp")] \ No newline at end of file + # assert annotations[AnnotationTypes.MORPHOSYNTACTIC_TAG] == [ + # (0, 5, "subst:sg:nom:m1"), + # (6, 14, "subst:sg:nom:m1"), + # (15, 23, "praet:sg:m1:perf"), + # (24, 26, "prep:gen"), + # (27, 36, "subst:sg:gen:m3"), + # (36, 37, "interp"), + # ] diff --git a/tests/pipeline/__init__.py b/tests/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/pipeline/test_default.py b/tests/pipeline/test_default.py new file mode 100644 index 0000000..97acb46 --- /dev/null +++ b/tests/pipeline/test_default.py @@ -0,0 +1,33 @@ +from src.pipeline.default import DefaultPipeline +from src.annotations import NameAnnotation +from src.input_parsers.interface import InputParser +from src.detectors.interface import Detector +from src.suppressors.interface import Suppressor +from src.replacers.interface import ReplacerInterface + +class MockInputParser(InputParser): + def parse(self, input): + return "ala ma kota", {} + +class MockDetector(Detector): + def detect(self, text, annotations): + return [(0, 3, NameAnnotation())] + +class MockSuppressor(Suppressor): + def suppress(self, annotations): + return annotations + +class MockReplacer(ReplacerInterface): + def replace(self, text, annotations): + return "zbigniew ma kota", annotations + +def test_default_pipeline(): + # TODO: Prepare mocks that will better test the pipeline + pipeline = DefaultPipeline( + MockInputParser(), + {"mock_detector": MockDetector()}, + MockSuppressor(), + 
{"mock_replacer": MockReplacer()} + ) + + assert pipeline.run("/test.txt") == "zbigniew ma kota" \ No newline at end of file diff --git a/tests/replacers/__init__.py b/tests/replacers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/replacers/test_date_replacer.py b/tests/replacers/test_date_replacer.py new file mode 100644 index 0000000..77ce093 --- /dev/null +++ b/tests/replacers/test_date_replacer.py @@ -0,0 +1,43 @@ + +from src.replacers.date_replacer import DateReplacer +from src.annotations import NameAnnotation, SurnameAnnotation, DateAnnotation, CityAnnotation + +def test_date_replacer(): + text = "Ala Brzeszczot urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" + detections = [ + (0, 3, NameAnnotation()), + (4, 14, SurnameAnnotation()), + (28, 38, DateAnnotation()), + (42, 51, CityAnnotation()), + ] + + replacer = DateReplacer() + + result = replacer.replace(text, detections) + + expected_text_beggining = "Ala Brzeszczot urodziÅ‚a sie " + expected_text_ending = " we WrocÅ‚awiu" + exptected_detections_left = [ + (0, 3, NameAnnotation()), + (4, 14, SurnameAnnotation()), + (len(result[0]) - 9, len(result[0]), CityAnnotation()), + ] + + assert result[0].startswith(expected_text_beggining) + assert result[0].endswith(expected_text_ending) + assert result[1] == exptected_detections_left + +def test_date_replacer_same_date_same_replacement(): + text = "Ala Brzeszczot urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu. 05.05.2005 to jej urodziny. 06.05.2005 to nie jej urodziny." 
+ detections = [ + (28, 38, DateAnnotation()), + (53, 63, DateAnnotation()), + (81, 91, DateAnnotation()), + ] + + replacer = DateReplacer() + + result = replacer.replace(text, detections) + + assert result[0][29:39] == result[0][54:64] + assert result[1] == [] \ No newline at end of file diff --git a/tests/replacers/test_email_replacer.py b/tests/replacers/test_email_replacer.py new file mode 100644 index 0000000..a354f3e --- /dev/null +++ b/tests/replacers/test_email_replacer.py @@ -0,0 +1,43 @@ + +from src.replacers.email_replacer import EmailReplacer +from src.annotations import DateAnnotation, CityAnnotation, UserAnnotation, EmailAnnotation + +def test_email_replacer(): + text = "zz@z.pl urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" + detections = [ + (0, 7, EmailAnnotation()), + (21, 31, DateAnnotation()), + (35, 44, CityAnnotation()), + ] + + replacer = EmailReplacer() + result = replacer.replace(text, detections) + + expected_text_ending = " urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" + exptected_detections_left = [ + (len(result[0]) - 23, len(result[0]) - 13, DateAnnotation()), + (len(result[0]) - 9, len(result[0]), CityAnnotation()), + ] + + assert result[0].endswith(expected_text_ending) + assert result[0][0:-len(expected_text_ending)] != "zz@z.pl" + assert result[1] == exptected_detections_left + +def test_email_replacer_same_email_same_replacement(): + text = "zz@z.pl zz@z.pl aa@a.pl" + detections = [ + (0, 7, EmailAnnotation()), + (8, 15, EmailAnnotation()), + (16, 22, EmailAnnotation()), + + ] + + replacer = EmailReplacer() + result = replacer.replace(text, detections) + + old_emails = text.split() + new_emails = result[0].split() + + assert old_emails[0] != new_emails[0] + assert new_emails[0] == new_emails[1] + assert result[1] == [] \ No newline at end of file diff --git a/tests/replacers/test_ner_replacer.py b/tests/replacers/test_ner_replacer.py new file mode 100644 index 0000000..fad3921 --- /dev/null +++ b/tests/replacers/test_ner_replacer.py @@ -0,0 
+1,32 @@ + +from src.replacers.ner_replacer import NERReplacer +from src.annotations import NameAnnotation, SurnameAnnotation, DateAnnotation, CityAnnotation +from src.dictionaries.morphosyntactic.pl_ner import PlNERMorphosyntacticDictionary + +def test_ner_replacer(): + dictionary = PlNERMorphosyntacticDictionary(list=[ + (NameAnnotation, "Andrzej", "Andrzej", "a"), + (NameAnnotation, "Kasi", "Kasia", "b"), + (SurnameAnnotation, "Kowalowi", "Kowal", "a"), + (SurnameAnnotation, "Kowal", "Kowal", "b"), + ], always_replace=False) + + text = "Ala Brzeszczot urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" + detections = [ + (0, 3, NameAnnotation(morpho_tag="a")), + (4, 14, SurnameAnnotation(morpho_tag="b")), + (28, 38, DateAnnotation()), + (42, 51, CityAnnotation(morpho_tag="c")), + ] + + replacer = NERReplacer(dictionary) + + result = replacer.replace(text, detections) + + expected_text = "Andrzej Kowal urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" + exptected_detections_left = [ + (27, 37, DateAnnotation()), + (41, 50, CityAnnotation(morpho_tag="c")), + ] + + assert result == (expected_text, exptected_detections_left) \ No newline at end of file diff --git a/tests/replacers/test_tag_replacer.py b/tests/replacers/test_tag_replacer.py new file mode 100644 index 0000000..cd73090 --- /dev/null +++ b/tests/replacers/test_tag_replacer.py @@ -0,0 +1,21 @@ + +from src.replacers.tag_replacer import TagReplacer +from src.annotations import NameAnnotation, SurnameAnnotation, DateAnnotation, CityAnnotation + +def test_replace_with_tags(): + text = "Ala Brzeszczot urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" + detections = [ + (0, 3, NameAnnotation()), + (4, 14, SurnameAnnotation()), + (28, 38, DateAnnotation()), + (42, 51, CityAnnotation()), + ] + + replacer = TagReplacer() + + result = replacer.replace(text, detections) + + expected_text = "[OSOBA] [OSOBA] urodziÅ‚a sie [DATE] we [MIEJSCE]" + exptected_detections_left = [] + + assert result == (expected_text, exptected_detections_left) \ No 
newline at end of file diff --git a/tests/replacers/test_user_replacer.py b/tests/replacers/test_user_replacer.py new file mode 100644 index 0000000..587835a --- /dev/null +++ b/tests/replacers/test_user_replacer.py @@ -0,0 +1,43 @@ + +from src.replacers.user_replacer import UserReplacer +from src.annotations import DateAnnotation, CityAnnotation, UserAnnotation + +def test_user_replacer(): + text = "@zzzz32 urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" + detections = [ + (0, 7, UserAnnotation()), + (21, 31, DateAnnotation()), + (35, 44, CityAnnotation()), + ] + + replacer = UserReplacer() + result = replacer.replace(text, detections) + + expected_text_ending = " urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" + exptected_detections_left = [ + (len(result[0]) - 23, len(result[0]) - 13, DateAnnotation()), + (len(result[0]) - 9, len(result[0]), CityAnnotation()), + ] + + assert result[0].endswith(expected_text_ending) + assert result[0][0:-len(expected_text_ending)] != "@zzzz32" + assert result[1] == exptected_detections_left + +def test_user_replacer_same_user_same_replacement(): + text = "@zzzz32 @zzzz32 @aaaaa" + detections = [ + (0, 7, UserAnnotation()), + (8, 15, UserAnnotation()), + (16, 22, UserAnnotation()), + + ] + + replacer = UserReplacer() + result = replacer.replace(text, detections) + + old_users = text.split() + new_users = result[0].split() + + assert old_users[0] != new_users[0] + assert new_users[0] == new_users[1] + assert result[1] == [] \ No newline at end of file diff --git a/tests/test_annotation_mapping.py b/tests/test_annotation_mapping.py new file mode 100644 index 0000000..42b4bb2 --- /dev/null +++ b/tests/test_annotation_mapping.py @@ -0,0 +1,19 @@ +from src.annotation_mapping import map_annotatios + +def test_map_annotations(): + ref_annotations = [(0, 3, "Andrzej"), (7, 11, "psa")] + all_annotations = { + "A": [(0, 3, "Andrzej"), (7, 11, "psa")], + "B": [(0, 3, "AndrzejB"), (7, 11, "psaA")], + "C": [(0, 3, "AndrzejC"), (8, 9, "psaC")], + } + + result 
= map_annotatios(ref_annotations, all_annotations, ["B", "C"]) + excepted = { + (0, 3, "Andrzej"): {"B": (0, 3, "AndrzejB"), "C": (0, 3, "AndrzejC")}, + (7, 11, "psa"): { + "B": (7, 11, "psaA"), + }, + } + + assert result == excepted diff --git a/tests/test_string_replacements.py b/tests/test_string_replacements.py index f44644d..6bea546 100644 --- a/tests/test_string_replacements.py +++ b/tests/test_string_replacements.py @@ -1,4 +1,4 @@ -from src.string_replacements import replace +from src.string_replacements import replace, replace_and_update def test_replace(): text = "Ala ma kota" @@ -17,4 +17,18 @@ def test_replace_out_of_order(): expected = "Andrzej ma psa" result = replace(text, replacements) - assert result == expected \ No newline at end of file + assert result == expected + + +def test_replace_and_update(): + text = "Ala ma kota kropka" + replacements = [(0, 3, "Andrzej"), (7, 11, "psa")] + other_annotations = [(4, 6, "ma"), (12, 18, "kropka")] + + expected_text = "Andrzej ma psa kropka" + expected_other_annotations = [(8, 10, "ma"), (15, 21, "kropka")] + + result_text, result_other_annotations = replace_and_update(text, replacements, other_annotations) + + assert result_text == expected_text + assert result_other_annotations == expected_other_annotations \ No newline at end of file diff --git a/tests/test_tag_anonimization.py b/tests/test_tag_anonimization.py deleted file mode 100644 index 3bfd374..0000000 --- a/tests/test_tag_anonimization.py +++ /dev/null @@ -1,17 +0,0 @@ - -from src.tag_anonimization import replace_with_tags -from src.entity_types import EntityTypes - -def test_replace_with_tags(): - text = "Ala Brzeszczot urodziÅ‚a sie 05.05.2005 we WrocÅ‚awiu" - detections = [ - (0, 3, EntityTypes.NAME), - (4, 14, EntityTypes.SURNAME), - (28, 38, EntityTypes.DATE), - (42, 51, EntityTypes.CITY), - ] - - result = replace_with_tags(text, detections) - expected = "[OSOBA] [OSOBA] urodziÅ‚a sie [DATE] we [MIEJSCE]" - - assert result == expected \ No 
newline at end of file -- GitLab