Commit af901edf authored by Bartłomiej Koptyra's avatar Bartłomiej Koptyra Committed by Mateusz Gniewkowski

Develop

parent dd9d97f6
# CI pipeline: run style checks via tox, then build and push the Docker
# image (master branch only).
image: 'clarinpl/python:3.6'

cache:
  paths:
    - .tox

stages:
  - check_style
  - build

before_script:
  - pip install tox==2.9.1

pep8:
  stage: check_style
  script:
    - tox -v -e pep8

docstyle:
  stage: check_style
  script:
    - tox -v -e docstyle

build_image:
  stage: build
  image: 'docker:18.09.7'
  only:
    - master
  services:
    - 'docker:18.09.7-dind'
  # Override the global before_script — tox is not needed in this job.
  before_script: []
  script:
    - docker build -t clarinpl/anonymizer .
    # Pipe the password straight into docker login so the secret is never
    # written to disk (the old pass.txt round-trip left it on the runner
    # if the job failed between echo and rm).
    - echo $DOCKER_PASSWORD | docker login --username $DOCKER_USERNAME --password-stdin
    - docker push clarinpl/anonymizer
# Worker image for the anonymizer service; base image provides Python 3.6.
FROM clarinpl/python:3.6

WORKDIR /home/worker

# Application sources and entry point.
COPY ./src ./src
COPY ./main.py .
COPY ./requirements.txt .

RUN python3.6 -m pip install -r requirements.txt

# Run as an nlp_ws service by default.
CMD ["python3.6", "main.py", "service"]
# Anonymizer

Liner2 should use the `5nam` model.

Processing pipeline: text → any2txt → morphodita → liner2 → anonymizer
[service]
# Name under which this worker registers in the NLP pipeline.
tool = anonymizer
# Shared directory where request files are exchanged.
root = /samba/requests/
# RabbitMQ connection settings used by nlp_ws.
rabbit_host = rabbitmq
rabbit_user = test
rabbit_password = test
queue_prefix = nlp_

[tool]
# Number of parallel worker processes.
workers_number = 1

[logging]
port = 9998
local_log_level = INFO

[logging_levels]
__main__ = INFO
version: '3'
services:
  # NOTE(review): service key is 'tokenizer' but the container is the
  # anonymizer — looks like a copy-paste leftover; kept as-is because
  # other tooling may reference the service name. Confirm before renaming.
  tokenizer:
    container_name: clarin_anonymizer
    build: ./
    working_dir: /home/worker
    entrypoint:
      - python3.6
      - main.py
      - service
    environment:
      # Any non-empty value (including "0") makes Python unbuffered.
      - PYTHONUNBUFFERED=0
    # Bind-mount sources and config so edits apply without a rebuild.
    volumes:
      - '/samba:/samba'
      - './config.ini:/home/worker/config.ini'
      - './src:/home/worker/src'
      - './main.py:/home/worker/main.py'
      - './wiki.txt:/home/worker/wiki.txt'
