Commit 50ee0804 authored by Norbert Ropiak's avatar Norbert Ropiak Committed by Mateusz Gniewkowski

Code refactor + date/url/user/phone/email anonymization

parent 1f8b135a
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
.vscode
\ No newline at end of file
# Anonymizer
A service that automatically anonymizes text in the Polish language.
The anonymizer works in 3 modes; when sensitive data is detected, it performs one of the following operations:
- delete - sensitive data is deleted
- tag - sensitive data is replaced by the category tag it belongs to
- pseudo (pseudonymization) - sensitive data is replaced by another object in the same category
### Examples:
- Delete
- Spotkałem się dzisiaj z Janem Kowalskim.
- Spotkałem się dzisiaj z .
- Tag
- Spotkałem się dzisiaj z Janem Kowalskim.
- Spotkałem się dzisiaj z [OSOBA] [OSOBA].
- Pseudonymization
- Spotkałem się dzisiaj z Janem Kowalskim.
- Spotkałem się dzisiaj z Stefanem Michlem.
Liner2 should use model 5nam.
tekst->any2txt->morphodita->liner2->anonymizer
nlp-ws
\ No newline at end of file
nlp-ws
regex==2020.10.28
Babel==2.8.0
\ No newline at end of file
"""Implementation of anonymizer functionality."""
import re
from string import punctuation, ascii_lowercase, ascii_uppercase, digits
import random
from src.generators import (generate_pseudo_email, generate_pseudo_phone_number,
generate_pseudo_user, generate_pseudo_website,
generate_phone_number_tag, generate_pseudo_date)
import regex
class Anonymizer:
"""Class used to edit sentences based on options."""
email_regex = regex.compile(
r'(?P<local_part>[a-z0-9!#$%&\'*+/=?^_`{|}~-]+'
r'(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*@)'
r'(?P<domain>(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+)'
r'(?P<tld>[a-z0-9](?:[a-z0-9-]*[a-z0-9])?)', regex.I
)
user_regex = regex.compile(r'\B(?P<username>\@[\w\-]+)')
_website_exceptions = ['m.in']
website_regex = regex.compile(
r'\b(?:{})\b(*SKIP)(*FAIL)|'.format('|'.join(_website_exceptions)) +
r'(?:(?P<protocol>(?:(?:https?|ftp):)?\/\/)?'
r'(?P<auth>\S+(?::\S*)?@)?'
r'(?P<host>(?!(?:10|127)(?:\.\d{1,3}){3})'
r'(?!(?:169\.254|192\.168)(?:\.\d{1,3}){2})'
r'(?!172\.(?:1[6-9]|2\d|3[0-1])(?:\.\d{1,3}){2})'
r'(?:[1-9]\d?|1\d\d|2[01]\d|22[0-3])'
r'(?:\.(?:1?\d{1,2}|2[0-4]\d|25[0-5])){2}'
r'(?:\.(?:[1-9]\d?|1\d\d|2[0-4]\d|25[0-4]))'
r'|'
r'((?:[a-z0-9\u00a1-\uffff][a-z0-9\u00a1-\uffff_-]{0,62})?'
r'[a-z0-9\u00a1-\uffff]\.)+)'
r'(?P<tld>[a-z\u00a1-\uffff]{2,}\.??)'
r'(?P<port>:\d{2,5})?'
r'(?P<path>[/?#]\S*)?)',
regex.UNICODE | regex.I
)
phone_number_regex = regex.compile(
r'(?P<country_code>(00[1-9]\d?)|(\(?([+\d]{2,3})\)?)[- ]??)?'
r'(?P<number>(\d[- ]??){9,10})'
)
date_regex = regex.compile(
r'\b(?P<day_or_month_year>'
r'(?P<day_month1>[0-3]?\d)(?P<punct1>[ \t\-\./,]{1,2})'
r'(?P<day_month2>[0-3]?\d)(?P<punct2>[ \t\-\./,]{1,2})'
r'(?P<year1>\d{4}|\d{2}))\b|'
r'\b(?P<year_month_or_day>(?P<year2>\d{4}|\d{2})'
r'(?P<punct3>[ \t\-\./,]{1,2})(?P<day_month3>[0-3]?\d)'
r'(?P<punct4>[ \t\-\./,]{1,2})(?P<day_month4>[0-3]?\d))\b|'
r'(?P<month_in_words>'
r'(?:(?P<day1>[0-3]?\d)(?P<punct5>[ \t\-\./,]{0,2}))?'
r'\b(?P<month>Sty(?:|cze[nń]|cznia)|Lut(?:|y|ego)|Mar(?:|zec|ca)|'
r'Kwi(?:|ecie[nń]|etnia)|Maj(?:|a)|Cze(?:|rwiec|rwca)|Lip(?:|iec|ca)'
r'|Sie(?:|rpie[nń]|rpnia)|Wrz(?:|esie[nń]|e[śs]nia)'
r'|Pa[zź](?:|dziernik|dziernika)|Lis(?:|topad|stopada)'
r'|Gru(?:|dzie[nń]|dnia))\b'
r'((?:(?P<punct7>[ \t\-\./,]{0,2})(?P<day2>[0-3]?\d))'
r'(?:(?P<punct8>[ \t\-\./,]{1,2})(?P<year4>\d{4}|\d{2}))|'
r'(?:(?P<punct6>[ \t\-\./,]{0,2})(?P<year3>\d{4}|\d{2})))?)', regex.I
)
_file_to_liner_dispatch = {
'nam_liv_person': 'person_first_nam',
'nam_liv_person_last': 'person_last_nam',
......@@ -33,9 +88,24 @@ class Anonymizer:
self._mail_token = '[MAIL]'
self._user_token = '@[USER]'
self._website_token = '[WWW]'
self._digits_token = '[DIGITS]'
self._date_token = '[DATE]'
self._default_token = '[INNE]'
self._form_dict = dict()
self._pseudo_ann_list = list()
# Order is important, first more specific
self._category_anonymisation = {
'user': (self.user_regex, self._user_token,
generate_pseudo_user),
'email': (self.email_regex, self._mail_token,
generate_pseudo_email),
'website': (self.website_regex, self._website_token,
generate_pseudo_website),
'date': (self.date_regex, self._date_token,
generate_pseudo_date),
'phone_number': (self.phone_number_regex, self._digits_token,
generate_pseudo_phone_number),
}
self._load_file()
def _load_file(self, file_name='wiki.txt'):
......@@ -80,20 +150,13 @@ class Anonymizer:
def _process_sentence(self, string_builder):
string_builder = self._handle_pseudo_ann(string_builder)
return self._anonoymize_phone_number(
self._anonoymize_website(
self._anonoymize_user(
self._anonoymize_email(
''.join(string_builder)
)
)
)
)
sentence = ''.join(string_builder)
return self._anonymize(sentence)
def _process_word(self, id, text, tag, ann):
for annotation in ann:
if annotation[1] != 0:
text = self._handle_annotated(id, text, tag, annotation[0])
for chan, value in ann:
if value != 0:
text = self._handle_annotated(id, text, tag, chan)
break
return text
......@@ -194,220 +257,49 @@ class Anonymizer:
chan = ann_subtree.attrib["chan"]
return chan, value
@staticmethod
def _get_random_character(digit=False, upper=False):
return random.choice(digits) if digit \
else random.choice(ascii_uppercase) \
if upper else random.choice(ascii_lowercase)
@staticmethod
def _generate_pseudo_email(email):
"""Return a pseudonymized copy of the e-mail address ``email``.

Punctuation characters are copied as they are; other characters are
replaced by ``Anonymizer._get_random_character`` called with the
character's ``isdigit()`` / ``isupper()`` results.

NOTE(review): indentation is not preserved in this view, so the nesting
of the ``i += 1`` statements and of the ``break`` lines is inferred from
context - confirm against the original file.
"""
new_mail = []
# A single iterator is shared by the three loops below, so each loop
# resumes where the previous one stopped.
it = iter(email)
# Index of the last '.' in the address (-1 when there is none).
top_domain_len = email.rfind('.')
# Counter that is compared with top_domain_len in the second loop.
i = 0
# Loop 1: walk the address until the first '@', which is copied verbatim
# and ends the loop.
for char in it:
if char == '@':
new_mail.append(char)
i += 1
break
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
i += 1
# Loop 2: continue after the '@'; a '.' reached while i equals
# top_domain_len is copied and ends the loop.
for char in it:
if char == '.':
if i == top_domain_len:
new_mail.append(char)
break
new_mail.append(char)
elif char in punctuation:
new_mail.append(char)
else:
new_mail.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
i += 1
# Loop 3: every character still left in the iterator is copied unchanged.
for char in it:
new_mail.append(char)
return r''.join(new_mail)
@staticmethod
def _generate_pseudo_user(user):
it = iter(user)
new_user = []
for char in it:
if char in punctuation:
new_user.append(char)
else:
new_user.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
return r''.join(new_user)
@staticmethod
def _generate_pseudo_website(link):
"""Return a pseudonymized copy of the web address ``link``.

The leading part of the link is copied unchanged and the rest has its
non-punctuation characters replaced by
``Anonymizer._get_random_character`` (called with the character's
``isdigit()`` / ``isupper()`` results).

NOTE(review): indentation is not preserved in this view; in particular
it cannot be told from here whether the second loop belongs to the
``if`` branch - confirm against the original file.
"""
# A single iterator is shared by all loops, so each loop resumes where the
# previous one stopped.
it = iter(link)
new_link = []
# Links whose first four characters are 'http' (any letter case): copy
# characters until two '/' have been seen.
if link[0:4].lower() == 'http':
slashes = 0
for char in it:
if char == '/':
slashes += 1
new_link.append(char)
if slashes == 2:
break
# Copy characters unchanged up to and including the next '/'.
for char in it:
if char == '/':
new_link.append(char)
break
else:
new_link.append(char)
# Remaining characters: punctuation is kept, everything else is replaced.
for char in it:
if char in punctuation:
new_link.append(char)
else:
new_link.append(
Anonymizer._get_random_character(
char.isdigit(),
char.isupper()
)
)
return r''.join(new_link)
@staticmethod
def _generate_pseudo_phone_number(number):
"""Return a copy of ``number`` with its digits replaced by random digits.

A leading prefix is copied unchanged in the three cases recognised by the
``if``/``elif`` chain below; after that every digit is replaced with
``Anonymizer._get_random_character(digit=True)`` and every non-digit
character is copied.

NOTE(review): indentation is not preserved in this view, so the nesting
of the loops under the ``if``/``elif`` branches is inferred - confirm
against the original file. The index accesses (``number[0]``,
``number[1]``, ``number[4]``) presumably rely on the caller passing a
full phone-number match - verify against the caller.
"""
new_number = []
length = len(number)
# Shared iterator: the prefix handling below consumes from it with next(),
# and the final loop continues from that position.
it = iter(number)
# Starts with '+': copy the first (length - 9) characters unchanged.
if number[0] == '+':
how_many = length - 9
for j in range(0, how_many):
new_number.append(next(it))
# Starts with '00' and has a space at index (length - 10): copy the first
# (length - 10) characters unchanged.
elif number[0] == '0' and number[1] == '0' \
and number[length - 10] == ' ':
for j in range(0, length - 10):
new_number.append(next(it))
# Starts with '(0' and has ')' at index 4: copy the first two characters.
elif number[0] == '(' and number[1] == '0' and number[4] == ')':
for j in range(0, 2):
new_number.append(next(it))
# Everything not consumed above: digits are randomised, the rest is kept.
for char in it:
if char.isdigit():
new_number.append(Anonymizer._get_random_character(digit=True))
else:
new_number.append(char)
return r''.join(new_number)
def _generate_phone_number_tag(self, number):
new_number = number.split(' ')
for i in range(len(new_number)):
new_number[i] = self._default_token
return r' '.join(new_number)
def _anonoymize_email(self, sentence):
"""Handles removal/changing of emails addresses."""
email_regex = r'[\w\.-]+@[\w\.-]+\.\w{2,4}'
def _anonymize(self, sentence):
if self._method == 'delete':
sentence = re.sub(email_regex, '', sentence)
for pattern, _, _ in self._category_anonymisation.values():
sentence = regex.sub(pattern, '', sentence)
elif self._method == 'tag':
sentence = re.sub(email_regex, self._mail_token, sentence)
sentence = self._tagging(sentence)
elif self._method == 'pseudo':
matches = re.findall(email_regex, sentence)
for match in matches:
sentence = re.sub(
re.escape(match),
self._generate_pseudo_email(match),
sentence
)
sentence = self._pseudonymization(sentence)
return sentence
def _anonoymize_user(self, sentence):
"""Handles removal/change of users."""
user_regex = r'\B\@([\w\-]+)'
if self._method == 'delete':
sentence = re.sub(user_regex, '', sentence)
elif self._method == 'tag':
sentence = re.sub(user_regex, self._user_token, sentence)
elif self._method == 'pseudo':
matches = re.findall(user_regex, sentence)
for match in matches:
sentence = re.sub(
re.escape(match),
self._generate_pseudo_user(match),
sentence
)
return sentence
def _tagging(self, sentence):
for category in self._category_anonymisation:
pattern, token, _ = self._category_anonymisation[category]
def _anonoymize_website(self, sentence):
"""Handles removal/change of links."""
link_regex = r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?' \
r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)' \
r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]' \
r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)' \
r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/' \
r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)' \
r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.' \
r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
if self._method == 'delete':
sentence = re.sub(link_regex, '', sentence)
elif self._method == 'tag':
sentence = re.sub(link_regex, self._website_token, sentence)
elif self._method == 'pseudo':
matches = re.findall(link_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_pseudo_website(match),
sentence
)
if category == 'phone_number':
matches = [m for m in pattern.finditer(sentence)]
for match in matches:
tag = generate_phone_number_tag(match.groupdict(''), token)
replace_match = match.group(0)
sentence = regex.sub(regex.escape(
replace_match), tag, sentence)
else:
sentence = regex.sub(pattern, token, sentence)
return sentence
def _anonoymize_phone_number(self, sentence):
"""Handles removal/change of links."""
phone_number_regex = r'(((\+[1-9]\d{0,2}|00[1-9]\d{0,2}) ?)?(\d{9}))' \
r'|((\+[1-9]\d{0,2} |00[1-9]\d{0,2} )?' \
r'(\d{3} \d{3} \d{3}))|(\(0\d{2}\) \d{2} \d{2} ' \
r'\d{3})|(\(\d{2}\) \d{2} \d{3} \d{2})'
if self._method == 'delete':
sentence = re.sub(phone_number_regex, '', sentence)
elif self._method == 'tag':
matches = re.findall(phone_number_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_phone_number_tag(match),
sentence
)
elif self._method == 'pseudo':
matches = re.findall(phone_number_regex, sentence)
for match in matches:
for val in match:
if val != '':
match = val
break
sentence = re.sub(
re.escape(match),
self._generate_pseudo_phone_number(match),
sentence
)
def _pseudonymization(self, sentence):
sentence_after_regex = sentence
to_replace = []
for category in self._category_anonymisation:
pattern, _, generator = self._category_anonymisation[category]
for match in pattern.finditer(sentence_after_regex):
if not match:
continue
to_replace.append((match, generator))
sentence_after_regex = regex.sub(
regex.escape(match.group(0)), '', sentence_after_regex)
for match, generator in to_replace:
replace_match = match.group(0)
pseudo_string = generator(match.groupdict(''))
sentence = regex.sub(
regex.escape(replace_match),
pseudo_string,
sentence
)
return sentence
......@@ -2,19 +2,19 @@
from xml.etree.ElementTree import iterparse
class Ccl_handler:
class CCLHandler:
"""Implements reading ccl for anonymizer service."""
def __init__(self, ccl_file_name):
"""Initialize ccl_handler with a filename."""
"""Initialize CCLHandler with a filename."""
self._file_name = ccl_file_name
def process(self, output_file, unmarshallers):
def process(self, output_filename, unmarshallers):
"""Process xml tags using unmarshallers and save in output_file."""
with open(output_file, 'w', encoding='utf-8') as out:
with open(self._file_name, 'r', encoding='utf-8') as f:
for event, elem in iterparse(f):
unmarshal = unmarshallers.get(elem.tag, None)
if unmarshal:
out.write(unmarshal(elem))
elem.clear()
with open(self._file_name, 'r', encoding='utf-8') as input_file, \
open(output_filename, 'w', encoding='utf-8') as output_file:
for event, elem in iterparse(input_file):
unmarshal = unmarshallers.get(elem.tag, None)
if unmarshal:
output_file.write(unmarshal(elem))
elem.clear()
"""Implementation of pseudonimization for different token categories."""
import re
import random
import calendar
from string import punctuation, ascii_lowercase, ascii_uppercase, digits
from datetime import datetime
from babel import Locale
def get_random_character(digit: bool = False, upper: bool = False):
    """Draw a single random ASCII character.

    Args:
        digit (bool): Return a random decimal digit; takes precedence
            over ``upper``.
        upper (bool): Return a random uppercase letter (only consulted
            when ``digit`` is false).

    With both flags false a random lowercase letter is returned.
    """
    if digit:
        alphabet = digits
    elif upper:
        alphabet = ascii_uppercase
    else:
        alphabet = ascii_lowercase
    return random.choice(alphabet)
def pseudonymize_string(sentence: str, leave_chars: str = ''):
    """Replace the characters of a string with random ones of the same kind.

    A digit is replaced by a random digit and an uppercase character by a
    random uppercase letter. Every other character that is not listed in
    ``leave_chars`` (lowercase letters, but also punctuation and
    whitespace) is replaced by a random lowercase letter.

    Args:
        sentence (str): Text to pseudonymize; a falsy value yields ``''``.
        leave_chars (str): Characters that should remain unchanged,
            e.g. ``' -()'``.

    Returns:
        str: Pseudonymized text of the same length as ``sentence``.
    """
    if not sentence:
        return ''
    # Single join instead of repeated ``+=`` keeps the build linear; the
    # characters are still processed left to right.
    return ''.join(
        char if char in leave_chars
        else get_random_character(char.isdigit(), char.isupper())
        for char in sentence
    )
def generate_pseudo_email(email_match: str):
"""Generate pseudonimized email based on matched email in text.