# base_anonymizer.py
# Author: Michał Pogoda
"""Abstract description of anonymizer including base regexes."""
import regex
from abc import ABC, abstractmethod
from src.generators import generate_phone_number_tag
# NOTE(review): the pattern compiled on the next line is not bound to any
# name, so its result is discarded; the identical expression is stored as
# BaseAnonymizer.user_regex. This statement looks like a leftover - confirm
# before removing.
regex.compile(r'\B(?P<username>\@[\w\-]+)')
# The regex above detects an '@' that does not directly follow a word
# character, followed by one or more word characters or hyphens (a user
# mention such as '@some-user'), captured in the group 'username'.
class BaseAnonymizer(ABC):
    """Base abstract class for anonymization.

    Provides the regular expressions shared by concrete anonymizers (e-mail
    addresses, user mentions, websites and phone numbers) and helpers that
    walk an XML sentence tree built from ``tok`` and ``ns`` elements.
    Subclasses must implement ``_handle_annotated``, ``_process_sentence``
    and ``process``.
    """

    # Local part (including the '@'), dot-terminated domain labels and the
    # final label, matched case-insensitively.
    email_regex = regex.compile(
        r'(?P<local_part>[a-z0-9!#$%&\'*+/=?^_`{|}~-]+'
        r'(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*@)'
        r'(?P<domain>(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+)'
        r'(?P<tld>[a-z0-9](?:[a-z0-9-]*[a-z0-9])?)', regex.I
    )

    # '@' not directly preceded by a word character, then word characters
    # or hyphens.
    user_regex = regex.compile(r'\B(?P<username>\@[\w\-]+)')

    # Literal strings that must never be treated as a website.
    _website_exceptions = ['m.in']

    # First alternative: an exception is matched and then discarded with
    # (*SKIP)(*FAIL).  The exceptions are escaped so that they are matched
    # literally; unescaped, the '.' in 'm.in' would also swallow words such
    # as 'main' and hide e.g. 'main.com' from website detection.
    # Second alternative: optional protocol and credentials, a host (an IPv4
    # address outside the 10/127/169.254/192.168/172.16-31 ranges, or
    # dot-terminated domain labels), a TLD, then optional port and path.
    website_regex = regex.compile(
        r'\b(?:{})\b(*SKIP)(*FAIL)|'.format(
            '|'.join(map(regex.escape, _website_exceptions))
        ) +
        r'(?:(?P<protocol>(?:(?:https?|ftp):)?\/\/)?'
        r'(?P<auth>\S+(?::\S*)?@)?'
        r'(?P<host>(?!(?:10|127)(?:\.\d{1,3}){3})'
        r'(?!(?:169\.254|192\.168)(?:\.\d{1,3}){2})'
        r'(?!172\.(?:1[6-9]|2\d|3[0-1])(?:\.\d{1,3}){2})'
        r'(?:[1-9]\d?|1\d\d|2[01]\d|22[0-3])'
        r'(?:\.(?:1?\d{1,2}|2[0-4]\d|25[0-5])){2}'
        r'(?:\.(?:[1-9]\d?|1\d\d|2[0-4]\d|25[0-4]))'
        r'|'
        r'((?:[a-z0-9\u00a1-\uffff][a-z0-9\u00a1-\uffff_-]{0,62})?'
        r'[a-z0-9\u00a1-\uffff]\.)+)'
        r'(?P<tld>[a-z\u00a1-\uffff]{2,}\.??)'
        r'(?P<port>:\d{2,5})?'
        r'(?P<path>[/?#]\S*)?)',
        regex.UNICODE | regex.I
    )

    # Optional country code followed by 9-10 digits, each of which may be
    # followed by a single hyphen or space.
    phone_number_regex = regex.compile(
        r'(?P<country_code>(00[1-9]\d?)|(\(?([+\d]{2,3})\)?)[- ]??)?'
        r'(?P<number>(\d[- ]??){9,10})'
    )

    def __init__(self, task_options):
        """Initialize anonymizer with chosen method and default tokens.

        Args:
            task_options: Mapping of options; only ``'method'`` is read here
                and it defaults to ``'delete'`` when absent.
        """
        self._mail_token = '[MAIL]'
        self._user_token = '@[USER]'
        self._website_token = '[WWW]'
        self._digits_token = '[DIGITS]'
        self._date_token = '[DATE]'
        self._default_token = '[INNE]'
        self._method = task_options.get('method', 'delete')
        # Maps a category name to (pattern, token, generator, args), as
        # unpacked by _tagging and _pseudonymization.  Starts empty;
        # presumably filled in by subclasses - TODO confirm.
        self._category_anonymisation = {}
        self._form_dict = {}
        self._pseudo_ann_list = []

    def _process_lex(self, lex_subtree):
        """Return the text of the ``ctag`` child of a ``lex`` element.

        ``base`` children are ignored.  When several ``ctag`` children are
        present, the text of the last one is returned.

        Raises:
            ValueError: If a child other than ``ctag``/``base`` is found, or
                if no ``ctag`` text was collected.
        """
        tag = ''
        for elem in lex_subtree:
            if elem.tag == 'ctag':
                tag = elem.text
            elif elem.tag != 'base':
                raise ValueError('Unrecognized tag inside lex: ' + elem.tag)
        if tag == '':
            raise ValueError('Lex tag had no ctag inside!')
        return tag

    def _tagging(self, sentence):
        """Replace every registered category match in *sentence* by its token.

        Phone numbers get a tag built by ``generate_phone_number_tag`` for
        each match; every other category is substituted with its token.

        Args:
            sentence: Text to process.

        Returns:
            The text with all matches replaced.
        """
        for category, entry in self._category_anonymisation.items():
            pattern, token, _, _ = entry
            if category == 'phone_number':
                # The iterator scans the sentence as it was before any
                # replacement made inside this loop.
                for match in pattern.finditer(sentence):
                    tag = generate_phone_number_tag(match.groupdict(''), token)
                    # Literal replacement of all occurrences: the tag must
                    # not be interpreted as a regex replacement template.
                    sentence = sentence.replace(match.group(0), tag)
            else:
                sentence = regex.sub(pattern, token, sentence)
        return sentence

    def _pseudonymization(self, sentence):
        """Replace every registered category match by a generated pseudonym.

        Matches are first collected category by category; each matched text
        is removed from a working copy so that later categories do not match
        it again.  The collected matches are then replaced in the original
        sentence with the output of the category's generator.

        Args:
            sentence: Text to process.

        Returns:
            The text with all matches replaced by generated strings.
        """
        remaining = sentence
        to_replace = []
        rules = self._category_anonymisation.values()
        for pattern, _, generator, args in rules:
            for match in pattern.finditer(remaining):
                to_replace.append((match, generator, args))
                remaining = remaining.replace(match.group(0), '')
        for match, generator, args in to_replace:
            pseudo_string = generator(match.groupdict(''), **args)
            # Literal replacement: a backslash or group reference in the
            # generated string must not be treated as a regex template.
            sentence = sentence.replace(match.group(0), pseudo_string)
        return sentence

    def _process_ann(self, ann_subtree):
        """Return ``(chan, value)`` for an ``ann`` element.

        ``chan`` is the element's ``chan`` attribute and ``value`` is its
        text converted to ``int``.
        """
        value = int(ann_subtree.text)
        chan = ann_subtree.attrib["chan"]
        return chan, value

    def _process_single_tok(self, id, tok_subtree):
        """Extract text, tag and annotations of a ``tok`` element.

        Reads the ``orth`` text, the ``lex`` tag and all ``ann`` children and
        passes them on to ``_process_word``.

        Args:
            id: Identifier forwarded to ``_process_word``.
            tok_subtree: The ``tok`` element to read.

        Returns:
            Whatever ``_process_word`` returns for the token.
        """
        text = ''
        tag = ''
        ann = []
        for elem in tok_subtree:
            if elem.tag == 'orth':
                text = elem.text
            elif elem.tag == 'lex':
                tag = self._process_lex(elem)
            elif elem.tag == 'ann':
                ann.append(self._process_ann(elem))
        return self._process_word(id, text, tag, ann)

    def _process_word(self, id, text, tag, ann):
        """Return the token text, anonymized if it carries an annotation.

        The first ``(chan, value)`` pair whose value is non-zero triggers
        ``_handle_annotated``; without such a pair *text* is returned as is.
        """
        for chan, value in ann:
            if value != 0:
                return self._handle_annotated(id, text, tag, chan)
        return text

    def _process_sent_tree(self, sentence_subtree):
        """Rebuild a sentence from its ``tok`` and ``ns`` children.

        Each token is followed by a ``' '`` separator; an ``ns`` element
        removes the separator that precedes it.  The id handed to
        ``_process_single_tok`` equals the index the token will take in the
        list passed to ``_process_sentence``.

        Raises:
            ValueError: If a child other than ``tok``/``ns`` is found.
        """
        string_builder = []
        token_id = 0
        for elem in sentence_subtree:
            if elem.tag == 'tok':
                tok = self._process_single_tok(token_id, elem)
                string_builder.append(tok)
                string_builder.append(' ')
                token_id += 2
            elif elem.tag == 'ns':
                # Only drop a trailing separator.  A leading or repeated
                # 'ns' has none to drop: popping anyway would fail on an
                # empty list or silently discard a token.
                if string_builder and string_builder[-1] == ' ':
                    string_builder.pop()
                    token_id -= 1
            else:
                raise ValueError(
                    'Unrecognized tag inside sentence: ' + elem.tag
                )
        return self._process_sentence(string_builder)

    @abstractmethod
    def _handle_annotated(self, id, text, tag, ann):
        """Return the replacement for an annotated token.

        Called by ``_process_word`` with the token id, its text, its tag and
        the ``chan`` attribute of the first non-zero annotation.
        """

    @abstractmethod
    def _process_sentence(self, string_builder):
        """Turn the list built by ``_process_sent_tree`` into the result.

        Args:
            string_builder: Processed tokens interleaved with ``' '``
                separators.
        """

    @abstractmethod
    def process(self, input_filename, output_filename):
        """Anonymize the text of a file and save the anonymized output.

        Args:
            input_filename: File whose text is to be anonymized.
            output_filename: File the anonymized text is saved to.
        """