Skip to content
Snippets Groups Projects
anonymizer.py 7.45 KiB
Newer Older
Bartłomiej Koptyra's avatar
Bartłomiej Koptyra committed
"""Implementation of anonymizer functionality."""
import re
from string import punctuation, ascii_lowercase, ascii_uppercase
import random
Bartłomiej Koptyra's avatar
Bartłomiej Koptyra committed


class Anonymizer:
    """Class used to edit sentences based on options."""

    def __init__(self, task_options):
        self.unmarshallers = {
            'chunk': lambda *args: '\n\n',
            'sentence': lambda *args: self._process_sentence(*args),
        }

        self._method = task_options.get('method', 'delete')
        self._mail_token = '[MAIL]'
        self._user_token = '[USER]'
        self._website_token = '[WWW]'
        self._default_token = '[INNE]'
        self._user_req = (False, False)
        self._add_tok = True

    def _process_sentence(self, sentence_subtree):
        string_builder = []
        self._sentence_builder = []
        self._user_req = (False, False)
        for elem in sentence_subtree:
            if elem.tag == 'tok':
                tok = self._process_tok(elem)
                if self._add_tok:
                    string_builder.append(tok)
                self._add_tok = True
            elif elem.tag == 'ns':
                if self._user_req[0]:
                    self._user_req = (True, True)
                elif self._user_req[1]:
                    self._user_req = (False, False)
                else:
                    self._user_req = (False, True)
                self._sentence_builder.append(string_builder)
                string_builder = []
            else:
                raise Exception('Unrecognized tag inside sentence: ' + elem.tag)
        string_builder.append('')
        self._sentence_builder.append(string_builder)
        new_list = []
        for l in self._sentence_builder:
            new_list.append(' '.join(l))
        return ''.join(new_list)

    def _process_word(self, text, tag, ann):
        for annotation in ann:
            if annotation[1] != 0:
                # text = self._handle_annotated(annotation[0], tag)
                break
        text = self._anonoymize_email(text)
        text = self._anonoymize_user(text)
        text = self._anonoymize_website(text)
        return text
    def _process_tok(self, tok_subtree):
        text = ''
        tag = ''
        ann = []
        for elem in tok_subtree:
            if elem.tag == 'orth':
                text = elem.text
            elif elem.tag == 'lex':
                tag = self._process_lex(elem)
            elif elem.tag == 'ann':
                ann.append(self._process_ann(elem))
        print(text, self._user_req[0], self._user_req[1])
        word = self._process_word(text, tag, ann)
        if text == '@' and not self._user_req[0] and not self._user_req[1]:
            self._user_req = (True, False)
        else:
            self._user_req = (False, False)
        return word


    def _process_lex(self, lex_subtree):
        tag = ''
        for elem in lex_subtree:
            if elem.tag == 'ctag':
                tag = elem.text
            elif elem.tag != 'base':
                raise Exception('Unrecognized tag inside lex: ' + elem.tag)
        if tag == '':
            raise Exception('Lex tag had no ctag inside!')
        return tag

    def _process_ann(self, ann_subtree):
        value = int(ann_subtree.text)
        chan = ann_subtree.attrib["chan"]
        return chan, value

    @staticmethod
    def _get_random_chatacter(upper=False):
        return random.choice(ascii_uppercase) \
            if upper else random.choice(ascii_lowercase)

    @staticmethod
    def _generate_pseudo_email(email):
        new_mail = []
        it = iter(email)
        top_domain_len = email.rfind('.')
        i = 0
        for char in it:
            if char == '@':
                new_mail.append(char)
                break
            elif char in punctuation:
                new_mail.append(char)
            else:
                new_mail.append(Anonymizer._get_random_chatacter(char.isupper()))
            i += 1
        for char in it:
            if char == '.':
                if i == top_domain_len:
                    new_mail.append(char)
                    break
                new_mail.append(char)
            elif char in punctuation:
                new_mail.append(char)
            else:
                new_mail.append(Anonymizer._get_random_chatacter(char.isupper()))
            i += 1
        for char in it:
            new_mail.append(char)
        return ''.join(new_mail)

    @staticmethod
    def _generate_pseudo_user(user):
        it = iter(user)
        new_user = []
        new_user.append(next(it))
        for char in it:
            if char in punctuation:
                new_user.append(char)
            else:
                new_user.append(Anonymizer._get_random_chatacter(char.isupper()))
        return ''.join(new_user)

    @staticmethod
    def _generate_pseudo_website(link):
        it = iter(link)
        new_link = []
        for char in it:
            if char == '/':
                new_link.append(char)
                break
            else:
                new_link.append(char)
        for char in it:
            if char in punctuation:
                new_link.append(char)
            else:
                new_link.append(Anonymizer._get_random_chatacter(char.isupper()))
        return ''.join(new_link)

    def _anonoymize_email(self, token):
        """Handles removal/changing of emails addresses."""
        email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}'
        if self._method == 'delete':
            if re.match(email_regex, token):
                token = ''
                self._add_tok = False
        elif self._method == 'tag':
            token = re.sub(email_regex, self._mail_token, token)
        elif self._method == 'pseudo':
            if re.match(email_regex, token):
                token = self._generate_pseudo_email(token)
        return token

    def _anonoymize_user(self, token):
        """Handles removal/change of users."""
        if self._user_req[0] and self._user_req[1]:
            if self._method == 'delete':
                if self._sentence_builder[-1].pop() != '@':
                    raise Exception('Error while detecting User tag.')
                token = ''
                self._add_tok = False
            elif self._method == 'tag':
                token = self._user_token
            elif self._method == 'pseudo':
                token = self._generate_pseudo_user(token)
        return token

    def _anonoymize_website(self, token):
        """Handles removal/change of links."""
        link_regex = r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?' \
                     r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)' \
                     r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]' \
                     r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)' \
                     r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/' \
                     r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)' \
                     r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.' \
                     r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
        if self._method == 'delete':
            if re.search(link_regex, token):
                token = ''
                self._add_tok = False
        elif self._method == 'tag':
            token = re.sub(link_regex, self._website_token, token)
        elif self._method == 'pseudo':
            if re.search(link_regex, token):
                token = self._generate_pseudo_website(token)