"""Implementation of anonymizer functionality for Polish language.""" import math import regex import random from src.utils import consume from src.base_anonymizer import BaseAnonymizer from src.ccl_handler import CCLHandler from src.generators import (generate_pseudo_email, generate_pseudo_phone_number, generate_pseudo_user, generate_pseudo_website, generate_pseudo_date) class PolishAnonymizer(BaseAnonymizer): """Class with an anonymization implementation for the Polish language.""" date_regex = regex.compile( r'\b(?P<day_or_month_year>' r'(?P<day_month1>[0-3]?\d)(?P<punct1>[ \t\-\./,]{1,2})' r'(?P<day_month2>[0-3]?\d)(?P<punct2>[ \t\-\./,]{1,2})' r'(?P<year1>\d{4}|\d{2}))\b|' r'\b(?P<year_month_or_day>(?P<year2>\d{4}|\d{2})' r'(?P<punct3>[ \t\-\./,]{1,2})(?P<day_month3>[0-3]?\d)' r'(?P<punct4>[ \t\-\./,]{1,2})(?P<day_month4>[0-3]?\d))\b|' r'(?P<month_in_words>' r'(?!\b(sty|lut|mar|kwi|maj|cze|lip|sie|wrz|paz|lis|gru)\b)' r'(?:(?P<day1>[0-3]?\d)(?P<punct5>[ \t\-\./,]{0,2}))?' r'\b(?P<month>Sty(?:|cze[nń]|cznia)|Lut(?:|y|ego)|Mar(?:|zec|ca)|' r'Kwi(?:|ecie[nń]|etnia)|Maj(?:|a)|Cze(?:|rwiec|rwca)|Lip(?:|iec|ca)' r'|Sie(?:|rpie[nń]|rpnia)|Wrz(?:|esie[nń]|e[śs]nia)' r'|Pa[zź](?:|dziernik|dziernika)|Lis(?:|topad|topada)' r'|Gru(?:|dzie[nń]|dnia))\b' r'((?:(?P<punct7>[ \t\-\./,]{0,2})(?P<day2>[0-3]?\d))' r'(?:(?P<punct8>[ \t\-\./,]{1,2})(?P<year4>\d{4}|\d{2}))|' r'(?:(?P<punct6>[ \t\-\./,]{0,2})(?P<year3>\d{4}|\d{2})))?)', regex.I ) _file_to_liner_dispatch = { 'nam_liv_person': 'person_first_nam', 'nam_liv_person_last': 'person_last_nam', 'nam_fac_road': 'road_nam', 'nam_loc_gpe_city': 'city_nam', 'nam_org_group_team': 'country_nam' } _liner_to_tag_dispatch = { 'person_first_nam': '[OSOBA]', 'person_last_nam': '[OSOBA]', 'road_nam': '[MIEJSCE]', 'city_nam': '[MIEJSCE]', 'country_nam': '[MIEJSCE]' } def __init__(self, task_options): """Initialize anonymizer with base regexes.""" super().__init__(task_options) self.lang = task_options.get('language', 'pl') # Order is important, first more specific self._category_anonymisation = { 'user': (self.user_regex, self._user_token, generate_pseudo_user, {}), 'email': (self.email_regex, self._mail_token, generate_pseudo_email, {}), 'website': (self.website_regex, self._website_token, generate_pseudo_website, {}), 'date': (self.date_regex, self._date_token, generate_pseudo_date, {'lang': self.lang}), 'phone_number': (self.phone_number_regex, self._digits_token, generate_pseudo_phone_number, {}), } self.unmarshallers = { 'chunk': lambda *args: '\n', 'sentence': lambda *args: self._process_sent_tree(*args), } self._form_dict = dict() self._pseudo_ann_list = list() self._load_file() def _load_file(self, filename='pl_dict.txt'): with open(filename, 'r', encoding='utf-8') as f: for line in f.readlines(): l_list = line.split() cat = l_list[0] if cat in self._file_to_liner_dispatch: cat_name = self._file_to_liner_dispatch[cat] length = int((len(l_list) - 2) / 2) gen_name = ' '.join(l_list[(1 + length):(1 + 2 * length)]) flx_name = ' '.join(l_list[1:(1 + length)]) flex = l_list[-1] if cat_name not in self._form_dict: self._form_dict[cat_name] = dict() if length not in self._form_dict[cat_name]: self._form_dict[cat_name][length] = dict() if gen_name not in self._form_dict[cat_name][length]: self._form_dict[cat_name][length][gen_name] = dict() self._form_dict[cat_name][length][gen_name][flex] = flx_name for cat in self._form_dict: for length in self._form_dict[cat]: self._form_dict[cat][length] = list( self._form_dict[cat][length].items() ) def 
_handle_annotated(self, id, text, tag, ann): if self._method == 'delete': return '' elif self._method == 'tag': if ann in self._liner_to_tag_dispatch: return self._liner_to_tag_dispatch[ann] elif self._method == 'pseudo': if ann in self._form_dict: self._pseudo_ann_list.append((id, text, tag, ann)) return text def _process_sentence(self, string_builder): string_builder = self._handle_pseudo_ann(string_builder) sentence = ''.join(string_builder) return self._anonymize(sentence) def _get_pseudo_ann(self, ann, tag, length): while length not in self._form_dict[ann] and length > 0: length -= 1 if length == 0: return '' new_tag = ':'.join(tag.split(':')[1:4]) for _ in range(0, 10): random_entry = random.choice(self._form_dict[ann][length]) if new_tag in random_entry[1]: return random_entry[1][new_tag] if new_tag == 'ign': return random_entry[0] random_entry = random.choice(self._form_dict[ann][length]) return random_entry[0] def _handle_pseudo_ann(self, string_builder): if not self._pseudo_ann_list: return string_builder shifted_id = 0 pseudo_ann_iter = enumerate(iter(self._pseudo_ann_list)) for i, (id_, _, tag, ann) in pseudo_ann_iter: j = i + 1 start_id = id_ + shifted_id ann_len = 1 skip_tokens = 1 while j < len(self._pseudo_ann_list): next_id, _, _, next_ann = self._pseudo_ann_list[j] next_id += shifted_id if ann != next_ann or (ann == 'person_first_nam' or ann == 'person_last_nam'): break if next_id == id_ + 1 and string_builder[next_id] == '-': skip_tokens += 1 elif next_id == id_ + 1 and string_builder[id_] == '-': ann_len += 1 skip_tokens += 1 elif next_id == id_ + 2 and string_builder[id_ + 1] == ' ': ann_len += 1 skip_tokens += 2 else: break id_ = next_id j += 1 new_text = self._get_pseudo_ann( ann=ann, tag=tag, length=ann_len ) new_text = regex.split('( )', new_text) string_builder = string_builder[:start_id] + new_text + \ string_builder[start_id + skip_tokens:] if ann_len > 1: consume(pseudo_ann_iter, ann_len - 1) if math.ceil(len(new_text) / 2) != ann_len: shifted_id += len(new_text) - (ann_len * 2) + 1 self._pseudo_ann_list.clear() return string_builder def _anonymize(self, sentence): if self._method == 'delete': for pattern, _, _, _ in self._category_anonymisation.values(): sentence = regex.sub(pattern, '', sentence) elif self._method == 'tag': sentence = self._tagging(sentence) elif self._method == 'pseudo': sentence = self._pseudonymization(sentence) return sentence def process(self, input_filename, output_filename): """Anonymize the file in CCL format to the resulting file in plain text. Args: input_filename (str): Input filename in CCL format. \ Text tagged and processed with LINER. output_filename (str): Output filename. """ ccl_handler = CCLHandler(input_filename) ccl_handler.process(output_filename, self.unmarshallers)
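

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: it assumes a
    # LINER-tagged CCL file named 'liner_output.xml' exists and that
    # BaseAnonymizer reads the anonymization method from
    # task_options['method'] -- both the file name and the 'method' key are
    # assumptions, not confirmed by this module.
    anonymizer = PolishAnonymizer({'method': 'pseudo', 'language': 'pl'})
    anonymizer.process('liner_output.xml', 'anonymized.txt')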