Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
A
anonymizer
Manage
Activity
Members
Labels
Plan
Issues
0
Issue boards
Milestones
Wiki
Redmine
Code
Merge requests
0
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container Registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
nlpworkers
anonymizer
Merge requests
!1
Develop
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Develop
develop
into
master
Overview
0
Commits
8
Pipelines
1
Changes
13
Merged
Ghost User
requested to merge
develop
into
master
4 years ago
Overview
0
Commits
8
Pipelines
1
Changes
13
Expand
0
0
Merge request reports
Compare
master
master (base)
and
latest version
latest version
3e11ed90
8 commits,
4 years ago
13 files
+
8928
−
2
Expand all files
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
13
Search (e.g. *.vue) (Ctrl+P)
src/anonymizer.py
0 → 100644
+
413
−
0
Options
"""
Implementation of anonymizer functionality.
"""
import
re
from
string
import
punctuation
,
ascii_lowercase
,
ascii_uppercase
,
digits
import
random
class Anonymizer:
    """Anonymize sentences according to the configured method.

    The ``method`` entry of ``task_options`` selects the behaviour:

    * ``'delete'`` - private spans are removed,
    * ``'tag'``    - private spans are replaced with a category token,
    * ``'pseudo'`` - private spans are replaced with random look-alike data.

    Any other value leaves the text unchanged.
    """

    # Category label (first column of the forms file) -> annotation channel.
    _file_to_liner_dispatch = {
        'nam_liv_person': 'person_first_nam',
        'nam_liv_person_last': 'person_last_nam',
        'nam_fac_road': 'road_nam',
        'nam_loc_gpe_city': 'city_nam',
        'nam_org_group_team': 'country_nam'
    }

    # Annotation channel -> token used by the 'tag' method.
    _liner_to_tag_dispatch = {
        'person_first_nam': '[OSOBA]',
        'person_last_nam': '[OSOBA]',
        'road_nam': '[MIEJSCE]',
        'city_nam': '[MIEJSCE]',
        'country_nam': '[MIEJSCE]'
    }

    # Channels whose consecutive tokens are never merged into one entity.
    _single_token_channels = ('person_first_nam', 'person_last_nam')

    # Patterns are compiled once instead of on every processed sentence.
    _email_pattern = re.compile(r'[\w\.-]+@[\w\.-]+\.\w{2,4}')
    _user_pattern = re.compile(r'\B\@([\w\-]+)')
    _website_pattern = re.compile(
        r'(((h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/(?:www\.|(?!www)))?'
        r'[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]+\.(?:(?!(h|H)'
        r'(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]'
        r'[a-zA-Z0-9-]+[a-zA-Z0-9]\.(?:(?!(h|H)(t|T)(t|T)(p|P)'
        r'(s|S)?:\/\/))[^\s]{2,}|(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/'
        r'(?:www\.|(?!www))[a-zA-Z0-9]+\.(?:(?!(h|H)(t|T)(t|T)'
        r'(p|P)(s|S)?:\/\/))[^\s]{2,}|www\.[a-zA-Z0-9]+\.'
        r'(?:(?!(h|H)(t|T)(t|T)(p|P)(s|S)?:\/\/))[^\s]{2,})'
    )
    _phone_pattern = re.compile(
        r'(((\+[1-9]\d{0,2}|00[1-9]\d{0,2}) ?)?(\d{9}))'
        r'|((\+[1-9]\d{0,2} |00[1-9]\d{0,2} )?'
        r'(\d{3} \d{3} \d{3}))|(\(0\d{2}\) \d{2} \d{2} '
        r'\d{3})|(\(\d{2}\) \d{2} \d{3} \d{2})'
    )

    def __init__(self, task_options):
        """Initialize anonymizer with task_options.

        :param task_options: mapping; its optional ``'method'`` entry selects
            the anonymization method and defaults to ``'delete'``.
        """
        # Handlers keyed by XML element name.
        self.unmarshallers = {
            'chunk': lambda *args: '\n',
            'sentence': lambda *args: self._process_sent_tree(*args),
        }
        self._method = task_options.get('method', 'delete')
        self._mail_token = '[MAIL]'
        self._user_token = '@[USER]'
        self._website_token = '[WWW]'
        self._default_token = '[INNE]'
        # channel -> token count -> [(base form, {flexion tag: inflected})]
        self._form_dict = dict()
        # (token index, text, ctag, channel) tuples awaiting pseudonymization
        self._pseudo_ann_list = list()
        self._load_file()

    def _load_file(self, file_name='wiki.txt'):
        """Load replacement names from a whitespace separated forms file.

        Every useful line has the shape
        ``<category> <inflected tokens> <base tokens> <flexion tag>`` with the
        same number of inflected and base tokens.  Blank lines, unknown
        categories and lines too short to hold a name are skipped.

        :param file_name: path of the forms file.
        """
        forms = {}
        with open(file_name, 'r', encoding='utf-8') as f:
            for line in f:
                l_list = line.split()
                if not l_list:
                    # A blank line used to raise IndexError on l_list[0].
                    continue
                cat_name = self._file_to_liner_dispatch.get(l_list[0])
                if cat_name is None:
                    continue
                length = (len(l_list) - 2) // 2
                if length < 1:
                    continue
                flx_name = ' '.join(l_list[1:1 + length])
                gen_name = ' '.join(l_list[1 + length:1 + 2 * length])
                flex = l_list[-1]
                by_length = forms.setdefault(cat_name, {})
                by_base = by_length.setdefault(length, {})
                by_base.setdefault(gen_name, {})[flex] = flx_name
        # random.choice needs a sequence, so each inner mapping is stored as
        # a list of (base form, inflections) pairs.  Building the structure
        # locally keeps a repeated call from tripping over converted lists.
        self._form_dict = {
            cat: {length: list(entries.items())
                  for length, entries in by_length.items()}
            for cat, by_length in forms.items()
        }

    def _process_sent_tree(self, sentence_subtree):
        """Turn a sentence element into anonymized plain text.

        ``tok`` children contribute their (possibly replaced) text followed by
        a space; an ``ns`` child removes the space emitted before it.

        :param sentence_subtree: iterable of elements exposing ``tag``.
        :raises ValueError: on a child that is neither ``tok`` nor ``ns``.
        """
        string_builder = []
        # Index in string_builder at which the next token will be stored.
        index = 0
        for elem in sentence_subtree:
            if elem.tag == 'tok':
                string_builder.append(self._process_single_tok(index, elem))
                string_builder.append(' ')
                index += 2
            elif elem.tag == 'ns':
                # Guard against an 'ns' with nothing before it; popping an
                # empty list would raise and desynchronise the index.
                if string_builder:
                    string_builder.pop()
                    index -= 1
            else:
                raise ValueError(
                    'Unrecognized tag inside sentence: ' + elem.tag
                )
        return self._process_sentence(string_builder)

    def _process_sentence(self, string_builder):
        """Apply entity pseudonyms, then the regex based anonymizers."""
        string_builder = self._handle_pseudo_ann(string_builder)
        sentence = ''.join(string_builder)
        sentence = self._anonoymize_email(sentence)
        sentence = self._anonoymize_user(sentence)
        sentence = self._anonoymize_website(sentence)
        return self._anonoymize_phone_number(sentence)

    def _process_word(self, id, text, tag, ann):
        """Anonymize ``text`` using its first annotation with a non-zero value.

        :param id: index of the token in the sentence buffer.
        :param text: orthographic form of the token.
        :param tag: morphosyntactic tag of the token.
        :param ann: list of ``(channel, value)`` pairs.
        :return: the text to store for this token.
        """
        for channel, value in ann:
            if value != 0:
                return self._handle_annotated(id, text, tag, channel)
        return text

    def _handle_annotated(self, id, text, tag, ann):
        """Handle one annotated token according to the configured method.

        For the 'pseudo' method the token is only queued here; the actual
        replacement happens in ``_handle_pseudo_ann``.
        """
        if self._method == 'delete':
            return ''
        if self._method == 'tag':
            return self._liner_to_tag_dispatch.get(ann, text)
        if self._method == 'pseudo' and ann in self._form_dict:
            self._pseudo_ann_list.append((id, text, tag, ann))
        return text

    @staticmethod
    def _replace_span(string_builder, last_id, length, new_text):
        """Overwrite ``length`` tokens ending at ``last_id`` with ``new_text``.

        Tokens of one entity sit two slots apart (token, space, token, ...).
        When the replacement has fewer tokens than the entity, the remaining
        original tokens and the spaces after them are blanked so that none of
        the original text survives.
        """
        replacement = new_text.split(' ')
        first = last_id - 2 * (length - 1)
        for offset in range(length):
            position = first + 2 * offset
            if offset < len(replacement):
                string_builder[position] = replacement[offset]
                continue
            string_builder[position] = ''
            following = position + 1
            if following < len(string_builder) \
                    and string_builder[following] == ' ':
                string_builder[following] = ''

    def _handle_pseudo_ann(self, string_builder):
        """Replace the queued annotated tokens with pseudonyms, in place.

        Adjacent tokens sharing a channel form one entity, except for the
        person name channels, which are always replaced token by token.

        :param string_builder: sentence buffer of tokens and separators.
        :return: the same buffer after replacement.
        """
        if not self._pseudo_ann_list:
            return string_builder

        pending = iter(self._pseudo_ann_list)
        current_id, _, current_tag, current_ann = next(pending)
        length = 1
        for tok_id, _, tag, ann in pending:
            continues_entity = (
                ann == current_ann
                and ann not in self._single_token_channels
                and tok_id == current_id + 2
            )
            if continues_entity:
                length += 1
            else:
                # Both flushes go through _replace_span; previously this one
                # left surplus original tokens untouched.
                self._replace_span(
                    string_builder, current_id, length,
                    self._get_pseudo_ann(
                        ann=current_ann, tag=current_tag, length=length
                    )
                )
                length = 1
                current_ann = ann
            current_tag = tag
            current_id = tok_id

        self._replace_span(
            string_builder, current_id, length,
            self._get_pseudo_ann(current_ann, current_tag, length)
        )
        self._pseudo_ann_list.clear()
        return string_builder

    def _get_pseudo_ann(self, ann, tag, length):
        """Pick a random replacement name for an entity.

        :param ann: annotation channel; must be a key of ``_form_dict``.
        :param tag: colon separated tag; fields 1-3 select the inflection.
        :param length: number of tokens of the entity.  The longest available
            name not exceeding it is used.
        :return: the replacement, or ``''`` when no name is short enough.
        """
        forms = self._form_dict[ann]
        while length > 0 and length not in forms:
            length -= 1
        if length == 0:
            return ''
        candidates = forms[length]
        new_tag = ':'.join(tag.split(':')[1:4])
        # Try a few random names for one that has the requested inflection.
        for _ in range(10):
            base, inflections = random.choice(candidates)
            if new_tag in inflections:
                return inflections[new_tag]
            if new_tag == 'ign':
                return base
        # No inflected match found: fall back to a base form.
        return random.choice(candidates)[0]

    def _process_single_tok(self, id, tok_subtree):
        """Extract text, tag and annotations of a token and anonymize it.

        :param id: index of the token in the sentence buffer.
        :param tok_subtree: iterable of ``orth`` / ``lex`` / ``ann`` elements;
            other children are ignored.
        """
        text = ''
        tag = ''
        ann = []
        for elem in tok_subtree:
            if elem.tag == 'orth':
                text = elem.text
            elif elem.tag == 'lex':
                tag = self._process_lex(elem)
            elif elem.tag == 'ann':
                ann.append(self._process_ann(elem))
        return self._process_word(id, text, tag, ann)

    def _process_lex(self, lex_subtree):
        """Return the text of the (last) ``ctag`` child of a ``lex`` element.

        :raises ValueError: on a child other than ``ctag`` / ``base`` or when
            no non-empty ``ctag`` is present.
        """
        tag = ''
        for elem in lex_subtree:
            if elem.tag == 'ctag':
                tag = elem.text
            elif elem.tag != 'base':
                raise ValueError('Unrecognized tag inside lex: ' + elem.tag)
        if tag == '':
            raise ValueError('Lex tag had no ctag inside!')
        return tag

    def _process_ann(self, ann_subtree):
        """Return ``(channel, value)`` for an ``ann`` element."""
        value = int(ann_subtree.text)
        chan = ann_subtree.attrib["chan"]
        return chan, value

    @staticmethod
    def _get_random_character(digit=False, upper=False):
        """Return a random ASCII digit, upper-case or lower-case letter."""
        if digit:
            return random.choice(digits)
        return random.choice(ascii_uppercase if upper else ascii_lowercase)

    @staticmethod
    def _pseudo_char(char):
        """Keep punctuation; swap anything else for a same-class character."""
        if char in punctuation:
            return char
        return Anonymizer._get_random_character(
            char.isdigit(), char.isupper()
        )

    @staticmethod
    def _generate_pseudo_email(email):
        """Randomize an e-mail address, keeping punctuation and the TLD.

        The text from the last dot onwards is kept only when that dot lies
        in the domain part, i.e. after the first ``@``.
        """
        at_index = email.find('@')
        last_dot = email.rfind('.')
        keep_from = last_dot if last_dot > at_index >= 0 else len(email)
        return ''.join(
            char if index >= keep_from else Anonymizer._pseudo_char(char)
            for index, char in enumerate(email)
        )

    @staticmethod
    def _generate_pseudo_user(user):
        """Randomize a user handle, keeping its punctuation."""
        return ''.join(Anonymizer._pseudo_char(char) for char in user)

    @staticmethod
    def _generate_pseudo_website(link):
        """Randomize the path of a link, keeping the scheme and the host.

        Everything up to and including the first ``/`` after the optional
        ``http...//`` prefix is copied verbatim.
        """
        host_start = 0
        if link[0:4].lower() == 'http':
            second_slash = link.find('/', link.find('/') + 1)
            host_start = len(link) if second_slash < 0 else second_slash + 1
        path_slash = link.find('/', host_start)
        path_start = len(link) if path_slash < 0 else path_slash + 1
        return link[:path_start] + ''.join(
            Anonymizer._pseudo_char(char) for char in link[path_start:]
        )

    @staticmethod
    def _generate_pseudo_phone_number(number):
        """Randomize the digits of a phone number, keeping its layout.

        A ``+`` / ``00`` international prefix (everything before the last
        nine digits) and the leading ``(0`` of an area code are preserved.
        The prefix is located by counting digits rather than characters, so
        spaces inside the number can no longer cause real digits to be kept.
        """
        keep = 0
        if number.startswith(('+', '00')):
            remaining = 9
            for index in range(len(number) - 1, -1, -1):
                if number[index].isdigit():
                    remaining -= 1
                    if remaining == 0:
                        keep = index
                        break
        elif number[:2] == '(0' and number[4:5] == ')':
            keep = 2
        return ''.join(
            Anonymizer._get_random_character(digit=True)
            if index >= keep and char.isdigit() else char
            for index, char in enumerate(number)
        )

    def _generate_phone_number_tag(self, number):
        """Return one default token per space separated part of ``number``."""
        return ' '.join(self._default_token for _ in number.split(' '))

    def _substitute(self, pattern, sentence, tag_repl, pseudo_repl):
        """Replace every match of ``pattern`` according to the method.

        Replacements are callables receiving the match object, so generated
        text is inserted literally (a backslash in it is never interpreted
        as a regex template escape) and only the matched span is touched.
        """
        if self._method == 'delete':
            return pattern.sub('', sentence)
        if self._method == 'tag':
            return pattern.sub(tag_repl, sentence)
        if self._method == 'pseudo':
            return pattern.sub(pseudo_repl, sentence)
        return sentence

    def _anonoymize_email(self, sentence):
        """Handles removal/changing of emails addresses."""
        return self._substitute(
            self._email_pattern, sentence,
            lambda match: self._mail_token,
            lambda match: self._generate_pseudo_email(match.group(0)),
        )

    def _anonoymize_user(self, sentence):
        """Handles removal/change of users."""
        return self._substitute(
            self._user_pattern, sentence,
            lambda match: self._user_token,
            # Only the handle after '@' is randomized, and only at the
            # matched position - not wherever the same letters occur.
            lambda match: '@' + self._generate_pseudo_user(match.group(1)),
        )

    def _anonoymize_website(self, sentence):
        """Handles removal/change of links."""
        return self._substitute(
            self._website_pattern, sentence,
            lambda match: self._website_token,
            lambda match: self._generate_pseudo_website(match.group(0)),
        )

    def _anonoymize_phone_number(self, sentence):
        """Handles removal/change of phone numbers."""
        return self._substitute(
            self._phone_pattern, sentence,
            lambda match: self._generate_phone_number_tag(match.group(0)),
            lambda match: self._generate_pseudo_phone_number(match.group(0)),
        )