"""Implementation of anonymizer functionality.""" import re from string import punctuation, ascii_lowercase, ascii_uppercase import random class Anonymizer: """Class used to edit sentences based on options.""" def __init__(self, task_options): self.unmarshallers = { 'chunk': lambda *args: '\n\n', 'sentence': lambda *args: self._process_sentence(*args), } self._method = task_options.get('method', 'delete') self._mail_token = '[MAIL]' self._user_token = '[USER]' self._website_token = '[WWW]' self._default_token = '[INNE]' self._user_req = (False, False) self._add_tok = True def _process_sentence(self, sentence_subtree): string_builder = [] self._sentence_builder = [] self._user_req = (False, False) for elem in sentence_subtree: if elem.tag == 'tok': tok = self._process_tok(elem) if self._add_tok: string_builder.append(tok) self._add_tok = True elif elem.tag == 'ns': if self._user_req[0]: self._user_req = (True, True) elif self._user_req[1]: self._user_req = (False, False) else: self._user_req = (False, True) self._sentence_builder.append(string_builder) string_builder = [] else: raise Exception('Unrecognized tag inside sentence: ' + elem.tag) string_builder.append('') self._sentence_builder.append(string_builder) new_list = [] for l in self._sentence_builder: new_list.append(' '.join(l)) return ''.join(new_list) def _process_word(self, text, tag, ann): for annotation in ann: if annotation[1] != 0: # text = self._handle_annotated(annotation[0], tag) break text = self._anonoymize_email(text) text = self._anonoymize_user(text) text = self._anonoymize_website(text) return text def _process_tok(self, tok_subtree): text = '' tag = '' ann = [] for elem in tok_subtree: if elem.tag == 'orth': text = elem.text elif elem.tag == 'lex': tag = self._process_lex(elem) elif elem.tag == 'ann': ann.append(self._process_ann(elem)) print(text, self._user_req[0], self._user_req[1]) word = self._process_word(text, tag, ann) if text == '@' and not self._user_req[0] and not self._user_req[1]: self._user_req = (True, False) else: self._user_req = (False, False) return word def _process_lex(self, lex_subtree): tag = '' for elem in lex_subtree: if elem.tag == 'ctag': tag = elem.text elif elem.tag != 'base': raise Exception('Unrecognized tag inside lex: ' + elem.tag) if tag == '': raise Exception('Lex tag had no ctag inside!') return tag def _process_ann(self, ann_subtree): value = int(ann_subtree.text) chan = ann_subtree.attrib["chan"] return chan, value @staticmethod def _get_random_chatacter(upper=False): return random.choice(ascii_uppercase) \ if upper else random.choice(ascii_lowercase) @staticmethod def _generate_pseudo_email(email): new_mail = [] it = iter(email) top_domain_len = email.rfind('.') i = 0 for char in it: if char == '@': new_mail.append(char) i += 1 break elif char in punctuation: new_mail.append(char) else: new_mail.append(Anonymizer._get_random_chatacter(char.isupper())) i += 1 for char in it: if char == '.': if i == top_domain_len: new_mail.append(char) break new_mail.append(char) elif char in punctuation: new_mail.append(char) else: new_mail.append(Anonymizer._get_random_chatacter(char.isupper())) i += 1 for char in it: new_mail.append(char) return ''.join(new_mail) @staticmethod def _generate_pseudo_user(user): it = iter(user) new_user = [] new_user.append(next(it)) for char in it: if char in punctuation: new_user.append(char) else: new_user.append(Anonymizer._get_random_chatacter(char.isupper())) return ''.join(new_user) @staticmethod def _generate_pseudo_website(link): it = iter(link) new_link = [] for char in it: if char == '/': new_link.append(char) break else: new_link.append(char) for char in it: if char in punctuation: new_link.append(char) else: new_link.append(Anonymizer._get_random_chatacter(char.isupper())) return ''.join(new_link) def _anonoymize_email(self, token): """Handles removal/changing of emails addresses.""" email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}' if self._method == 'delete': if re.match(email_regex, token): token = '' self._add_tok = False elif self._method == 'tag': token = re.sub(email_regex, self._mail_token, token) elif self._method == 'pseudo': if re.match(email_regex, token): token = self._generate_pseudo_email(token) return token def _anonoymize_user(self, token): """Handles removal/change of users.""" if self._user_req[0] and self._user_req[1]: if self._method == 'delete': if self._sentence_builder[-1].pop() != '@': raise Exception('Error while detecting User tag.') token = '' self._add_tok = False elif self._method == 'tag': token = self._user_token elif self._method == 'pseudo': token = self._generate_pseudo_user(token) return token def _anonoymize_website(self, token): """Handles removal/change of links.""" link_regex = r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?' \ r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)' \ r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]' \ r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)' \ r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/' \ r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)' \ r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.' \ r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})' if self._method == 'delete': if re.search(link_regex, token): token = '' self._add_tok = False elif self._method == 'tag': token = re.sub(link_regex, self._website_token, token) elif self._method == 'pseudo': if re.search(link_regex, token): token = self._generate_pseudo_website(token) return token