"""Anonymizer service: CCL reading and sentence-level anonymization.

Expected pipeline: text -> any2txt -> MorphoDiTa -> Liner2 -> anonymizer.
The input tagger should be MorphoDiTa and Liner2 should use the 5nam model.
"""
# NOTE: the import lines of the anonymizer part were outside the visible
# change set; they are restated here so this module is self-contained.
import random
import re
from string import ascii_lowercase, ascii_uppercase, punctuation
from xml.etree.ElementTree import iterparse


class Anonymizer:
    """Class used to edit sentences based on options.

    The ``method`` option selects what happens to e-mail addresses, user
    mentions (``@`` glued to the following token) and links:

    * ``'delete'`` - the token is dropped from the output,
    * ``'tag'``    - the match is replaced with a fixed placeholder,
    * ``'pseudo'`` - letters are replaced with random letters of the
      same case while punctuation is kept.

    Any other value leaves tokens untouched.
    """

    # Compiled once; the patterns are used for every token.
    _EMAIL_RE = re.compile(r'[\w\.-]+@[\w\.-]+\.\w{2,4}')
    _LINK_RE = re.compile(
        r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?'
        r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)'
        r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]'
        r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)'
        r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/'
        r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)'
        r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.'
        r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
    )

    def __init__(self, task_options):
        """Store the configuration.

        Args:
            task_options: mapping of options; only ``'method'`` is read
                (defaults to ``'delete'``).
        """
        # Maps an XML tag name to the callable producing the text that
        # should be written when an element with that tag is finished.
        self.unmarshallers = {
            'chunk': lambda *args: '\n\n',
            'sentence': self._process_sentence,
        }
        self._method = task_options.get('method', 'delete')
        self._mail_token = '[MAIL]'
        self._user_token = '[USER]'
        self._website_token = '[WWW]'
        self._default_token = '[INNE]'
        # (previous token was a mention-opening '@', last element was <ns>)
        self._user_req = (False, False)
        # Cleared by a handler to keep the current token out of the output.
        self._add_tok = True
        # Set when a deleted mention requires re-opening the previous segment.
        self._rejoin_segment = False
        # Segments (token lists) already closed by an <ns> element.
        self._sentence_builder = []

    def _process_sentence(self, sentence_subtree):
        """Return the anonymized text of one ``sentence`` element.

        Tokens are joined with single spaces, except across an ``ns``
        element, where the neighbouring tokens are glued together.  The
        returned text always ends with one space.

        Raises:
            Exception: if the sentence contains a child other than
                ``tok`` or ``ns``.
        """
        segment = []
        self._sentence_builder = []
        self._user_req = (False, False)
        self._rejoin_segment = False
        for elem in sentence_subtree:
            if elem.tag == 'tok':
                tok = self._process_tok(elem)
                if self._add_tok:
                    segment.append(tok)
                elif self._rejoin_segment:
                    # A "@user" mention was deleted.  The '@' closed the
                    # previous segment; continue that segment so the words
                    # around the mention stay separated by a space.
                    segment = self._sentence_builder.pop() + segment
                self._add_tok = True
                self._rejoin_segment = False
            elif elem.tag == 'ns':
                if self._user_req[0]:
                    # '@' directly followed by <ns>: next token is a user.
                    self._user_req = (True, True)
                elif self._user_req[1]:
                    self._user_req = (False, False)
                else:
                    # Remember the <ns> so a following '@' (glued to the
                    # previous token) is not treated as a mention.
                    self._user_req = (False, True)
                self._sentence_builder.append(segment)
                segment = []
            else:
                raise Exception(
                    'Unrecognized tag inside sentence: ' + elem.tag)
        segment.append('')  # yields the trailing space after the sentence
        self._sentence_builder.append(segment)
        return ''.join(' '.join(part) for part in self._sentence_builder)

    def _process_word(self, text, tag, ann):
        """Run the e-mail, user and website handlers over one token.

        ``tag`` and ``ann`` are accepted but not used yet.
        """
        # TODO: tokens carrying a non-zero annotation are not handled yet.
        text = self._anonoymize_email(text)
        text = self._anonoymize_user(text)
        text = self._anonoymize_website(text)
        return text

    def _process_tok(self, tok_subtree):
        """Return the processed text of a ``tok`` element.

        Reads the ``orth`` text, the ctag of ``lex`` children and the
        ``ann`` children, then updates the mention state: an '@' token
        that was not preceded by ``ns`` opens a potential user mention.
        """
        text = ''
        tag = ''
        ann = []
        for elem in tok_subtree:
            if elem.tag == 'orth':
                # An empty <orth/> has text None; the handlers need a str.
                text = elem.text or ''
            elif elem.tag == 'lex':
                tag = self._process_lex(elem)
            elif elem.tag == 'ann':
                ann.append(self._process_ann(elem))
        word = self._process_word(text, tag, ann)
        if text == '@' and not self._user_req[0] and not self._user_req[1]:
            self._user_req = (True, False)
        else:
            self._user_req = (False, False)
        return word

    def _process_lex(self, lex_subtree):
        """Return the text of the ``ctag`` child of a ``lex`` element.

        Raises:
            Exception: on a child other than ``ctag``/``base`` or when no
                non-empty ``ctag`` is present.
        """
        tag = ''
        for elem in lex_subtree:
            if elem.tag == 'ctag':
                tag = elem.text
            elif elem.tag != 'base':
                raise Exception('Unrecognized tag inside lex: ' + elem.tag)
        if tag == '':
            raise Exception('Lex tag had no ctag inside!')
        return tag

    def _process_ann(self, ann_subtree):
        """Return ``(chan attribute, integer value)`` of an ``ann`` element."""
        value = int(ann_subtree.text)
        chan = ann_subtree.attrib["chan"]
        return chan, value

    @staticmethod
    def _get_random_chatacter(upper=False):
        """Return a random ASCII letter, upper-case when ``upper`` is true."""
        return random.choice(ascii_uppercase if upper else ascii_lowercase)

    @staticmethod
    def _scramble(text):
        """Replace every non-punctuation character with a random letter.

        Upper-case characters become upper-case letters; everything else
        that is not punctuation (including digits) becomes lower-case.
        """
        return ''.join(
            char if char in punctuation
            else Anonymizer._get_random_chatacter(char.isupper())
            for char in text
        )

    @staticmethod
    def _generate_pseudo_email(email):
        """Return ``email`` with its letters randomized.

        Punctuation (including '@') is kept.  The last dot and everything
        after it is kept verbatim when that dot lies after the first '@';
        otherwise the whole string is scrambled.
        """
        at_pos = email.find('@')
        last_dot = email.rfind('.')
        # Positions are taken from the string itself: a running counter
        # drifts as soon as the address contains punctuation.
        keep_from = last_dot if -1 < at_pos < last_dot else len(email)
        return Anonymizer._scramble(email[:keep_from]) + email[keep_from:]

    @staticmethod
    def _generate_pseudo_user(user):
        """Return ``user`` with its letters randomized.

        Punctuation is kept, so a leading '@' (if the caller passes one)
        survives.  The first letter is randomized as well; the caller in
        this class passes the bare user name without '@'.
        """
        return Anonymizer._scramble(user)

    @staticmethod
    def _generate_pseudo_website(link):
        """Return ``link`` with the part after its first '/' randomized.

        Everything up to and including the first '/' is copied verbatim.
        """
        # NOTE(review): a link without any '/' is returned unchanged -
        # confirm that this is intended.
        head, slash, tail = link.partition('/')
        return head + slash + Anonymizer._scramble(tail)

    def _anonoymize_email(self, token):
        """Handles removal/changing of emails addresses."""
        if self._method == 'delete':
            if self._EMAIL_RE.match(token):
                token = ''
                self._add_tok = False
        elif self._method == 'tag':
            token = self._EMAIL_RE.sub(self._mail_token, token)
        elif self._method == 'pseudo':
            if self._EMAIL_RE.match(token):
                token = self._generate_pseudo_email(token)
        return token

    def _anonoymize_user(self, token):
        """Handles removal/change of users.

        Acts only when the previous elements were an '@' token followed
        by ``ns``.  In ``'delete'`` mode the already emitted '@' is removed
        as well.

        Raises:
            ValueError: in ``'delete'`` mode when the last emitted token
                is not '@'.
        """
        if not (self._user_req[0] and self._user_req[1]):
            return token
        if self._method == 'delete':
            previous = self._sentence_builder[-1]
            if not previous or previous.pop() != '@':
                raise ValueError('Error while detecting User tag.')
            token = ''
            self._add_tok = False
            self._rejoin_segment = True
        elif self._method == 'tag':
            token = self._user_token
        elif self._method == 'pseudo':
            token = self._generate_pseudo_user(token)
        return token

    def _anonoymize_website(self, token):
        """Handles removal/change of links."""
        if self._method == 'delete':
            if self._LINK_RE.search(token):
                token = ''
                self._add_tok = False
        elif self._method == 'tag':
            token = self._LINK_RE.sub(self._website_token, token)
        elif self._method == 'pseudo':
            if self._LINK_RE.search(token):
                token = self._generate_pseudo_website(token)
        return token


class Ccl_handler:
    """Implements reading ccl for anonymizer service."""

    def __init__(self, ccl_file_name):
        """Remember the path of the CCL (XML) file to read."""
        self._file_name = ccl_file_name

    def process(self, output_file, unmarshallers):
        """Stream the CCL file and write the unmarshalled text.

        Args:
            output_file: path of the text file to (over)write.
            unmarshallers: mapping from XML tag name to a callable that
                takes the finished element and returns a string.
        """
        # The input is opened in binary mode so the XML parser decodes it
        # according to the document's own declaration; the output encoding
        # is fixed instead of depending on the locale.
        with open(output_file, 'wt', encoding='utf-8') as out, \
                open(self._file_name, 'rb') as f:
            for _event, elem in iterparse(f):
                unmarshal = unmarshallers.get(elem.tag)
                if unmarshal:
                    out.write(unmarshal(elem))
                    # Only handled elements are cleared: their children
                    # must stay intact until the parent has been written.
                    elem.clear()