Commit eb1e9ee7 authored by Bartlomiej Koptyra

First version of anonymizer. Needs an update for the wiki with replacements.

parent f78b4599
Pipeline #1566 passed
image: 'clarinpl/python:3.6'

cache:
  paths:
    - .tox

stages:
  - check_style
  - build

before_script:
  - pip install tox==2.9.1

pep8:
  stage: check_style
  script:
    - tox -v -e pep8

docstyle:
  stage: check_style
  script:
    - tox -v -e docstyle

build_image:
  stage: build
  image: 'docker:18.09.7'
  only:
    - master
  services:
    - 'docker:18.09.7-dind'
  before_script:
    - ''
  script:
    - docker build -t clarinpl/anonymizer .
    - echo $DOCKER_PASSWORD > pass.txt
    - cat pass.txt | docker login --username $DOCKER_USERNAME --password-stdin
    - rm pass.txt
    - docker push clarinpl/anonymizer
@@ -15,3 +15,4 @@ services:
- './config.ini:/home/worker/config.ini'
- './src:/home/worker/src'
- './main.py:/home/worker/main.py'
- './wiktionary-forms-with-bases-and-tags.txt:/home/worker/wiktionary-forms-with-bases-and-tags.txt'
"""Implementation of anonymizer functionality."""
import re
from string import punctuation, ascii_lowercase, ascii_uppercase
from string import punctuation, ascii_lowercase, ascii_uppercase, digits
import random
class Anonymizer:
"""Class used to edit sentences based on options."""
_file_to_liner_dispatch = {
'nam_liv_person': 'person_first_nam',
'xDDDDDDDD': 'person_last_nam',
'nam_fac_road': 'road_nam',
'nam_loc_gpe_city': 'city_nam',
'xDDDDDDDd': 'country_nam'
}
_liner_to_tag_dispatch = {
'person_first_nam': '[OSOBA]',
'person_last_nam': '[OSOBA]',
'road_nam': '[MIEJSCE]',
'city_nam': '[MIEJSCE]',
'country_nam': '[MIEJSCE]'
}
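For orientation, a self-contained sketch that is not part of the commit: the two dictionaries above form a two-step lookup, mapping a category read from the forms file to a liner channel, and the channel to the tag used in 'tag' mode.
# Illustrative sketch only; the keys below are copied from the dictionaries above.
_example_file_to_liner = {'nam_loc_gpe_city': 'city_nam'}
_example_liner_to_tag = {'city_nam': '[MIEJSCE]'}

def _example_tag_for(category):
    channel = _example_file_to_liner.get(category)
    return _example_liner_to_tag.get(channel) if channel else None

assert _example_tag_for('nam_loc_gpe_city') == '[MIEJSCE]'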
def __init__(self, task_options):
"""Initialize anonymizer with task_options."""
self.unmarshallers = {
'chunk': lambda *args: '\n\n',
'sentence': lambda *args: self._process_sentence(*args),
'sentence': lambda *args: self._process_sent_tree(*args),
}
self._method = task_options.get('method', 'delete')
self._mail_token = '[MAIL]'
self._user_token = '[USER]'
self._user_token = '@[USER]'
self._website_token = '[WWW]'
self._default_token = '[INNE]'
self._user_req = (False, False)
self._add_tok = True
self._form_dict = dict()
self._pseudo_ann_list = list()
self._load_file()
def _process_sentence(self, sentence_subtree):
def _load_file(self, file_name='wiktionary-forms-with-bases-and-tags.txt'):
with open(file_name, 'r', encoding='utf-8') as f:
for line in f.readlines():
l_list = line.split()
cat = l_list[0]
if cat in self._file_to_liner_dispatch:
cat_name = self._file_to_liner_dispatch[cat]
length = int((len(l_list) - 2) / 2)
gen_name = ' '.join(l_list[(1 + length):(1 + 2 * length)])
flx_name = ' '.join(l_list[1:(1 + length)])
flex = l_list[-1]
if cat_name not in self._form_dict:
self._form_dict[cat_name] = dict()
if length not in self._form_dict[cat_name]:
self._form_dict[cat_name][length] = dict()
if gen_name not in self._form_dict[cat_name][length]:
self._form_dict[cat_name][length][gen_name] = dict()
self._form_dict[cat_name][length][gen_name][flex] = flx_name
for cat in self._form_dict:
for length in self._form_dict[cat]:
self._form_dict[cat][length] = list(
self._form_dict[cat][length].items()
)
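To make the index arithmetic in _load_file concrete, here is a sketch (not part of the commit) of how one hypothetical line of wiktionary-forms-with-bases-and-tags.txt would be decomposed. The column layout and the example values are assumptions inferred from the slicing and the variable names; only the arithmetic itself comes from the code above.
# Hypothetical line: category, inflected-form tokens, base-form tokens, tag.
line = 'nam_loc_gpe_city Nowym Sączu Nowy Sącz sg:loc'
l_list = line.split()
length = int((len(l_list) - 2) / 2)                  # 2 tokens per form
inflected = ' '.join(l_list[1:1 + length])           # 'Nowym Sączu'
base = ' '.join(l_list[1 + length:1 + 2 * length])   # 'Nowy Sącz'
morph_tag = l_list[-1]                               # 'sg:loc'
# Stored as _form_dict['city_nam'][2]['Nowy Sącz']['sg:loc'] = 'Nowym Sączu',
# then flattened to a list of (base, {tag: inflected}) pairs for random.choice.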
def _process_sent_tree(self, sentence_subtree):
string_builder = []
self._sentence_builder = []
self._user_req = (False, False)
id = 0
for elem in sentence_subtree:
if elem.tag == 'tok':
tok = self._process_tok(elem)
if self._add_tok:
tok = self._process_single_tok(id, elem)
string_builder.append(tok)
self._add_tok = True
string_builder.append(' ')
id += 2
elif elem.tag == 'ns':
if self._user_req[0]:
self._user_req = (True, True)
elif self._user_req[1]:
self._user_req = (False, False)
else:
self._user_req = (False, True)
self._sentence_builder.append(string_builder)
string_builder = []
id -= 1
string_builder.pop()
else:
raise Exception('Unrecognized tag inside sentence: ' + elem.tag)
string_builder.append('')
self._sentence_builder.append(string_builder)
new_list = []
for l in self._sentence_builder:
new_list.append(' '.join(l))
return ''.join(new_list)
return self._process_sentence(string_builder)
def _process_word(self, text, tag, ann):
def _process_sentence(self, string_builder):
string_builder = self._handle_pseudo_ann(string_builder)
sentence = ''.join(string_builder)
sentence = self._anonoymize_email(sentence)
sentence = self._anonoymize_user(sentence)
sentence = self._anonoymize_website(sentence)
sentence = self._anonoymize_phone_number(sentence)
return sentence
def _process_word(self, id, text, tag, ann):
for annotation in ann:
if annotation[1] != 0:
# text = self._handle_annotated(annotation[0], tag)
text = self._handle_annotated(id, text, tag, annotation[0])
break
text = self._anonoymize_email(text)
text = self._anonoymize_user(text)
text = self._anonoymize_website(text)
return text
def _process_tok(self, tok_subtree):
def _handle_annotated(self, id, text, tag, ann):
if self._method == 'delete':
return ''
elif self._method == 'tag':
if ann in self._liner_to_tag_dispatch:
return self._liner_to_tag_dispatch[ann]
elif self._method == 'pseudo':
if ann in self._form_dict:
self._pseudo_ann_list.append((id, text, tag, ann))
return text
def _handle_pseudo_ann(self, string_builder):
if self._pseudo_ann_list:
it = iter(self._pseudo_ann_list)
id, text, tag, ann = next(it)
current_tag = tag
current_ann = ann
current_id = id
length = 1
for id, text, tag, ann in it:
if current_ann == ann:
if id == current_id + 2:
length += 1
current_tag = tag
current_id = id
continue
new_text = self._get_pseudo_ann(
ann=current_ann,
tag=current_tag,
length=length
)
for t in new_text.split(' '):
string_builder[current_id - 2 * (length - 1)] = t
length -= 1
length = 1
current_tag = tag
current_ann = ann
current_id = id
new_text = self._get_pseudo_ann(current_ann, current_tag, length)
toks = new_text.split(' ')
for i in range(length):
if i < len(toks):
string_builder[current_id - 2 * (length - 1)] = toks[i]
else:
string_builder[current_id - 2 * (length - 1)] = ''
if string_builder[current_id - 2 * (length - 1) + 1] == ' ':
string_builder[current_id - 2 * (length - 1) + 1] = ''
length -= 1
self._pseudo_ann_list.clear()
return string_builder
def _get_pseudo_ann(self, ann, tag, length):
while length not in self._form_dict[ann] and length > 0:
length -= 1
if length == 0:
return ''
new_tag = ':'.join(tag.split(':')[1:3])
for i in range(0, 10):
random_entry = random.choice(self._form_dict[ann][length])
if new_tag in random_entry[1]:
return random_entry[1][new_tag]
if new_tag == 'ign':
return random_entry[0]
random_entry = random.choice(self._form_dict[ann][length])
return random_entry[0]
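A short illustration (not part of the commit) of the tag reduction performed above; the tagger tag used here is an assumed NKJP-style value, but the slicing is taken directly from _get_pseudo_ann.
tag = 'subst:sg:nom:m1'                      # assumed tagger output
new_tag = ':'.join(tag.split(':')[1:3])      # 'sg:nom'
# A matching entry built by _load_file could look like
# ('Kraków', {'sg:nom': 'Kraków', 'sg:loc': 'Krakowie'}); the replacement is
# entry[1][new_tag] when the reduced tag is present, entry[0] otherwise.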
def _process_single_tok(self, id, tok_subtree):
text = ''
tag = ''
ann = []
@@ -70,15 +171,9 @@ class Anonymizer:
tag = self._process_lex(elem)
elif elem.tag == 'ann':
ann.append(self._process_ann(elem))
print(text, self._user_req[0], self._user_req[1])
word = self._process_word(text, tag, ann)
if text == '@' and not self._user_req[0] and not self._user_req[1]:
self._user_req = (True, False)
else:
self._user_req = (False, False)
word = self._process_word(id, text, tag, ann)
return word
def _process_lex(self, lex_subtree):
tag = ''
for elem in lex_subtree:
@@ -96,8 +191,9 @@ class Anonymizer:
return chan, value
@staticmethod
def _get_random_chatacter(upper=False):
return random.choice(ascii_uppercase) \
def _get_random_character(digit=False, upper=False):
return random.choice(digits) if digit \
else random.choice(ascii_uppercase) \
if upper else random.choice(ascii_lowercase)
@staticmethod
@@ -114,7 +210,12 @@ class Anonymizer:
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(Anonymizer._get_random_chatacter(char.isupper()))
new_mail.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
i += 1
for char in it:
if char == '.':
@@ -125,28 +226,45 @@ class Anonymizer:
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(Anonymizer._get_random_chatacter(char.isupper()))
new_mail.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
i += 1
for char in it:
new_mail.append(char)
return ''.join(new_mail)
return r''.join(new_mail)
@staticmethod
def _generate_pseudo_user(user):
it = iter(user)
new_user = []
new_user.append(next(it))
for char in it:
if char in punctuation:
new_user.append(char)
else:
new_user.append(Anonymizer._get_random_chatacter(char.isupper()))
return ''.join(new_user)
new_user.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
return r''.join(new_user)
@staticmethod
def _generate_pseudo_website(link):
it = iter(link)
new_link = []
if link[0:4].lower() == 'http':
slashes = 0
for char in it:
if char == '/':
slashes += 1
new_link.append(char)
if slashes == 2:
break
for char in it:
if char == '/':
new_link.append(char)
@@ -157,38 +275,75 @@ class Anonymizer:
if char in punctuation:
new_link.append(char)
else:
new_link.append(Anonymizer._get_random_chatacter(char.isupper()))
return ''.join(new_link)
new_link.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
return r''.join(new_link)
@staticmethod
def _generate_pseudo_phone_number(number):
new_number = []
it = iter(number)
if number[0] == '+':
for j in range(0, 3):
new_number.append(next(it))
elif number[0] == '0' and number[1] == '0' and number[4] == ' ':
for j in range(0, 4):
new_number.append(next(it))
elif number[0] == '(' and number[1] == '0' and number[4] == ')':
for j in range(0, 2):
new_number.append(next(it))
for char in it:
if char.isdigit():
new_number.append(Anonymizer._get_random_character(digit=True))
else:
new_number.append(char)
return r''.join(new_number)
def _generate_phone_number_tag(self, number):
new_number = number.split(' ')
for i in range(len(new_number)):
new_number[i] = self._default_token
return r' '.join(new_number)
def _anonoymize_email(self, token):
def _anonoymize_email(self, sentence):
"""Handles removal/changing of emails addresses."""
email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}'
if self._method == 'delete':
if re.match(email_regex, token):
token = ''
self._add_tok = False
sentence = re.sub(email_regex, '', sentence)
elif self._method == 'tag':
token = re.sub(email_regex, self._mail_token, token)
sentence = re.sub(email_regex, self._mail_token, sentence)
elif self._method == 'pseudo':
if re.match(email_regex, token):
token = self._generate_pseudo_email(token)
return token
matches = re.findall(email_regex, sentence)
for match in matches:
sentence = re.sub(
re.escape(match),
self._generate_pseudo_email(match),
sentence
)
return sentence
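As a quick check of the branch above (not part of the commit), the email pattern and the 'delete'/'tag' behaviour can be exercised directly; 'pseudo' is omitted because its output is randomised, and the input sentence is invented.
import re

email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}'
sentence = 'Napisz do mnie: jan.kowalski@example.com'   # hypothetical input

print(re.sub(email_regex, '', sentence))         # 'Napisz do mnie: '
print(re.sub(email_regex, '[MAIL]', sentence))   # 'Napisz do mnie: [MAIL]'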
def _anonoymize_user(self, token):
def _anonoymize_user(self, sentence):
"""Handles removal/change of users."""
if self._user_req[0] and self._user_req[1]:
user_regex = r'\B\@([\w\-]+)'
if self._method == 'delete':
if self._sentence_builder[-1].pop() != '@':
raise Exception('Error while detecting User tag.')
token = ''
self._add_tok = False
sentence = re.sub(user_regex, '', sentence)
elif self._method == 'tag':
token = self._user_token
sentence = re.sub(user_regex, self._user_token, sentence)
elif self._method == 'pseudo':
token = self._generate_pseudo_user(token)
return token
matches = re.findall(user_regex, sentence)
for match in matches:
sentence = re.sub(
re.escape(match),
self._generate_pseudo_user(match),
sentence
)
return sentence
def _anonoymize_website(self, token):
def _anonoymize_website(self, sentence):
"""Handles removal/change of links."""
link_regex = r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?' \
r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)' \
@@ -199,13 +354,52 @@ class Anonymizer:
r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.' \
r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
if self._method == 'delete':
if re.search(link_regex, token):
token = ''
self._add_tok = False
sentence = re.sub(link_regex, '', sentence)
elif self._method == 'tag':
token = re.sub(link_regex, self._website_token, token)
sentence = re.sub(link_regex, self._website_token, sentence)
elif self._method == 'pseudo':
if re.search(link_regex, token):
token = self._generate_pseudo_website(token)
return token
matches = re.findall(link_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_pseudo_website(match),
sentence
)
return sentence
def _anonoymize_phone_number(self, sentence):
"""Handles removal/change of links."""
phone_number_regex = r'(((\+\d{2}|00\d{2}) ?)?(\d{9}))|((\+\d{2} ' \
r'|00\d{2} )?(\d{3} \d{3} \d{3}))|(\(0\d{2}\) ' \
r'\d{2} \d{2} \d{3})|(\(\d{2}\) \d{2} \d{3} \d{2})'
if self._method == 'delete':
sentence = re.sub(phone_number_regex, '', sentence)
elif self._method == 'tag':
matches = re.findall(phone_number_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_phone_number_tag(match),
sentence
)
elif self._method == 'pseudo':
matches = re.findall(phone_number_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_pseudo_phone_number(match),
sentence
)
return sentence
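The phone-number pattern above accepts several layouts. A small standalone check (not part of the commit) with assumed example numbers, plus the 'tag' replacement in which every space-separated group of a match becomes the default token.
import re

phone_number_regex = (r'(((\+\d{2}|00\d{2}) ?)?(\d{9}))|((\+\d{2} '
                      r'|00\d{2} )?(\d{3} \d{3} \d{3}))|(\(0\d{2}\) '
                      r'\d{2} \d{2} \d{3})|(\(\d{2}\) \d{2} \d{3} \d{2})')

# A few layouts the alternation is written to accept (inputs assumed):
for number in ['123456789', '+48 123 456 789', '(012) 34 56 789']:
    assert re.fullmatch(phone_number_regex, number)

# 'tag' mode replaces each space-separated group of the match with [INNE]:
print(' '.join('[INNE]' for _ in '+48 123 456 789'.split(' ')))
# -> '[INNE] [INNE] [INNE] [INNE]'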
"""Implementation of ccl reading functionality."""
from xml.etree.ElementTree import iterparse
class Ccl_handler:
"""Implements reading ccl for anonymizer service."""
def __init__(self, ccl_file_name):
"""Initialize ccl_handler with a filename."""
self._file_name = ccl_file_name
def process(self, output_file, unmarshallers):
with open(output_file, 'wt') as out:
with open(self._file_name, 'r') as f:
"""Process xml tags using unmarshallers and save in output_file."""
with open(output_file, 'wt', encoding='utf-8') as out:
with open(self._file_name, 'r', encoding='utf-8') as f:
for event, elem in iterparse(f):
unmarshal = unmarshallers.get(elem.tag, None)
if unmarshal:
out.write(unmarshal(elem))
elem.clear()
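A minimal usage sketch (not part of the commit), mirroring how Worker.process below wires the two classes together; the file paths are placeholders.
from src.anonymizer import Anonymizer
from src.ccl_handler import Ccl_handler

anon = Anonymizer({'method': 'tag'})       # or 'delete' / 'pseudo'
handler = Ccl_handler('input.ccl')         # hypothetical CCL/XML input path
handler.process('output.txt', anon.unmarshallers)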
@@ -5,6 +5,7 @@ import nlp_ws
from src.anonymizer import Anonymizer
from src.ccl_handler import Ccl_handler
_log = logging.getLogger(__name__)
@@ -12,11 +13,6 @@ _log = logging.getLogger(__name__)
class Worker(nlp_ws.NLPWorker):
"""Implements nlp_worker for anonymizer service."""
@classmethod
def static_init(cls, config):
"""One time static initialisation."""
print("siema")
def process(self, input_file, task_options, output_file):
"""Anonymizes input text.
@@ -28,7 +24,5 @@ class Worker(nlp_ws.NLPWorker):
replaces selected tokens with a random token that
"""
anon = Anonymizer(task_options)
with open(input_file, 'rt', encoding='utf-8') as input_file:
with open(output_file, 'wt', encoding='utf-8') as output_file:
print("elo")
ccl_handler = Ccl_handler(input_file)
ccl_handler.process(output_file, anon.unmarshallers)
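For reference, a tiny sketch (not part of the commit) of the only task option the Anonymizer reads; the default follows task_options.get('method', 'delete') in __init__. Note that constructing an Anonymizer also requires wiktionary-forms-with-bases-and-tags.txt to be present, since __init__ loads it.
task_options = {'method': 'pseudo'}   # accepted values: 'delete' (default), 'tag', 'pseudo'
method = task_options.get('method', 'delete')
assert method in ('delete', 'tag', 'pseudo')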
Source diff could not be displayed: it is too large.