Skip to content
Snippets Groups Projects
Commit 2d8783a5 authored by Michał Pogoda's avatar Michał Pogoda
Browse files

Working MVP of pipeline

parent 8bc5de43
Branches
2 merge requests!10Anonimizer v2,!7Better coverage
Pipeline #7500 failed with stage
in 19 seconds
Showing
with 501 additions and 102 deletions
class Pipeline:
    """Base class for processing pipelines; subclasses must override ``run``."""

    def run(self, input) -> str:
        """Process ``input`` and return the result as a string.

        Raises:
            NotImplementedError: Always; the base class provides no behavior.
        """
        raise NotImplementedError()
\ No newline at end of file
from src.replacers.interface import ReplacerInterface
from src.replacers.tag_replacer import TagReplacer
\ No newline at end of file
from typing import List, Tuple
from src.annotations import (
Annotation,
DateAnnotation,
)
from src.string_replacements import replace_and_update
from src.replacers.interface import ReplacerInterface
import random
# Polish month names keyed by month number (1-12), in the form used inside
# written dates (e.g. "5 kwietnia 2021"). DateReplacer uses this table to
# render a randomly chosen month for TEXT_MONTH parts.
# TODO: Add support for other languages
months_map = {
    1: "stycznia",
    2: "lutego",
    3: "marca",
    4: "kwietnia",
    5: "maja",
    6: "czerwca",
    7: "lipca",
    8: "sierpnia",
    9: "września",
    10: "października",
    11: "listopada",
    12: "grudnia",
}
class DateReplacer(ReplacerInterface):
    """Replacer that swaps detected dates for random dates in the same layout."""

    def __init__(self):
        pass

    def replace(self, text: str, detections: List[Tuple[int, int, Annotation]]) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
        """Replace every ``DateAnnotation`` span with a randomly generated date.

        Args:
            text (str): Text to be processed.
            detections (List[Tuple[int, int, Annotation]]): ``(start, end,
                annotation)`` triples found in ``text``.

        Returns:
            Tuple[str, List[Tuple[int, int, Annotation]]]: The result of
            ``replace_and_update`` called with the text, the date replacements
            and the detections that are not ``DateAnnotation`` instances.
        """
        part = DateAnnotation.AnnotationPart

        # One generator per randomisable part kind. Days are capped at 28 so
        # the generated day is valid for every month.
        generators = {
            part.TWO_DIGITS_DAY: lambda: str(random.randint(1, 28)).zfill(2),
            part.ONE_DIGIT_DAY: lambda: str(random.randint(1, 28)),
            part.TWO_DIGIT_MONTH: lambda: str(random.randint(1, 12)).zfill(2),
            part.ONE_DIGIT_MONTH: lambda: str(random.randint(1, 12)),
            part.FOUR_DIGIT_YEAR: lambda: str(random.randint(1900, 2020)),
            part.TWO_DIGIT_YEAR: lambda: str(random.randint(0, 99)).zfill(2),
            part.TEXT_MONTH: lambda: months_map[random.randint(1, 12)],
        }

        # Layout used when the annotation carries no format: DD.MM.YYYY.
        fallback_layout = [
            (part.TWO_DIGITS_DAY, "01"),
            (part.OTHER, "."),
            (part.TWO_DIGIT_MONTH, "01"),
            (part.OTHER, "."),
            (part.FOUR_DIGIT_YEAR, "2020"),
        ]

        substitutions = []
        leftovers = []
        generated = {}  # original date text -> replacement already chosen

        for detection_item in detections:
            start, end, annotation = detection_item
            if not isinstance(annotation, DateAnnotation):
                leftovers.append(detection_item)
                continue

            layout = annotation.format
            if layout is None:
                layout = fallback_layout

            original = text[start:end]
            if original not in generated:
                pieces = []
                for entry in layout:
                    kind = entry[0]
                    if kind == part.OTHER:
                        # Separators and other literal text are kept verbatim.
                        pieces.append(entry[1])
                    elif kind in generators:
                        pieces.append(generators[kind]())
                    # Unknown part kinds contribute nothing.
                generated[original] = "".join(pieces)

            # Identical date strings always receive the same replacement.
            substitutions.append((start, end, generated[original]))

        return replace_and_update(text, substitutions, leftovers)
\ No newline at end of file
from typing import List, Tuple
from src.annotations import Annotation
from src.string_replacements import replace
from src.replacers.interface import ReplacerInterface
class DeleteReplacer(ReplacerInterface):
    """Replacer that removes every detected span from the text."""

    def __init__(self):
        pass

    def replace(
        self, text: str, detections: List[Tuple[int, int, Annotation]]
    ) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
        """Delete all detected spans.

        Args:
            text (str): Text to be processed.
            detections (List[Tuple[int, int, Annotation]]): ``(start, end,
                annotation)`` triples; the annotation itself is ignored.

        Returns:
            Tuple[str, List[Tuple[int, int, Annotation]]]: The text with every
            span replaced by an empty string, and an empty list because no
            detection is left unprocessed.
        """
        blanked = []
        for start, end, _ in detections:
            blanked.append((start, end, ""))
        return replace(text, blanked), []
\ No newline at end of file
from typing import List, Tuple
from src.annotations import (
Annotation,
EmailAnnotation,
)
from src.string_replacements import replace_and_update
from src.replacers.interface import ReplacerInterface
import random
import string
def random_char(char_num):
    """Return a string made of ``char_num`` random ASCII letters."""
    letters = [random.choice(string.ascii_letters) for _ in range(char_num)]
    return "".join(letters)
def random_email():
    """Return a fake address: seven random letters followed by ``@gmail.com``."""
    local_part = random_char(7)
    return local_part + "@gmail.com"
class EmailReplacer(ReplacerInterface):
    """Replacer that swaps detected e-mail addresses for random ones."""

    def __init__(self):
        pass

    def replace(self, text: str, detections: List[Tuple[int, int, Annotation]]) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
        """Replace every ``EmailAnnotation`` span with a random address.

        Args:
            text (str): Text to be processed.
            detections (List[Tuple[int, int, Annotation]]): ``(start, end,
                annotation)`` triples found in ``text``.

        Returns:
            Tuple[str, List[Tuple[int, int, Annotation]]]: The result of
            ``replace_and_update`` called with the text, the e-mail
            replacements and the detections of other types.
        """
        substitutions = []
        untouched = []
        fake_by_original = {}  # original address -> generated address

        for entry in detections:
            begin, finish, annotation = entry
            if not isinstance(annotation, EmailAnnotation):
                untouched.append(entry)
                continue

            original = text[begin:finish]
            if original not in fake_by_original:
                fake_by_original[original] = random_email()
            # The same original address always maps to the same fake one.
            substitutions.append((begin, finish, fake_by_original[original]))

        return replace_and_update(text, substitutions, untouched)
\ No newline at end of file
from abc import ABC, abstractmethod
from typing import List, Tuple
from src.annotations import Annotation
class ReplacerInterface(ABC):
    """Abstract interface implemented by every replacer."""

    @abstractmethod
    def replace(
        self, text: str, detections: List[Tuple[int, int, Annotation]]
    ) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
        """Replace detected entities in text with an anonymized version.

        Args:
            text (str): Text to be processed.
            detections (List[Tuple[int, int, Annotation]]): List of
                ``(start, end, annotation)`` detections.

        Returns:
            Tuple[str, List[Tuple[int, int, Annotation]]]: Text with the
            supported entities replaced by their anonymized version, and the
            list of detections that were not processed by this replacer.
        """
