Commit f78b4599 authored by Bartłomiej Koptyra's avatar Bartłomiej Koptyra

Handling MAIL,USER,WWW done.

parent 773f8011
# anonymizer
# Anonymizer
The input tagger should be MorphoDiTa, and Liner2 should use the 5nam model.
Pipeline: text -> any2txt -> morphodita -> liner2 -> anonymizer
......@@ -8,48 +8,131 @@ class Anonymizer:
"""Class used to edit sentences based on options."""
def __init__(self, task_options):
self.unmarshallers = {
'chunk': lambda *args: '\n\n',
'sentence': lambda *args: self._process_sentence(*args),
}
self._method = task_options.get('method', 'delete')
self._mail_token = '[MAIL]'
self._user_token = '@[USER]'
self._user_token = '[USER]'
self._website_token = '[WWW]'
self._default_token = '[INNE]'
self._user_req = (False, False)
self._add_tok = True
def _process_sentence(self, sentence_subtree):
string_builder = []
self._sentence_builder = []
self._user_req = (False, False)
for elem in sentence_subtree:
if elem.tag == 'tok':
tok = self._process_tok(elem)
if self._add_tok:
string_builder.append(tok)
self._add_tok = True
elif elem.tag == 'ns':
if self._user_req[0]:
self._user_req = (True, True)
elif self._user_req[1]:
self._user_req = (False, False)
else:
self._user_req = (False, True)
self._sentence_builder.append(string_builder)
string_builder = []
else:
raise Exception('Unrecognized tag inside sentence: ' + elem.tag)
string_builder.append('')
self._sentence_builder.append(string_builder)
new_list = []
for l in self._sentence_builder:
new_list.append(' '.join(l))
return ''.join(new_list)
def _process_word(self, text, tag, ann):
for annotation in ann:
if annotation[1] != 0:
# text = self._handle_annotated(annotation[0], tag)
break
text = self._anonoymize_email(text)
text = self._anonoymize_user(text)
text = self._anonoymize_website(text)
return text
def process(self):
# NOTE(review): `ctag` is neither a parameter nor assigned inside this
# method, so the comparison below relies on a name defined elsewhere.
# Confirm where it comes from, or treat this method as an unfinished stub.
if ctag == 'ign':
# check whether this is a nickname, and then whether it is an email
# check whether this is a proper name such as mBank? not sure
print()
def _process_tok(self, tok_subtree):
text = ''
tag = ''
ann = []
for elem in tok_subtree:
if elem.tag == 'orth':
text = elem.text
elif elem.tag == 'lex':
tag = self._process_lex(elem)
elif elem.tag == 'ann':
ann.append(self._process_ann(elem))
print(text, self._user_req[0], self._user_req[1])
word = self._process_word(text, tag, ann)
if text == '@' and not self._user_req[0] and not self._user_req[1]:
self._user_req = (True, False)
else:
self._user_req = (False, False)
return word
def _process_lex(self, lex_subtree):
tag = ''
for elem in lex_subtree:
if elem.tag == 'ctag':
tag = elem.text
elif elem.tag != 'base':
raise Exception('Unrecognized tag inside lex: ' + elem.tag)
if tag == '':
raise Exception('Lex tag had no ctag inside!')
return tag
def _process_ann(self, ann_subtree):
value = int(ann_subtree.text)
chan = ann_subtree.attrib["chan"]
return chan, value
@staticmethod
def _get_random_chatacter(upper=False):
return random.choice(ascii_uppercase) \
if upper else random.choice(ascii_lowercase)
def _generate_pseudo_email(self, email):
@staticmethod
def _generate_pseudo_email(email):
new_mail = []
it = iter(email)
top_domain_len = len(email) - email.rfind('.')
top_domain_len = email.rfind('.')
i = 0
for char in it:
if char == '@':
new_mail.append(char)
i += 1
break
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(self._get_random_chatacter(char.isupper()))
new_mail.append(Anonymizer._get_random_chatacter(char.isupper()))
i += 1
for char in it:
if char == '.':
if len(list(it)) == top_domain_len:
if i == top_domain_len:
new_mail.append(char)
break
new_mail.append(char)
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(self._get_random_chatacter(char.isupper()))
new_mail.append(Anonymizer._get_random_chatacter(char.isupper()))
i += 1
for char in it:
new_mail.append(char)
return ''.join(new_mail)
def _generate_pseudo_user(self, user):
@staticmethod
def _generate_pseudo_user(user):
it = iter(user)
new_user = []
new_user.append(next(it))
......@@ -57,28 +140,72 @@ class Anonymizer:
if char in punctuation:
new_user.append(char)
else:
new_user.append(self._get_random_chatacter(char.isupper()))
new_user.append(Anonymizer._get_random_chatacter(char.isupper()))
return ''.join(new_user)
@staticmethod
def _generate_pseudo_website(link):
it = iter(link)
new_link = []
for char in it:
if char == '/':
new_link.append(char)
break
else:
new_link.append(char)
for char in it:
if char in punctuation:
new_link.append(char)
else:
new_link.append(Anonymizer._get_random_chatacter(char.isupper()))
return ''.join(new_link)
def _anonoymize_email(self, token):
"""Handles removal/changing of emails addresses."""
email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}'
if self._method == 'delete':
token = re.sub(email_regex, r'', token)
if re.match(email_regex, token):
token = ''
self._add_tok = False
elif self._method == 'tag':
token = re.sub(email_regex, self._mail_token, token)
elif self._method == 'pseudo':
token = self._generate_pseudo_email(token)
if re.match(email_regex, token):
token = self._generate_pseudo_email(token)
return token
def _anonoymize_user(self, token):
"""Handles removal/change of users."""
mention_regex = r'\B\@([\w\-]+)'
if self._user_req[0] and self._user_req[1]:
if self._method == 'delete':
if self._sentence_builder[-1].pop() != '@':
raise Exception('Error while detecting User tag.')
token = ''
self._add_tok = False
elif self._method == 'tag':
token = self._user_token
elif self._method == 'pseudo':
token = self._generate_pseudo_user(token)
return token
def _anonoymize_website(self, token):
"""Handles removal/change of links."""
link_regex = r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?' \
r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)' \
r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]' \
r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)' \
r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/' \
r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)' \
r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.' \
r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
if self._method == 'delete':
token = re.sub(mention_regex, r'', token)
if re.search(link_regex, token):
token = ''
self._add_tok = False
elif self._method == 'tag':
token = re.sub(mention_regex, self._user_token, token)
token = re.sub(link_regex, self._website_token, token)
elif self._method == 'pseudo':
token = self._generate_pseudo_user(token)
if re.search(link_regex, token):
token = self._generate_pseudo_website(token)
return token
"""Implementation of ccl reading functionality."""
from xml.etree.ElementTree import iterparse
class Ccl_handler:
    """Implements reading ccl for anonymizer service."""

    def __init__(self, ccl_file_name):
        """Remember the path of the ccl (XML) file to read."""
        self._file_name = ccl_file_name

    def process(self, output_file, unmarshallers):
        """Stream the ccl file and write the unmarshalled text.

        Args:
            output_file: Path of the text file to write (UTF-8).
            unmarshallers: Mapping of element tag to a callable that takes
                the fully parsed element and returns the string to write.
                Elements whose tag is not in the mapping are skipped.
        """
        # The input is opened in binary mode so the XML parser decodes it
        # according to the document's own declaration instead of the
        # platform default encoding; the output encoding is pinned for the
        # same reason.
        with open(output_file, 'wt', encoding='utf-8') as out, \
                open(self._file_name, 'rb') as source:
            for _event, elem in iterparse(source):
                unmarshal = unmarshallers.get(elem.tag)
                if unmarshal:
                    out.write(unmarshal(elem))
                    # Free the handled subtree; unhandled elements are kept
                    # because an enclosing handler still needs them.
                    elem.clear()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment