Newer
Older
"""Implementation of anonymizer functionality."""
import re
from string import punctuation, ascii_lowercase, ascii_uppercase
import random
class Anonymizer:
"""Class used to edit sentences based on options."""
def __init__(self, task_options):
self.unmarshallers = {
'chunk': lambda *args: '\n\n',
'sentence': lambda *args: self._process_sentence(*args),
}
self._method = task_options.get('method', 'delete')
self._mail_token = '[MAIL]'
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
self._user_token = '[USER]'
self._website_token = '[WWW]'
self._default_token = '[INNE]'
self._user_req = (False, False)
self._add_tok = True
def _process_sentence(self, sentence_subtree):
string_builder = []
self._sentence_builder = []
self._user_req = (False, False)
for elem in sentence_subtree:
if elem.tag == 'tok':
tok = self._process_tok(elem)
if self._add_tok:
string_builder.append(tok)
self._add_tok = True
elif elem.tag == 'ns':
if self._user_req[0]:
self._user_req = (True, True)
elif self._user_req[1]:
self._user_req = (False, False)
else:
self._user_req = (False, True)
self._sentence_builder.append(string_builder)
string_builder = []
else:
raise Exception('Unrecognized tag inside sentence: ' + elem.tag)
string_builder.append('')
self._sentence_builder.append(string_builder)
new_list = []
for l in self._sentence_builder:
new_list.append(' '.join(l))
return ''.join(new_list)
def _process_word(self, text, tag, ann):
for annotation in ann:
if annotation[1] != 0:
# text = self._handle_annotated(annotation[0], tag)
break
text = self._anonoymize_email(text)
text = self._anonoymize_user(text)
text = self._anonoymize_website(text)
return text
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def _process_tok(self, tok_subtree):
text = ''
tag = ''
ann = []
for elem in tok_subtree:
if elem.tag == 'orth':
text = elem.text
elif elem.tag == 'lex':
tag = self._process_lex(elem)
elif elem.tag == 'ann':
ann.append(self._process_ann(elem))
print(text, self._user_req[0], self._user_req[1])
word = self._process_word(text, tag, ann)
if text == '@' and not self._user_req[0] and not self._user_req[1]:
self._user_req = (True, False)
else:
self._user_req = (False, False)
return word
def _process_lex(self, lex_subtree):
tag = ''
for elem in lex_subtree:
if elem.tag == 'ctag':
tag = elem.text
elif elem.tag != 'base':
raise Exception('Unrecognized tag inside lex: ' + elem.tag)
if tag == '':
raise Exception('Lex tag had no ctag inside!')
return tag
def _process_ann(self, ann_subtree):
value = int(ann_subtree.text)
chan = ann_subtree.attrib["chan"]
return chan, value
@staticmethod
def _get_random_chatacter(upper=False):
return random.choice(ascii_uppercase) \
if upper else random.choice(ascii_lowercase)
@staticmethod
def _generate_pseudo_email(email):
new_mail = []
it = iter(email)
top_domain_len = email.rfind('.')
i = 0
for char in it:
if char == '@':
new_mail.append(char)
break
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(Anonymizer._get_random_chatacter(char.isupper()))
i += 1
for char in it:
if char == '.':
new_mail.append(char)
break
new_mail.append(char)
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(Anonymizer._get_random_chatacter(char.isupper()))
i += 1
for char in it:
new_mail.append(char)
return ''.join(new_mail)
@staticmethod
def _generate_pseudo_user(user):
it = iter(user)
new_user = []
new_user.append(next(it))
for char in it:
if char in punctuation:
new_user.append(char)
else:
new_user.append(Anonymizer._get_random_chatacter(char.isupper()))
@staticmethod
def _generate_pseudo_website(link):
it = iter(link)
new_link = []
for char in it:
if char == '/':
new_link.append(char)
break
else:
new_link.append(char)
for char in it:
if char in punctuation:
new_link.append(char)
else:
new_link.append(Anonymizer._get_random_chatacter(char.isupper()))
return ''.join(new_link)
def _anonoymize_email(self, token):
"""Handles removal/changing of emails addresses."""
email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}'
if self._method == 'delete':
if re.match(email_regex, token):
token = ''
self._add_tok = False
elif self._method == 'tag':
token = re.sub(email_regex, self._mail_token, token)
elif self._method == 'pseudo':
if re.match(email_regex, token):
token = self._generate_pseudo_email(token)
return token
def _anonoymize_user(self, token):
"""Handles removal/change of users."""
if self._user_req[0] and self._user_req[1]:
if self._method == 'delete':
if self._sentence_builder[-1].pop() != '@':
raise Exception('Error while detecting User tag.')
token = ''
self._add_tok = False
elif self._method == 'tag':
token = self._user_token
elif self._method == 'pseudo':
token = self._generate_pseudo_user(token)
return token
def _anonoymize_website(self, token):
"""Handles removal/change of links."""
link_regex = r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?' \
r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)' \
r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]' \
r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)' \
r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/' \
r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)' \
r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.' \
r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
if re.search(link_regex, token):
token = ''
self._add_tok = False
token = re.sub(link_regex, self._website_token, token)
elif self._method == 'pseudo':
if re.search(link_regex, token):
token = self._generate_pseudo_website(token)