from typing import List, Tuple
from src.annotations import (
Annotation,
)
from src.string_replacements import replace_and_update
from src.replacers.interface import ReplacerInterface
from src.dictionaries.morphosyntactic import MorphosyntacticDictionary
class NERReplacer(ReplacerInterface):
    """Replacer that draws substitutes for detections from a dictionary."""

    def __init__(self, dictionary: MorphosyntacticDictionary):
        """Store the dictionary that is queried for random replacements."""
        self._dictionary = dictionary

    def replace(
        self, text: str, detections: List[Tuple[int, int, Annotation]]
    ) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
        """Replace detections for which the dictionary offers a substitute.

        Args:
            text (str): Text to be processed.
            detections (List[Tuple[int, int, Annotation]]): ``(start, end,
                annotation)`` triples found in ``text``.

        Returns:
            Tuple[str, List[Tuple[int, int, Annotation]]]: The result of
            ``replace_and_update`` called with the text, the replacements and
            the detections for which the dictionary returned ``None``.
        """
        substitutions = []
        unsupported = []
        chosen = {}  # (surface text, annotation class) -> substitute or None

        for entry in detections:
            begin, finish, annotation = entry
            cache_key = (text[begin:finish], type(annotation))

            # Ask the dictionary only once per distinct (text, type) pair so
            # repeated mentions are replaced consistently.
            if cache_key not in chosen:
                chosen[cache_key] = self._dictionary.get_random_replacement(annotation)

            substitute = chosen[cache_key]
            if substitute is not None:
                substitutions.append((begin, finish, substitute))
            else:
                unsupported.append(entry)

        return replace_and_update(text, substitutions, unsupported)
from typing import List, Tuple
from src.annotations import (
Annotation,
NameAnnotation,
SurnameAnnotation,
StreetNameAnnotation,
CityAnnotation,
CountryAnnotation,
PhoneNumberAnnotation,
UrlAnnotation,
UserAnnotation,
EmailAnnotation,
DateAnnotation,
TINAnnotation,
KRSAnnotation,
)
from src.string_replacements import replace
from src.replacers.interface import ReplacerInterface
class TagReplacer(ReplacerInterface):
    """Replacer that substitutes each detection with a fixed tag."""

    def __init__(self):
        """Build the annotation-class -> tag lookup table."""
        self.tags_map = {
            NameAnnotation: "[OSOBA]",
            SurnameAnnotation: "[OSOBA]",
            StreetNameAnnotation: "[MIEJSCE]",
            CityAnnotation: "[MIEJSCE]",
            CountryAnnotation: "[MIEJSCE]",
            PhoneNumberAnnotation: "[DIGITS]",
            UrlAnnotation: "[WWW]",
            UserAnnotation: "@[USER]",
            EmailAnnotation: "[MAIL]",
            DateAnnotation: "[DATE]",
            TINAnnotation: "[DIGITS]",
            KRSAnnotation: "[DIGITS]",
        }

    def replace(
        self, text: str, detections: List[Tuple[int, int, Annotation]]
    ) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
        """Replace every detected span with the tag of its annotation class.

        Args:
            text (str): Text to be processed.
            detections (List[Tuple[int, int, Annotation]]): ``(start, end,
                annotation)`` triples found in ``text``.

        Returns:
            Tuple[str, List[Tuple[int, int, Annotation]]]: The tagged text and
            an empty list, since every detection is handled.
        """
        tagged = []
        for start, end, annotation in detections:
            # Lookup is by exact class; anything not in the table is "[OTHER]".
            tag = self.tags_map.get(type(annotation), "[OTHER]")
            tagged.append((start, end, tag))
        return replace(text, tagged), []
\ No newline at end of file
from typing import List, Tuple
from src.annotations import (
Annotation,
UserAnnotation,
)
from src.string_replacements import replace_and_update
from src.replacers.interface import ReplacerInterface
from random_username.generate import generate_username
class UserReplacer(ReplacerInterface):
    """Replacer that swaps detected user handles for generated usernames."""

    def __init__(self):
        pass

    def replace(self, text: str, detections: List[Tuple[int, int, Annotation]]) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
        """Replace every ``UserAnnotation`` span with ``@<generated username>``.

        Args:
            text (str): Text to be processed.
            detections (List[Tuple[int, int, Annotation]]): ``(start, end,
                annotation)`` triples found in ``text``.

        Returns:
            Tuple[str, List[Tuple[int, int, Annotation]]]: The result of
            ``replace_and_update`` called with the text, the username
            replacements and the detections of other types.
        """
        substitutions = []
        untouched = []
        handle_by_original = {}  # original handle -> generated handle

        for entry in detections:
            begin, finish, annotation = entry
            if not isinstance(annotation, UserAnnotation):
                untouched.append(entry)
                continue

            original = text[begin:finish]
            if original not in handle_by_original:
                handle_by_original[original] = "@" + generate_username(1)[0]
            # The same original handle always maps to the same generated one.
            substitutions.append((begin, finish, handle_by_original[original]))

        return replace_and_update(text, substitutions, untouched)