"""Implementation of anonymizer service."""
import argparse
import nlp_ws
from src.worker import Worker
def get_args():
"""Gets command line arguments."""
parser = argparse.ArgumentParser(description="anonymizer")
subparsers = parser.add_subparsers(dest="mode")
subparsers.required = True
subparsers.add_parser(
"service",
help="Run as a service")
return parser.parse_args()
def main():
"""Runs the program."""
args = get_args()
generators = {
"service": lambda: nlp_ws.NLPService.main(Worker),
}
gen_fn = generators.get(args.mode, lambda: None)
gen_fn()
if __name__ == "__main__":
main()
nlp-ws
\ No newline at end of file
"""Implementation of anonymizer functionality."""
import re
from string import punctuation, ascii_lowercase, ascii_uppercase, digits
import random
class Anonymizer:
"""Class used to edit sentences based on options."""
_file_to_liner_dispatch = {
'nam_liv_person': 'person_first_nam',
'nam_liv_person_last': 'person_last_nam',
'nam_fac_road': 'road_nam',
'nam_loc_gpe_city': 'city_nam',
'nam_org_group_team': 'country_nam'
}
_liner_to_tag_dispatch = {
'person_first_nam': '[OSOBA]',
'person_last_nam': '[OSOBA]',
'road_nam': '[MIEJSCE]',
'city_nam': '[MIEJSCE]',
'country_nam': '[MIEJSCE]'
}
def __init__(self, task_options):
"""Initialize anonymizer with task_options."""
self.unmarshallers = {
'chunk': lambda *args: '\n',
'sentence': lambda *args: self._process_sent_tree(*args),
}
self._method = task_options.get('method', 'delete')
self._mail_token = '[MAIL]'
self._user_token = '@[USER]'
self._website_token = '[WWW]'
self._default_token = '[INNE]'
self._form_dict = dict()
self._pseudo_ann_list = list()
self._load_file()
def _load_file(self, file_name='wiki.txt'):
with open(file_name, 'r', encoding='utf-8') as f:
for line in f.readlines():
l_list = line.split()
cat = l_list[0]
if cat in self._file_to_liner_dispatch:
cat_name = self._file_to_liner_dispatch[cat]
length = int((len(l_list) - 2) / 2)
gen_name = ' '.join(l_list[(1 + length):(1 + 2 * length)])
flx_name = ' '.join(l_list[1:(1 + length)])
flex = l_list[-1]
if cat_name not in self._form_dict:
self._form_dict[cat_name] = dict()
if length not in self._form_dict[cat_name]:
self._form_dict[cat_name][length] = dict()
if gen_name not in self._form_dict[cat_name][length]:
self._form_dict[cat_name][length][gen_name] = dict()
self._form_dict[cat_name][length][gen_name][flex] = flx_name
for cat in self._form_dict:
for length in self._form_dict[cat]:
self._form_dict[cat][length] = list(
self._form_dict[cat][length].items()
)
def _process_sent_tree(self, sentence_subtree):
string_builder = []
id = 0
for elem in sentence_subtree:
if elem.tag == 'tok':
tok = self._process_single_tok(id, elem)
string_builder.append(tok)
string_builder.append(' ')
id += 2
elif elem.tag == 'ns':
id -= 1
string_builder.pop()
else:
raise Exception('Unrecognized tag inside sentence: ' + elem.tag)
return self._process_sentence(string_builder)
def _process_sentence(self, string_builder):
string_builder = self._handle_pseudo_ann(string_builder)
return self._anonoymize_phone_number(
self._anonoymize_website(
self._anonoymize_user(
self._anonoymize_email(
''.join(string_builder)
)
)
)
)
def _process_word(self, id, text, tag, ann):
for annotation in ann:
if annotation[1] != 0:
text = self._handle_annotated(id, text, tag, annotation[0])
break
return text
def _handle_annotated(self, id, text, tag, ann):
if self._method == 'delete':
return ''
elif self._method == 'tag':
if ann in self._liner_to_tag_dispatch:
return self._liner_to_tag_dispatch[ann]
elif self._method == 'pseudo':
if ann in self._form_dict:
self._pseudo_ann_list.append((id, text, tag, ann))
return text
def _handle_pseudo_ann(self, string_builder):
if self._pseudo_ann_list:
it = iter(self._pseudo_ann_list)
id, text, tag, ann = next(it)
current_tag = tag
current_ann = ann
current_id = id
length = 1
for id, text, tag, ann in it:
if current_ann == ann and (ann != 'person_first_nam' and
ann != 'person_last_nam'):
if id == current_id + 2:
length += 1
current_tag = tag
current_id = id
continue
new_text = self._get_pseudo_ann(
ann=current_ann,
tag=current_tag,
length=length
)
for t in new_text.split(' '):
string_builder[current_id - 2 * (length - 1)] = t
length -= 1
length = 1
current_tag = tag
current_ann = ann
current_id = id
new_text = self._get_pseudo_ann(current_ann, current_tag, length)
toks = new_text.split(' ')
for i in range(length):
if i < len(toks):
string_builder[current_id - 2 * (length - 1)] = toks[i]
else:
string_builder[current_id - 2 * (length - 1)] = ''
if string_builder[current_id - 2 * (length - 1) + 1] == ' ':
string_builder[current_id - 2 * (length - 1) + 1] = ''
length -= 1
self._pseudo_ann_list.clear()
return string_builder
def _get_pseudo_ann(self, ann, tag, length):
while length not in self._form_dict[ann] and length > 0:
length -= 1
if length == 0:
return ''
new_tag = ':'.join(tag.split(':')[1:4])
for i in range(0, 10):
random_entry = random.choice(self._form_dict[ann][length])
if new_tag in random_entry[1]:
return random_entry[1][new_tag]
if new_tag == 'ign':
return random_entry[0]
random_entry = random.choice(self._form_dict[ann][length])
return random_entry[0]
def _process_single_tok(self, id, tok_subtree):
text = ''
tag = ''
ann = []
for elem in tok_subtree:
if elem.tag == 'orth':
text = elem.text
elif elem.tag == 'lex':
tag = self._process_lex(elem)
elif elem.tag == 'ann':
ann.append(self._process_ann(elem))
word = self._process_word(id, text, tag, ann)
return word
def _process_lex(self, lex_subtree):
tag = ''
for elem in lex_subtree:
if elem.tag == 'ctag':
tag = elem.text
elif elem.tag != 'base':
raise Exception('Unrecognized tag inside lex: ' + elem.tag)
if tag == '':
raise Exception('Lex tag had no ctag inside!')
return tag
def _process_ann(self, ann_subtree):
value = int(ann_subtree.text)
chan = ann_subtree.attrib["chan"]
return chan, value
@staticmethod
def _get_random_character(digit=False, upper=False):
return random.choice(digits) if digit \
else random.choice(ascii_uppercase) \
if upper else random.choice(ascii_lowercase)
@staticmethod
def _generate_pseudo_email(email):
new_mail = []
it = iter(email)
top_domain_len = email.rfind('.')
i = 0
for char in it:
if char == '@':
new_mail.append(char)
i += 1
break
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
i += 1
for char in it:
if char == '.':
if i == top_domain_len:
new_mail.append(char)
break
new_mail.append(char)
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
i += 1
for char in it:
new_mail.append(char)
return r''.join(new_mail)
@staticmethod
def _generate_pseudo_user(user):
it = iter(user)
new_user = []
for char in it:
if char in punctuation:
new_user.append(char)
else:
new_user.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
return r''.join(new_user)
@staticmethod
def _generate_pseudo_website(link):
it = iter(link)
new_link = []
if link[0:4].lower() == 'http':
slashes = 0
for char in it:
if char == '/':
slashes += 1
new_link.append(char)
if slashes == 2:
break
for char in it:
if char == '/':
new_link.append(char)
break
else:
new_link.append(char)
for char in it:
if char in punctuation:
new_link.append(char)
else:
new_link.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
return r''.join(new_link)
@staticmethod
def _generate_pseudo_phone_number(number):
new_number = []
length = len(number)
it = iter(number)
if number[0] == '+':
how_many = length - 9
for j in range(0, how_many):
new_number.append(next(it))
elif number[0] == '0' and number[1] == '0' \
and number[length - 10] == ' ':
for j in range(0, length - 10):
new_number.append(next(it))
elif number[0] == '(' and number[1] == '0' and number[4] == ')':
for j in range(0, 2):
new_number.append(next(it))
for char in it:
if char.isdigit():
new_number.append(Anonymizer._get_random_character(digit=True))
else:
new_number.append(char)
return r''.join(new_number)
def _generate_phone_number_tag(self, number):
new_number = number.split(' ')
for i in range(len(new_number)):
new_number[i] = self._default_token
return r' '.join(new_number)
def _anonoymize_email(self, sentence):
"""Handles removal/changing of emails addresses."""
email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}'
if self._method == 'delete':
sentence = re.sub(email_regex, '', sentence)
elif self._method == 'tag':
sentence = re.sub(email_regex, self._mail_token, sentence)
elif self._method == 'pseudo':
matches = re.findall(email_regex, sentence)
for match in matches:
sentence = re.sub(
re.escape(match),
self._generate_pseudo_email(match),
sentence
)
return sentence
def _anonoymize_user(self, sentence):
"""Handles removal/change of users."""
user_regex = r'\B\@([\w\-]+)'
if self._method == 'delete':
sentence = re.sub(user_regex, '', sentence)
elif self._method == 'tag':
sentence = re.sub(user_regex, self._user_token, sentence)
elif self._method == 'pseudo':
matches = re.findall(user_regex, sentence)
for match in matches:
sentence = re.sub(
re.escape(match),
self._generate_pseudo_user(match),
sentence
)
return sentence
def _anonoymize_website(self, sentence):
"""Handles removal/change of links."""
link_regex = r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?' \
r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)' \
r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]' \
r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)' \
r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/' \
r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)' \
r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.' \
r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
if self._method == 'delete':
sentence = re.sub(link_regex, '', sentence)
elif self._method == 'tag':
sentence = re.sub(link_regex, self._website_token, sentence)
elif self._method == 'pseudo':
matches = re.findall(link_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_pseudo_website(match),
sentence
)
return sentence
def _anonoymize_phone_number(self, sentence):
"""Handles removal/change of links."""
phone_number_regex = r'(((\+[1-9]\d{0,2}|00[1-9]\d{0,2}) ?)?(\d{9}))' \
r'|((\+[1-9]\d{0,2} |00[1-9]\d{0,2} )?' \
r'(\d{3} \d{3} \d{3}))|(\(0\d{2}\) \d{2} \d{2} ' \
r'\d{3})|(\(\d{2}\) \d{2} \d{3} \d{2})'
if self._method == 'delete':
sentence = re.sub(phone_number_regex, '', sentence)
elif self._method == 'tag':
matches = re.findall(phone_number_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_phone_number_tag(match),
sentence
)
elif self._method == 'pseudo':
matches = re.findall(phone_number_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_pseudo_phone_number(match),
sentence
)
return sentence
"""Implementation of ccl reading functionality."""
from xml.etree.ElementTree import iterparse
class Ccl_handler:
"""Implements reading ccl for anonymizer service."""
def __init__(self, ccl_file_name):
"""Initialize ccl_handler with a filename."""
self._file_name = ccl_file_name
def process(self, output_file, unmarshallers):
"""Process xml tags using unmarshallers and save in output_file."""
with open(output_file, 'w', encoding='utf-8') as out:
with open(self._file_name, 'r', encoding='utf-8') as f:
for event, elem in iterparse(f):
unmarshal = unmarshallers.get(elem.tag, None)
if unmarshal:
out.write(unmarshal(elem))
elem.clear()
"""Implementation of nlp_worker."""
import logging
import nlp_ws
from src.anonymizer import Anonymizer
from src.ccl_handler import Ccl_handler
_log = logging.getLogger(__name__)
class Worker(nlp_ws.NLPWorker):
"""Implements nlp_worker for anonymizer service."""
def process(self, input_file, task_options, output_file):
"""Anonymizes input text.
It is assumed input_file is encoded in UTF-8.
Options:
method - 'delete'/'tag'/'pseudo' - 'delete' deletes selected tokens,
'tag' replaces selected tokens with arbitrary tags, 'pseudo'
replaces selected tokens with a random token that
"""
anon = Anonymizer(task_options)
ccl_handler = Ccl_handler(input_file)
ccl_handler.process(output_file, anon.unmarshallers)
[tox]
# Style-check environments only; no package to build/install.
envlist = pep8,docstyle
skipsdist = True

[testenv:pep8]
deps =
    flake8
basepython = python3
commands =
    flake8 {posargs}

[testenv:docstyle]
deps =
    pydocstyle
basepython = python3
commands =
    pydocstyle --verbose {posargs}
[flake8]
# W504 skipped because it is overeager and unnecessary