Skip to content
Snippets Groups Projects

Develop

Merged Ghost User requested to merge develop into master
Compare and
13 files
+ 8928
2
Compare changes
  • Side-by-side
  • Inline
Files
13
src/anonymizer.py 0 → 100644
+ 413
0
"""Implementation of anonymizer functionality."""
import re
from string import punctuation, ascii_lowercase, ascii_uppercase, digits
import random
class Anonymizer:
"""Class used to edit sentences based on options."""
_file_to_liner_dispatch = {
'nam_liv_person': 'person_first_nam',
'nam_liv_person_last': 'person_last_nam',
'nam_fac_road': 'road_nam',
'nam_loc_gpe_city': 'city_nam',
'nam_org_group_team': 'country_nam'
}
_liner_to_tag_dispatch = {
'person_first_nam': '[OSOBA]',
'person_last_nam': '[OSOBA]',
'road_nam': '[MIEJSCE]',
'city_nam': '[MIEJSCE]',
'country_nam': '[MIEJSCE]'
}
def __init__(self, task_options):
"""Initialize anonymizer with task_options."""
self.unmarshallers = {
'chunk': lambda *args: '\n',
'sentence': lambda *args: self._process_sent_tree(*args),
}
self._method = task_options.get('method', 'delete')
self._mail_token = '[MAIL]'
self._user_token = '@[USER]'
self._website_token = '[WWW]'
self._default_token = '[INNE]'
self._form_dict = dict()
self._pseudo_ann_list = list()
self._load_file()
def _load_file(self, file_name='wiki.txt'):
with open(file_name, 'r', encoding='utf-8') as f:
for line in f.readlines():
l_list = line.split()
cat = l_list[0]
if cat in self._file_to_liner_dispatch:
cat_name = self._file_to_liner_dispatch[cat]
length = int((len(l_list) - 2) / 2)
gen_name = ' '.join(l_list[(1 + length):(1 + 2 * length)])
flx_name = ' '.join(l_list[1:(1 + length)])
flex = l_list[-1]
if cat_name not in self._form_dict:
self._form_dict[cat_name] = dict()
if length not in self._form_dict[cat_name]:
self._form_dict[cat_name][length] = dict()
if gen_name not in self._form_dict[cat_name][length]:
self._form_dict[cat_name][length][gen_name] = dict()
self._form_dict[cat_name][length][gen_name][flex] = flx_name
for cat in self._form_dict:
for length in self._form_dict[cat]:
self._form_dict[cat][length] = list(
self._form_dict[cat][length].items()
)
def _process_sent_tree(self, sentence_subtree):
string_builder = []
id = 0
for elem in sentence_subtree:
if elem.tag == 'tok':
tok = self._process_single_tok(id, elem)
string_builder.append(tok)
string_builder.append(' ')
id += 2
elif elem.tag == 'ns':
id -= 1
string_builder.pop()
else:
raise Exception('Unrecognized tag inside sentence: ' + elem.tag)
return self._process_sentence(string_builder)
def _process_sentence(self, string_builder):
string_builder = self._handle_pseudo_ann(string_builder)
return self._anonoymize_phone_number(
self._anonoymize_website(
self._anonoymize_user(
self._anonoymize_email(
''.join(string_builder)
)
)
)
)
def _process_word(self, id, text, tag, ann):
for annotation in ann:
if annotation[1] != 0:
text = self._handle_annotated(id, text, tag, annotation[0])
break
return text
def _handle_annotated(self, id, text, tag, ann):
if self._method == 'delete':
return ''
elif self._method == 'tag':
if ann in self._liner_to_tag_dispatch:
return self._liner_to_tag_dispatch[ann]
elif self._method == 'pseudo':
if ann in self._form_dict:
self._pseudo_ann_list.append((id, text, tag, ann))
return text
def _handle_pseudo_ann(self, string_builder):
if self._pseudo_ann_list:
it = iter(self._pseudo_ann_list)
id, text, tag, ann = next(it)
current_tag = tag
current_ann = ann
current_id = id
length = 1
for id, text, tag, ann in it:
if current_ann == ann and (ann != 'person_first_nam' and
ann != 'person_last_nam'):
if id == current_id + 2:
length += 1
current_tag = tag
current_id = id
continue
new_text = self._get_pseudo_ann(
ann=current_ann,
tag=current_tag,
length=length
)
for t in new_text.split(' '):
string_builder[current_id - 2 * (length - 1)] = t
length -= 1
length = 1
current_tag = tag
current_ann = ann
current_id = id
new_text = self._get_pseudo_ann(current_ann, current_tag, length)
toks = new_text.split(' ')
for i in range(length):
if i < len(toks):
string_builder[current_id - 2 * (length - 1)] = toks[i]
else:
string_builder[current_id - 2 * (length - 1)] = ''
if string_builder[current_id - 2 * (length - 1) + 1] == ' ':
string_builder[current_id - 2 * (length - 1) + 1] = ''
length -= 1
self._pseudo_ann_list.clear()
return string_builder
def _get_pseudo_ann(self, ann, tag, length):
while length not in self._form_dict[ann] and length > 0:
length -= 1
if length == 0:
return ''
new_tag = ':'.join(tag.split(':')[1:4])
for i in range(0, 10):
random_entry = random.choice(self._form_dict[ann][length])
if new_tag in random_entry[1]:
return random_entry[1][new_tag]
if new_tag == 'ign':
return random_entry[0]
random_entry = random.choice(self._form_dict[ann][length])
return random_entry[0]
def _process_single_tok(self, id, tok_subtree):
text = ''
tag = ''
ann = []
for elem in tok_subtree:
if elem.tag == 'orth':
text = elem.text
elif elem.tag == 'lex':
tag = self._process_lex(elem)
elif elem.tag == 'ann':
ann.append(self._process_ann(elem))
word = self._process_word(id, text, tag, ann)
return word
def _process_lex(self, lex_subtree):
tag = ''
for elem in lex_subtree:
if elem.tag == 'ctag':
tag = elem.text
elif elem.tag != 'base':
raise Exception('Unrecognized tag inside lex: ' + elem.tag)
if tag == '':
raise Exception('Lex tag had no ctag inside!')
return tag
def _process_ann(self, ann_subtree):
value = int(ann_subtree.text)
chan = ann_subtree.attrib["chan"]
return chan, value
@staticmethod
def _get_random_character(digit=False, upper=False):
return random.choice(digits) if digit \
else random.choice(ascii_uppercase) \
if upper else random.choice(ascii_lowercase)
@staticmethod
def _generate_pseudo_email(email):
new_mail = []
it = iter(email)
top_domain_len = email.rfind('.')
i = 0
for char in it:
if char == '@':
new_mail.append(char)
i += 1
break
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
i += 1
for char in it:
if char == '.':
if i == top_domain_len:
new_mail.append(char)
break
new_mail.append(char)
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
i += 1
for char in it:
new_mail.append(char)
return r''.join(new_mail)
@staticmethod
def _generate_pseudo_user(user):
it = iter(user)
new_user = []
for char in it:
if char in punctuation:
new_user.append(char)
else:
new_user.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
return r''.join(new_user)
@staticmethod
def _generate_pseudo_website(link):
it = iter(link)
new_link = []
if link[0:4].lower() == 'http':
slashes = 0
for char in it:
if char == '/':
slashes += 1
new_link.append(char)
if slashes == 2:
break
for char in it:
if char == '/':
new_link.append(char)
break
else:
new_link.append(char)
for char in it:
if char in punctuation:
new_link.append(char)
else:
new_link.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
return r''.join(new_link)
@staticmethod
def _generate_pseudo_phone_number(number):
new_number = []
length = len(number)
it = iter(number)
if number[0] == '+':
how_many = length - 9
for j in range(0, how_many):
new_number.append(next(it))
elif number[0] == '0' and number[1] == '0' \
and number[length - 10] == ' ':
for j in range(0, length - 10):
new_number.append(next(it))
elif number[0] == '(' and number[1] == '0' and number[4] == ')':
for j in range(0, 2):
new_number.append(next(it))
for char in it:
if char.isdigit():
new_number.append(Anonymizer._get_random_character(digit=True))
else:
new_number.append(char)
return r''.join(new_number)
def _generate_phone_number_tag(self, number):
new_number = number.split(' ')
for i in range(len(new_number)):
new_number[i] = self._default_token
return r' '.join(new_number)
def _anonoymize_email(self, sentence):
"""Handles removal/changing of emails addresses."""
email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}'
if self._method == 'delete':
sentence = re.sub(email_regex, '', sentence)
elif self._method == 'tag':
sentence = re.sub(email_regex, self._mail_token, sentence)
elif self._method == 'pseudo':
matches = re.findall(email_regex, sentence)
for match in matches:
sentence = re.sub(
re.escape(match),
self._generate_pseudo_email(match),
sentence
)
return sentence
def _anonoymize_user(self, sentence):
"""Handles removal/change of users."""
user_regex = r'\B\@([\w\-]+)'
if self._method == 'delete':
sentence = re.sub(user_regex, '', sentence)
elif self._method == 'tag':
sentence = re.sub(user_regex, self._user_token, sentence)
elif self._method == 'pseudo':
matches = re.findall(user_regex, sentence)
for match in matches:
sentence = re.sub(
re.escape(match),
self._generate_pseudo_user(match),
sentence
)
return sentence
def _anonoymize_website(self, sentence):
"""Handles removal/change of links."""
link_regex = r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?' \
r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)' \
r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]' \
r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)' \
r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/' \
r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)' \
r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.' \
r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
if self._method == 'delete':
sentence = re.sub(link_regex, '', sentence)
elif self._method == 'tag':
sentence = re.sub(link_regex, self._website_token, sentence)
elif self._method == 'pseudo':
matches = re.findall(link_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_pseudo_website(match),
sentence
)
return sentence
def _anonoymize_phone_number(self, sentence):
"""Handles removal/change of links."""
phone_number_regex = r'(((\+[1-9]\d{0,2}|00[1-9]\d{0,2}) ?)?(\d{9}))' \
r'|((\+[1-9]\d{0,2} |00[1-9]\d{0,2} )?' \
r'(\d{3} \d{3} \d{3}))|(\(0\d{2}\) \d{2} \d{2} ' \
r'\d{3})|(\(\d{2}\) \d{2} \d{3} \d{2})'
if self._method == 'delete':
sentence = re.sub(phone_number_regex, '', sentence)
elif self._method == 'tag':
matches = re.findall(phone_number_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_phone_number_tag(match),
sentence
)
elif self._method == 'pseudo':
matches = re.findall(phone_number_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_pseudo_phone_number(match),
sentence
)
return sentence