\ No newline at end of file
from typing import List, Tuple
from typing import List, Tuple, Any, TypeVar
def replace(original_string: str, replacements: List[Tuple[int, int, str]]) -> str:
    """
    Replaces substrings in a string.

    !!! Important: This function assumes that there are no overlapping annotations.

    Parameters
    ----------
    original_string : str
        The original string.
    replacements : List[Tuple[int, int, str]]
        A list of tuples containing (start, end, replacement). Positions refer
        to ``original_string``; the list does not need to be sorted.

    Returns
    -------
    str
        The string with replacements applied.
    """
    # Each replacement is applied exactly once. The output is assembled from
    # the untouched gaps and the new texts, so no offset bookkeeping is needed.
    pieces = []
    cursor = 0
    for replacement in sorted(replacements, key=lambda x: x[0]):
        pieces.append(original_string[cursor:replacement[0]])
        pieces.append(replacement[2])
        cursor = replacement[1]
    pieces.append(original_string[cursor:])
    return "".join(pieces)
_T = TypeVar("_T")


def replace_and_update(
    original_string: str,
    replacements: List[Tuple[int, int, str]],
    other_annotations: List[Tuple[int, int, _T]],
) -> Tuple[str, List[Tuple[int, int, _T]]]:
    """Replaces substrings in a string and updates other annotations to match new string.

    !!! Important: This function assumes that there are no overlapping annotations.

    Parameters
    ----------
    original_string : str
        The original string.
    replacements : List[Tuple[int, int, str]]
        A list of tuples containing (start, end, replacement).
    other_annotations : List[Tuple[int, int, Any]]
        A list of other annotations.

    Returns
    -------
    Tuple[str, List[Tuple[int, int, Any]]]
        The string with replacements applied and other annotations with new positions.
    """
    # Merge both lists, remembering which entries are replacements, so they
    # can be walked left to right in a single pass.
    tagged = [(item[0], item[1], item[2], True) for item in replacements]
    tagged.extend((item[0], item[1], item[2], False) for item in other_annotations)
    ordered = sorted(tagged, key=lambda x: x[0])

    new_other_annotations = []
    delta = 0  # accumulated length change caused by the replacements so far
    for start, end, payload, is_replacement in ordered:
        if is_replacement:
            original_string = (
                original_string[: start + delta]
                + payload
                + original_string[end + delta :]
            )
            delta += len(payload) - (end - start)
        else:
            # Shift the untouched annotation by the length change to its left.
            new_other_annotations.append((start + delta, end + delta, payload))

    return original_string, new_other_annotations
\ No newline at end of file
from typing import List, Tuple, Any
class Suppressor:
    """Base class for strategies that resolve overlapping annotations."""

    def suppress(self, annotations: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]:
        """Suppress annotations that overlap.

        Args:
            annotations (List[Tuple[int, int, Any]]): List of annotations.

        Returns:
            List[Tuple[int, int, Any]]: List of annotations with overlapping
                annotations removed.

        Raises:
            NotImplementedError: Always; subclasses provide the behavior.
        """
        raise NotImplementedError()
\ No newline at end of file
from typing import List, Tuple, Dict
from typing import List, Tuple, Dict, Any
from bitarray import bitarray
from src.suppressors.interface import Suppressor
def suppress_order_based(annotations: List[Tuple[int, int, str]]) -> List[Tuple[int, int, str]]:
class OrderBasedSuppressor(Suppressor):
    """Suppressor that delegates to ``suppress_order_based``."""

    def __init__(self) -> None:
        super().__init__()

    def suppress(self, annotations: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]:
        """Return ``annotations`` filtered by ``suppress_order_based``."""
        kept = suppress_order_based(annotations)
        return kept
