Skip to content
Snippets Groups Projects
Select Git revision
  • c9c06d392a84b93b7b9ad75dc061e25f982d6ff6
  • master default protected
  • develop protected
  • feat_remove_attr
  • python2.7
  • python3.8
6 results

token.cpp

Blame
  • polish_anonymizer.py 8.23 KiB
    """Implementation of anonymizer functionality for Polish language."""
    import math
    import regex
    import random
    
    
    from src.utils.utils import consume
    from src.base_anonymizer import BaseAnonymizer
    from src.ccl_handler import CCLHandler
    from src.generators import (generate_pseudo_email, generate_pseudo_phone_number,
                                generate_pseudo_user, generate_pseudo_website,
                                generate_pseudo_date)
    
    
    class PolishAnonymizer(BaseAnonymizer):
        """Class with an anonymization implementation for the Polish language."""
    
        # Case-insensitive pattern recognising dates in three layouts. The
        # separator class [ \t\-\./,] (space, tab, '-', '.', '/', ',') is
        # shared by all of them.
        date_regex = regex.compile(
            # (1) Numeric, year last: <1-2 digits><sep><1-2 digits><sep><year>,
            #     e.g. "12.05.2020". The pattern itself does not decide which
            #     number is the day and which the month. Year is 4 or 2 digits.
            r'\b(?P<day_or_month_year>'
            r'(?P<day_month1>[0-3]?\d)(?P<punct1>[ \t\-\./,]{1,2})'
            r'(?P<day_month2>[0-3]?\d)(?P<punct2>[ \t\-\./,]{1,2})'
            r'(?P<year1>\d{4}|\d{2}))\b|'

            # (2) Numeric, year first: <year><sep><1-2 digits><sep><1-2 digits>,
            #     e.g. "2020-05-12".
            r'\b(?P<year_month_or_day>(?P<year2>\d{4}|\d{2})'
            r'(?P<punct3>[ \t\-\./,]{1,2})(?P<day_month3>[0-3]?\d)'
            r'(?P<punct4>[ \t\-\./,]{1,2})(?P<day_month4>[0-3]?\d))\b|'

            # (3) Month written as a Polish word (three-letter abbreviation or
            #     a longer form), with an optional leading day and an optional
            #     trailing "<day> <year>" or "<year>".
            r'(?P<month_in_words>'
            # The lookahead refuses a match that *starts* on a bare
            # three-letter abbreviation standing as a whole word, so a lone
            # abbreviation without a preceding day is not taken for a date.
            # NOTE(review): the list contains "paz" but not "paź", while the
            # month alternative below accepts both - confirm this is intended.
            r'(?!\b(sty|lut|mar|kwi|maj|cze|lip|sie|wrz|paz|lis|gru)\b)'
            # Optional day followed by up to two separator characters.
            r'(?:(?P<day1>[0-3]?\d)(?P<punct5>[ \t\-\./,]{0,2}))?'
            # Month names; diacritics are optional where a character class
            # such as [nń] or [śs] is used.
            r'\b(?P<month>Sty(?:|cze[nń]|cznia)|Lut(?:|y|ego)|Mar(?:|zec|ca)|'
            r'Kwi(?:|ecie[nń]|etnia)|Maj(?:|a)|Cze(?:|rwiec|rwca)|Lip(?:|iec|ca)'
            r'|Sie(?:|rpie[nń]|rpnia)|Wrz(?:|esie[nń]|e[śs]nia)'
            r'|Pa[zź](?:|dziernik|dziernika)|Lis(?:|topad|topada)'
            r'|Gru(?:|dzie[nń]|dnia))\b'
            # Optional tail: either "<day2><sep><year4>" or just "<year3>".
            r'((?:(?P<punct7>[ \t\-\./,]{0,2})(?P<day2>[0-3]?\d))'
            r'(?:(?P<punct8>[ \t\-\./,]{1,2})(?P<year4>\d{4}|\d{2}))|'
            r'(?:(?P<punct6>[ \t\-\./,]{0,2})(?P<year3>\d{4}|\d{2})))?)', regex.I
        )
    
        # Maps the category label found in the first field of a dictionary
        # file line (see _load_file) to the annotation name used as a key of
        # self._form_dict. Lines with any other label are ignored on load.
        # NOTE(review): 'nam_org_group_team' -> 'country_nam' looks like an
        # unusual pairing; verify against the dictionary file's labels.
        _file_to_liner_dispatch = {
            'nam_liv_person': 'person_first_nam',
            'nam_liv_person_last': 'person_last_nam',
            'nam_fac_road': 'road_nam',
            'nam_loc_gpe_city': 'city_nam',
            'nam_org_group_team': 'country_nam'
        }
    
        # Placeholder emitted by _handle_annotated in 'tag' mode for each
        # annotation name: '[OSOBA]' (Polish for "person") for personal names,
        # '[MIEJSCE]' (Polish for "place") for roads, cities and countries.
        _liner_to_tag_dispatch = {
            'person_first_nam': '[OSOBA]',
            'person_last_nam': '[OSOBA]',
            'road_nam': '[MIEJSCE]',
            'city_nam': '[MIEJSCE]',
            'country_nam': '[MIEJSCE]'
        }
    
        def __init__(self, task_options):
            """Prepare category handlers, CCL unmarshallers and name dictionary.

            Args:
                task_options (dict): Task settings, forwarded to the base
                    class. The optional ``'language'`` entry (default
                    ``'pl'``) is handed to the pseudo-date generator.

            """
            super().__init__(task_options)
            self.lang = task_options.get('language', 'pl')
            # Every value is a 4-tuple whose first item is the pattern used by
            # _anonymize; the remaining items (token, pseudonym generator and
            # generator keyword arguments) are presumably consumed by the base
            # class - verify there.
            # Order is important, first more specific
            categories = [
                ('user', (self.user_regex, self._user_token,
                          generate_pseudo_user, {})),
                ('email', (self.email_regex, self._mail_token,
                           generate_pseudo_email, {})),
                ('website', (self.website_regex, self._website_token,
                             generate_pseudo_website, {})),
                ('date', (self.date_regex, self._date_token,
                          generate_pseudo_date, {'lang': self.lang})),
                ('phone_number', (self.phone_number_regex, self._digits_token,
                                  generate_pseudo_phone_number, {})),
            ]
            self._category_anonymisation = dict(categories)
            # Callbacks handed to CCLHandler.process: a chunk contributes a
            # newline, a sentence is delegated to _process_sent_tree.
            self.unmarshallers = {
                'chunk': lambda *args: '\n',
                'sentence': lambda *args: self._process_sent_tree(*args),
            }
            # Both containers must exist before the dictionary is loaded.
            self._form_dict = {}
            self._pseudo_ann_list = []
            self._load_file()
    
        def _load_file(self, filename='dictionaries/pl_dict.txt'):
            with open(filename, 'r', encoding='utf-8') as f:
                for line in f.readlines():
                    l_list = line.split()
                    cat = l_list[0]
                    if cat in self._file_to_liner_dispatch:
                        cat_name = self._file_to_liner_dispatch[cat]
                        length = int((len(l_list) - 2) / 2)
                        gen_name = ' '.join(l_list[(1 + length):(1 + 2 * length)])
                        flx_name = ' '.join(l_list[1:(1 + length)])
                        flex = l_list[-1]
                        if cat_name not in self._form_dict:
                            self._form_dict[cat_name] = dict()
                        if length not in self._form_dict[cat_name]:
                            self._form_dict[cat_name][length] = dict()
                        if gen_name not in self._form_dict[cat_name][length]:
                            self._form_dict[cat_name][length][gen_name] = dict()
                        self._form_dict[cat_name][length][gen_name][flex] = flx_name
            for cat in self._form_dict:
                for length in self._form_dict[cat]:
                    self._form_dict[cat][length] = list(
                        self._form_dict[cat][length].items()
                    )
    
        def _handle_annotated(self, id, text, tag, ann):
            if self._method == 'delete':
                return ''
            elif self._method == 'tag':
                if ann in self._liner_to_tag_dispatch:
                    return self._liner_to_tag_dispatch[ann]
            elif self._method == 'pseudo':
                if ann in self._form_dict:
                    self._pseudo_ann_list.append((id, text, tag, ann))
            return text
    
        def _process_sentence(self, string_builder):
            string_builder = self._handle_pseudo_ann(string_builder)
            sentence = ''.join(string_builder)
            return self._anonymize(sentence)
    
        def _get_pseudo_ann(self, ann, tag, length):
            while length not in self._form_dict[ann] and length > 0:
                length -= 1
            if length == 0:
                return ''
            new_tag = ':'.join(tag.split(':')[1:4])
            for _ in range(0, 10):
                random_entry = random.choice(self._form_dict[ann][length])
                if new_tag in random_entry[1]:
                    return random_entry[1][new_tag]
                if new_tag == 'ign':
                    return random_entry[0]
            random_entry = random.choice(self._form_dict[ann][length])
            return random_entry[0]
    
        def _handle_pseudo_ann(self, string_builder):
            """Replace queued annotated tokens with random pseudonyms.

            Entries queued by ``_handle_annotated`` are processed in order.
            Consecutive entries with the same annotation (except first and
            last names of persons, which are always replaced one by one) are
            merged into a single multi-word name when they are joined by a
            single space token or by a ``'-'`` token. The merged span is
            replaced by the tokens of one pseudonym, and the queue is cleared
            at the end.

            Args:
                string_builder (list): Tokens (and separators) of a sentence;
                    queued ids are indexes into this list.

            Returns:
                list: Token list with the pseudonyms substituted.

            """
            if not self._pseudo_ann_list:
                return string_builder
            # Difference between current and original token positions caused
            # by replacements that changed the number of tokens.
            shifted_id = 0
            pseudo_ann_iter = enumerate(iter(self._pseudo_ann_list))
            for i, (id_, _, tag, ann) in pseudo_ann_iter:
                j = i + 1
                start_id = id_ + shifted_id
                # Position of the last merged token, kept in the same
                # (shifted) coordinates as next_id so that the adjacency
                # checks stay valid after earlier replacements.
                last_id = start_id
                ann_len = 1
                skip_tokens = 1
                while j < len(self._pseudo_ann_list):
                    next_id, _, _, next_ann = self._pseudo_ann_list[j]
                    next_id += shifted_id
                    if ann != next_ann or ann in ('person_first_nam',
                                                  'person_last_nam'):
                        break
                    if (next_id == last_id + 1
                            and string_builder[next_id] == '-'):
                        # Annotated hyphen: part of the span, not a new word.
                        skip_tokens += 1
                    elif (next_id == last_id + 1
                            and string_builder[last_id] == '-'):
                        # Word directly after a hyphen.
                        ann_len += 1
                        skip_tokens += 1
                    elif (next_id == last_id + 2
                            and string_builder[last_id + 1] == ' '):
                        # Word separated by a single space token.
                        ann_len += 1
                        skip_tokens += 2
                    else:
                        break
                    last_id = next_id
                    j += 1
                new_text = self._get_pseudo_ann(
                    ann=ann,
                    tag=tag,
                    length=ann_len
                )
                # Split into words and keep the spaces as separate tokens.
                new_text = regex.split('( )', new_text)
                string_builder = string_builder[:start_id] + new_text + \
                    string_builder[start_id + skip_tokens:]
                # Skip every queue entry merged above; hyphen entries count
                # too although they do not increase ann_len.
                merged = j - i - 1
                if merged:
                    consume(pseudo_ann_iter, merged)
                # Later ids move by the change in the number of tokens.
                shifted_id += len(new_text) - skip_tokens
            self._pseudo_ann_list.clear()
            return string_builder
    
        def _anonymize(self, sentence):
            if self._method == 'delete':
                for pattern, _, _, _ in self._category_anonymisation.values():
                    sentence = regex.sub(pattern, '', sentence)
            elif self._method == 'tag':
                sentence = self._tagging(sentence)
            elif self._method == 'pseudo':
                sentence = self._pseudonymization(sentence)
            return sentence
    
        def process(self, input_filename, output_filename):
            """Anonymize a CCL document and write the result as plain text.

            Args:
                input_filename (str): Path of the input file in CCL format
                    (text tagged and processed with LINER).
                output_filename (str): Path of the output file.

            """
            CCLHandler(input_filename).process(output_filename,
                                               self.unmarshallers)