def suppress_order_based(annotations: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]:
"""If two annotations overlap, the first one int the list is kept.
Args:
annotations (List[Tuple[int, int, str]]): List of annotations.
annotations (List[Tuple[int, int, Any]]): List of annotations.
Returns:
List[Tuple[int, int, str]]: List of annotations with overlapping
List[Tuple[int, int, Any]]: List of annotations with overlapping
annotations removed.
"""
if len(annotations) == 0:
return annotations
annotations = annotations
bitarray_size = max([end for _, end, _ in annotations])
bitarray_ = bitarray(bitarray_size)
......
from typing import List, Tuple
from collections import defaultdict
from src.entity_types import EntityTypes
from src.string_replacements import replace
def replace_with_tags(text: str, detections: List[Tuple[int, int, str]]) -> str:
    """Replace entities with tags.

    Args:
        text (str): Text to be processed.
        detections (List[Tuple[int, int, str]]): List of
            ``(start, end, entity_type)`` detections.

    Returns:
        str: Text with entities replaced with tags; entity types without a
        dedicated tag become ``[OTHER]``.
    """
    tags_map = {
        EntityTypes.NAME: "[OSOBA]",
        EntityTypes.SURNAME: "[OSOBA]",
        EntityTypes.STREET_NAME: "[MIEJSCE]",
        EntityTypes.CITY: "[MIEJSCE]",
        EntityTypes.COUNTRY: "[MIEJSCE]",
        EntityTypes.PHONE_NUMBER: "[DIGITS]",
        EntityTypes.URL: "[WWW]",
        EntityTypes.USER: "@[USER]",
        EntityTypes.EMAIL: "[MAIL]",
        EntityTypes.DATE: "[DATE]",
        EntityTypes.TIN: "[DIGITS]",
        EntityTypes.KRS: "[DIGITS]",
    }

    tagged = []
    for start, end, entity_type in detections:
        tag = tags_map.get(entity_type, "[OTHER]")
        tagged.append((start, end, tag))
    return replace(text, tagged)
\ No newline at end of file
from src.entity_types import EntityTypes
# Maps NER category labels (string keys) to the EntityTypes member used for them.
# NOTE(review): "nam_org_group_team" is mapped to EntityTypes.COUNTRY, which
# looks like it may be a copy-paste slip — confirm the intended label.
NER_PL_N5_MAPPING = {
    "nam_liv_person": EntityTypes.NAME,
    "nam_liv_person_last": EntityTypes.SURNAME,
    "nam_fac_road": EntityTypes.STREET_NAME,
    "nam_loc_gpe_city": EntityTypes.CITY,
    "nam_org_group_team": EntityTypes.COUNTRY,
}
\ No newline at end of file
......@@ -2,17 +2,35 @@
import logging
import nlp_ws
from src.anonymizers.polish_anonymizer import PolishAnonymizer
from src.anonymizers.english_anonymizer import EnglishAnonymizer
from src.anonymizers.russian_anonymizer import RussianAnonymizer
from hydra import initialize, compose
from hydra.utils import instantiate
_log = logging.getLogger(__name__)
class Worker(nlp_ws.NLPWorker):
"""Implements nlp_worker for anonymizer service."""
def __init__(self) -> None:
    """Initialise the empty pipeline cache, then run the base-class setup."""
    # Both slots are read and filled by _prepare_pipeline.
    self._pipeline = None
    self._last_config = None
    super().__init__()
def _prepare_pipeline(self, task_options):
    """Return the pipeline matching ``task_options``, rebuilding it only on change.

    Args:
        task_options: Mapping of task options. ``language`` (default ``'pl'``)
            and ``method`` (default ``'tag'``) select the Hydra overrides
            ``language=...`` and ``replacers=...``.

    Returns:
        The pipeline instantiated from the composed config's ``pipeline`` entry.
    """
    language = task_options.get('language', 'pl')
    replace_method = task_options.get('method', 'tag')
    overrides = [
        "language=" + language,
        "replacers=" + replace_method,
    ]
    config_hash = hash(tuple(overrides))
    if self._last_config != config_hash:
        with initialize(config_path="./config"):
            cfg = compose(config_name="config", overrides=overrides)
            pipeline = instantiate(cfg["pipeline"])
        self._pipeline = pipeline
        # Remember which configuration the cached pipeline belongs to; without
        # this the check above never matches and the pipeline is rebuilt on
        # every call. Recorded only after a successful build so a failed build
        # is retried next time.
        self._last_config = config_hash
    return self._pipeline
def process(self, input_file, task_options, output_file):
"""Anonymizes input text.
......@@ -23,11 +41,10 @@ class Worker(nlp_ws.NLPWorker):
method - 'delete'/'tag'/'pseudo' - 'delete' deletes selected tokens,
'tag' replaces selected tokens with arbitrary tags, 'pseudo'
replaces selected tokens with a random token that
language - 'pl' - language of the input text. As of now only Polish is supported.
"""
lang = task_options.get('language', 'pl')
anonymizers = {'pl': PolishAnonymizer,
'en': EnglishAnonymizer,
'ru': RussianAnonymizer
}
anon = anonymizers.get(lang, PolishAnonymizer)(task_options)
anon.process(input_file, output_file)
pipeline = self._prepare_pipeline(task_options)
with open(output_file, 'w', encoding='utf-8') as f:
result = pipeline.run(input_file)
f.write(result)
from src.detectors.date.en import detect_dates_en
from src.entity_types import EntityTypes
from src.annotations import DateAnnotation
from src.detectors.date.date import DateDetector
def test_detect_dates_en():
    """DateDetector("en") finds numeric and month-name dates and reports their format."""
    detector = DateDetector("en")

    # Check en-us
    text = "On 1.01.2022, I sold my cat. On April 5, 2021, I bought a dog."
    found_dates = detector.detect(text, dict())

    format_date1 = [
        (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "01"),  # Only supports two digits for now
        (DateAnnotation.AnnotationPart.OTHER, "."),
        (DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "01"),
        (DateAnnotation.AnnotationPart.OTHER, "."),
        (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2022"),
    ]
    format_date2 = [
        (DateAnnotation.AnnotationPart.TEXT_MONTH, "April"),
        (DateAnnotation.AnnotationPart.OTHER, " "),
        (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "05"),  # Only supports two digits for now
        (DateAnnotation.AnnotationPart.OTHER, ", "),
        (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2021"),
    ]

    # Only the annotation-based expectation is kept: the earlier call to
    # detect_dates_en and the EntityTypes.DATE assertion belonged to the
    # pre-refactor API and contradicted this assertion.
    assert found_dates == [(3, 12, DateAnnotation(format_date1)), (32, 45, DateAnnotation(format_date2))]

    # Check en-gb
    # TODO: Following test fails. Fix it.
    # text = "On 1.01.2022 I sold the cat. On 5th April 2021 I bought a dog."
    # found_dates = detect_dates_en(text)
    # assert found_dates == [(3,12, DateAnnotation()), (32,46, DateAnnotation())]
from src.detectors.date.pl import detect_dates_pl
from src.entity_types import EntityTypes
from src.annotations import DateAnnotation
from src.detectors.date.date import DateDetector
def test_detect_dates_pl():
    """DateDetector("pl") finds numeric and month-name dates and reports their format."""
    detector = DateDetector("pl")

    text = "W dniu 1.01.2022 sprzedałem kota. 5 kwietnia 2021 roku kupiłem psa."
    found_dates = detector.detect(text, dict())

    format_date1 = [
        (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "01"),  # Only supports two digits for now
        (DateAnnotation.AnnotationPart.OTHER, "."),
        (DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "01"),
        (DateAnnotation.AnnotationPart.OTHER, "."),
        (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2022"),
    ]
    format_date2 = [
        (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "05"),  # Only supports two digits for now
        (DateAnnotation.AnnotationPart.OTHER, " "),
        (DateAnnotation.AnnotationPart.TEXT_MONTH, "kwietnia"),
        (DateAnnotation.AnnotationPart.OTHER, " "),
        (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2021"),
    ]

    # Only the annotation-based expectation is kept: the earlier call to
    # detect_dates_pl and the EntityTypes.DATE assertion belonged to the
    # pre-refactor API and contradicted this assertion.
    assert found_dates == [(7, 16, DateAnnotation(format_date1)), (34, 49, DateAnnotation(format_date2))]
\ No newline at end of file
from src.detectors.date.ru import detect_dates_ru
from src.entity_types import EntityTypes
from src.annotations import DateAnnotation
from src.detectors.date.date import DateDetector
def test_detect_dates_pl():
    """DateDetector("ru") finds numeric and month-name dates and reports their format."""
    # NOTE(review): the function name says "pl" although the Russian detector
    # is exercised here; it looks copied from the Polish test — consider
    # renaming to test_detect_dates_ru.
    detector = DateDetector("ru")

    text = "1.01.2022 я продал кошку. 5 апреля 2021 я купил собаку."
    found_dates = detector.detect(text, dict())

    format_date1 = [
        (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "01"),  # Only supports two digits for now
        (DateAnnotation.AnnotationPart.OTHER, "."),
        (DateAnnotation.AnnotationPart.TWO_DIGIT_MONTH, "01"),
        (DateAnnotation.AnnotationPart.OTHER, "."),
        (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2022"),
    ]
    format_date2 = [
        (DateAnnotation.AnnotationPart.TWO_DIGITS_DAY, "05"),  # Only supports two digits for now
        (DateAnnotation.AnnotationPart.OTHER, " "),
        (DateAnnotation.AnnotationPart.TEXT_MONTH, "апреля"),
        (DateAnnotation.AnnotationPart.OTHER, " "),
        (DateAnnotation.AnnotationPart.FOUR_DIGIT_YEAR, "2021"),
    ]

    # Only the annotation-based expectation is kept: the earlier call to
    # detect_dates_ru and the EntityTypes.DATE assertion belonged to the
    # pre-refactor API and contradicted this assertion.
    assert found_dates == [(0, 9, DateAnnotation(format_date1)), (26, 39, DateAnnotation(format_date2))]
from src.detectors.email import detect_emails
from src.entity_types import EntityTypes
from src.annotations import EmailAnnotation
from src.detectors.email import EmailDetector
def test_detect_emails():
    """EmailDetector reports both e-mail addresses as EmailAnnotation spans."""
    detector = EmailDetector()

    text = "My email is arkadiusz@borek.pw. My friend's email is arkadiusz.dump@pwr.edu.pl"
    found_emails = detector.detect(text, dict())

    # Only the annotation-based expectation is kept: the earlier call to
    # detect_emails and the EntityTypes.EMAIL assertion belonged to the
    # pre-refactor API and contradicted this assertion.
    assert found_emails == [(12, 30, EmailAnnotation()), (53, 78, EmailAnnotation())]
\ No newline at end of file
from src.detectors.ner.pl_liner_n5 import detect_ner_pl_liner_n5
from src.entity_types import EntityTypes
from src.annotations import NameAnnotation, SurnameAnnotation, CityAnnotation
from src.detectors.ner import NerDetector
def test_detect_names_pl_liner_n5():
    """NerDetector("pl") maps known annotation labels to annotation objects and ignores unknown ones."""
    detector = NerDetector("pl")

    # Only the new label names are kept; the old "nam_*" keys and the
    # EntityTypes expectations belonged to the pre-refactor API.
    ccl_annotations = {
        'person_first_nam': [(10, 16, 'Marian'), (100, 109, 'Magdalena')],
        'person_last_nam': [(30, 35, 'Nowak')],
        'city_nam': [(50, 59, 'Wrocławiu')],
        'some_other_annotation': [(120, 124, 'zowd')],
    }

    result = detector.detect("", ccl_annotations)

    expected = [
        (10, 16, NameAnnotation()),
        (100, 109, NameAnnotation()),
        (30, 35, SurnameAnnotation()),
        (50, 59, CityAnnotation()),
    ]

    # Compared as sets because the order of the returned detections is not checked.
    assert set(result) == set(expected)
\ No newline at end of file
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment