Skip to content
Snippets Groups Projects
Commit e25eadab authored by Michał Pogoda's avatar Michał Pogoda
Browse files

Implement support for wiktorner

parent 63784ace
No related branches found
No related tags found
2 merge requests!10Anonimizer v2,!7Better coverage
Pipeline #7844 failed
Showing
with 405 additions and 211 deletions
from typing import Optional
from src.detections import Detection, MorphosyntacticInfoMixin
import random
from src.dictionaries.morphosyntactic.ner_file import NERFileMorphosyntacticDictionary
class NERFileNKJPMorphosyntacticDictionary(NERFileMorphosyntacticDictionary):
    """Morphosyntactic dictionary for NER files whose tags use the NKJP tagset.

    Behaves like ``NERFileMorphosyntacticDictionary`` except that the leading
    segment of the NKJP morphosyntactic tag (e.g. the ``subst`` in
    ``subst:sg:dat:m1``) is stripped before the dictionary lookup.
    """

    def __init__(self, dictionary_path: Optional[str] = None, always_replace=True) -> None:
        super().__init__(dictionary_path, always_replace)

    def get_random_replacement(self, original_entry: Detection) -> Optional[str]:
        """Pick a random replacement word compatible with the detection's morpho tag.

        Falls back to a completely random dictionary entry when no matching
        tag exists and ``always_replace`` is enabled; returns ``None`` when
        nothing can be chosen.
        """
        entry_cls = type(original_entry)
        type_name = entry_cls.TYPE_NAME
        replacement = None

        if issubclass(entry_cls, MorphosyntacticInfoMixin):
            # THAT IS A HACK FOR NOW FOR CORRUPTED NKJP TAGS IN DICTIONARY
            # (drop the leading tag segment, keep the remainder)
            _, _, tag_tail = original_entry.morpho_tag.partition(":")
            candidates = self._dictionary.get(type_name, {}).get(tag_tail)
            if candidates is not None:
                replacement = random.choice(list(candidates.values()))

        if replacement is None and self._always_replace:
            # No tag-compatible candidate: draw a uniformly random entry.
            any_type = random.choice(list(self._dictionary))
            any_tag = random.choice(list(self._dictionary[any_type]))
            replacement = random.choice(
                list(self._dictionary[any_type][any_tag].values())
            )

        return replacement
from typing import Dict, List, Optional, Tuple, Type
from collections import defaultdict
from src.detections import Detection, OtherDetection, MorphosyntacticInfoMixin
from src.dictionaries.morphosyntactic.interface import MorphosyntacticDictionary
import random
from src.detections import (
NameDetection,
SurnameDetection,
StreetNameDetection,
CityDetection,
CountryDetection,
)
# Maps PL liner/n5 NER annotation labels to the detection classes used for
# anonymization. NOTE(review): "nam_org_group_team" -> CountryDetection looks
# suspicious (an organization/team label mapped to a country type) — confirm.
NER_PL_N5_MAPPING = {
    "nam_liv_person": NameDetection,
    "nam_liv_person_last": SurnameDetection,
    "nam_fac_road": StreetNameDetection,
    "nam_loc_gpe_city": CityDetection,
    "nam_org_group_team": CountryDetection,
}
class PlNERMorphosyntacticDictionary(MorphosyntacticDictionary):
    """Dictionary of morphosyntactically tagged replacement words for Polish NER.

    Entries can be loaded either from a TSV dictionary file or directly from
    an in-memory list of ``(detection class, word, lemma, morpho tag)`` tuples.
    """

    def __init__(
        self,
        dictionary_path: Optional[str] = None,
        list: Optional[List[Tuple[Detection, str, str, str]]] = None,
        always_replace=True,
    ) -> None:
        # NOTE: the ``list`` parameter name shadows the builtin, but it is part
        # of the public keyword interface and therefore kept as-is.
        super().__init__()
        self._dictionary = None
        self._always_replace = always_replace
        if dictionary_path is not None:
            self._from_file(dictionary_path, NER_PL_N5_MAPPING)
        elif list is not None:
            self._from_list(list)
        else:
            raise ValueError("Either dictionary_path or list must be provided.")

    def _from_file(
        self, path_to_dictionary: str, annotation_mapping: Dict[str, Type[Detection]]
    ) -> None:
        # Delegate TSV parsing to the module-level loader.
        self._dictionary = load_pl_ner_replacements_dictionary(
            path_to_dictionary, annotation_mapping
        )

    def _from_list(self, list: List[Tuple[Detection, str, str, str]]) -> None:
        # Nesting: detection class -> morpho tag -> lemma -> inflected word.
        entries = defaultdict(lambda: defaultdict(dict))
        for detection_cls, word, lemma, morpho_tag in list:
            entries[detection_cls][morpho_tag][lemma] = word
        self._dictionary = entries

    def get_supported_detection_classes(self) -> List[Type[Detection]]:
        """Return the detection classes for which replacements are available."""
        return [detection_cls for detection_cls in self._dictionary]

    def get_random_replacement(self, original_entry: Detection) -> Optional[str]:
        """Pick a random replacement word for *original_entry*.

        Prefers a word whose morphosyntactic tag matches the detection's tag
        (minus its leading segment); otherwise, when ``always_replace`` is on,
        falls back to a random entry from the whole dictionary.
        """
        entry_cls = type(original_entry)
        chosen = None

        if issubclass(entry_cls, MorphosyntacticInfoMixin):
            # Drop the leading tag segment (e.g. "subst") before the lookup.
            tag_tail = ":".join(original_entry.morpho_tag.split(":")[1:])
            candidates = self._dictionary.get(entry_cls, {}).get(tag_tail)
            if candidates is not None:
                chosen = random.choice(list(candidates.values()))

        if chosen is None and self._always_replace:
            fallback_cls = random.choice(list(self._dictionary))
            fallback_tag = random.choice(list(self._dictionary[fallback_cls]))
            chosen = random.choice(
                list(self._dictionary[fallback_cls][fallback_tag].values())
            )

        return chosen
def load_pl_ner_replacements_dictionary(
    path: str, ner_mapping: "Optional[Dict[str, Type[Detection]]]" = None
) -> Dict[str, Dict[str, Dict[str, str]]]:
    """
    Loads a dictionary that maps named entity tags to morphosyntactic tags to lemmas to words.
    The dictionary is a nested defaultdict, so if a key is not found, an empty defaultdict is returned.
    The dictionary is stored in a tab-separated file, where each line has the following format:
    <ner_tag> <word> <lemma> <morpho_tag>
    Example:
    OSOBA Andrzejowi Andrzej subst:sg:dat:m1
    OSOBA Andrzej Andrzej subst:sg:m1:imperf
    OSOBA Kasia Kasia subst:sg:f:imperf
    MIEJSCE Wrocław Wrocław subst:sg:m2:imperf
    MIEJSCE Warszawa Warszawa subst:sg:f:imperf
    MIEJSCE Kraków Kraków subst:sg:m2:imperf
    Parameters
    ----------
    path : str
        Path to the dictionary file.
    ner_mapping : Optional[Dict[str, Type[Detection]]]
        Optional mapping from raw NER tags to detection classes; unmapped
        tags fall back to OtherDetection.
    Returns
    -------
    Dict[str, Dict[str, Dict[str, str]]]
        Nested defaultdict that maps named entity tags to morphosyntactic tags to lemmas to words.
    """
    replacement_dictionary = defaultdict(lambda: defaultdict(dict))
    with open(path, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            # Skip blank lines (e.g. a trailing newline at end of file) that
            # would otherwise crash the 4-column unpacking below.
            if not line:
                continue
            ner_tag, word, lemma, morpho_tag = line.split("\t")
            if ner_mapping is not None:
                # Unknown tags fall back to OtherDetection instead of raising.
                ner_tag = ner_mapping.get(ner_tag, OtherDetection)
            replacement_dictionary[ner_tag][morpho_tag][lemma] = word
    return replacement_dictionary
......@@ -6,24 +6,23 @@ from src.input_parsers.interface import InputParser
from src.annotations import Annotation, MorphosyntacticAnnotation, NerAnnotation
class CCLInputParser(InputParser):
def __init__(self) -> None:
def __init__(
self,
) -> None:
super().__init__()
def parse(self, path_to_input: str) -> List[Tuple[int, int, Annotation]]:
def parse(self, content: str) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
"""Parse CCL string into text and annotations.
Annotations are returned as a dictionary with channel name as a key and list of tuples.
Args:
path_to_input (str): Path to file containing CCL.
content (str): Content of ccl file.
Returns:
Tuple[str, Dict[str, List[Tuple[int, int, Annotation]]]]: Text and annotations.
"""
with open(path_to_input, 'r') as f:
ccl = f.read()
ccl_tree = etree.fromstring(ccl.strip().encode('utf-8'))
ccl_tree = etree.fromstring(content.strip().encode('utf-8'))
results = []
text = ""
......
from typing import Dict, List, Tuple, Any
class InputParser:
def parse(self, path_to_input: str) -> Tuple[str, Dict[str, List[Tuple[int, int, Any]]]]:
def parse(self, content: str) -> Tuple[str, List[Tuple[int, int, Any]]]:
"""Parse input string into text and annotations.
Annotations are returned as a dictionary with channel name as a key and list of tuples.
Eg.: "She has a cat" -> ("She has a cat", {"entities": [(0, 3, "She"), (8, 11, "cat")]})
Args:
path_to_input (str): Path to file containing input.
content (str): Input in raw form.
Returns:
Tuple[str, Dict[str, List[Tuple[int, int, Any]]]]: Text and annotations.
......
from typing import Dict, List, Tuple
from lxml import etree
import json
from collections import defaultdict
# from src.annotation_types_old import
from src.input_parsers.interface import InputParser
from src.annotations import Annotation, MorphosyntacticAnnotation, NerAnnotation
class WiktorNERInputParser(InputParser):
    """Parser turning WiktorNER JSON output (tokens + entities) into annotations."""

    def __init__(self) -> None:
        super().__init__()

    def parse(self, content: str) -> Tuple[str, List[Tuple[int, int, Annotation]]]:
        """Parse a wiktorner JSON document into text and annotations.

        Args:
            content (str): Content of a wiktorner JSON file.

        Returns:
            Tuple[str, List[Tuple[int, int, Annotation]]]: Text and a list of
            (start, end, annotation) triples.
        """
        content_parsed = json.loads(content)
        text = content_parsed.get("text", "")
        annotations = []
        # Morphosyntactic annotations: one per disambiguated lexeme.
        for token in content_parsed.get("tokens", []):
            # Fixed: previously a token without "position" but with lexemes
            # reused the previous token's offsets (or raised NameError).
            if "position" not in token:
                continue
            token_start, token_end = token["position"]
            for lexeme in token.get("lexemes", []):
                if lexeme.get("disamb") == True and "mstag" in lexeme:
                    annotations.append(
                        (token_start, token_end, MorphosyntacticAnnotation(lexeme["mstag"]))
                    )
        # NER annotations: require both offsets and a type, for the same reason.
        for entity in content_parsed.get("entities", []):
            if "positions" in entity and "type" in entity:
                entity_start, entity_end = entity["positions"]
                annotations.append(
                    (entity_start, entity_end, NerAnnotation(entity["type"]))
                )
        return text, annotations
\ No newline at end of file
from src.detections import (
NameDetection,
SurnameDetection,
StreetNameDetection,
CityDetection,
CountryDetection,
)
# Maps WiktorNER entity type labels to the detection classes used to
# anonymize the corresponding spans.
NER_PL_N5_MAPPING = {
    "person_first_nam": NameDetection,
    "person_last_nam": SurnameDetection,
    "road_nam": StreetNameDetection,
    "city_nam": CityDetection,
    "country_nam": CountryDetection,
}
......@@ -20,7 +20,9 @@ class DefaultPipeline(Pipeline):
self._replacers = replacers
def run(self, input) -> str:
parsed_input = self._input_parser.parse(input)
with open(input, 'r') as f:
content = f.read()
parsed_input = self._input_parser.parse(content)
detected_entities = []
for detector_name, detector in self._detectors.items():
......
from src.pipeline.interface import Pipeline
from typing import Dict
from src.suppressors.interface import Suppressor
from src.detectors.interface import Detector
from src.replacers.interface import ReplacerInterface
from src.input_parsers.interface import InputParser
import json
class SequentialJSONLPipeline(Pipeline):
    """Anonymization pipeline that processes a JSONL file line by line.

    Each non-empty line is parsed, run through every detector, suppressed,
    and replaced independently; results are re-serialized as JSONL.
    """

    def __init__(
        self,
        input_parser: InputParser,
        detectors: Dict[str, Detector],
        suppressor: Suppressor,
        replacers: Dict[str, ReplacerInterface],
    ):
        self._input_parser = input_parser
        self._detectors = detectors
        self._suppressor = suppressor
        self._replacers = replacers

    def run(self, input) -> str:
        """Anonymize the JSONL file at path *input* and return the output JSONL string."""
        result = []
        with open(input, "r") as f:
            # Iterate the file lazily instead of f.readlines() so large files
            # are not loaded into memory at once.
            for line in f:
                if line.strip() == "":
                    continue
                text, annotations = self._input_parser.parse(line)
                detected_entities = []
                for detector in self._detectors.values():
                    detected_entities += detector.detect(text, annotations)
                # Fixed typo: was "annotaitons_cleaned".
                annotations_cleaned = self._suppressor.suppress(detected_entities)
                replaced_input = text
                annotations_left = annotations_cleaned
                for replacer in self._replacers.values():
                    replaced_input, annotations_left = replacer.replace(
                        replaced_input, annotations_left
                    )
                result.append({"text": replaced_input})
        return "\n".join(json.dumps(item, ensure_ascii=False) for item in result)
def get_sublcasses(cls):
    """Return every subclass of *cls* (direct and transitive) in depth-first order.

    NOTE: the misspelled name is kept because existing callers (and the
    recursive call) use it.
    """
    return [
        descendant
        for child in cls.__subclasses__()
        for descendant in [child, *get_sublcasses(child)]
    ]
\ No newline at end of file
......@@ -2,8 +2,12 @@ from src.annotations import NerAnnotation, MorphosyntacticAnnotation
from src.detections import NameDetection, SurnameDetection, CityDetection
from src.detectors.ner import NerDetector
def test_detect_names_pl_liner_n5():
detector = NerDetector("pl")
def test_ner_detector():
detector = NerDetector(detection_mapping={
"person_first_nam": "name",
"person_last_nam": "surname",
"city_nam": "city",
})
annotations = [
(10, 16, NerAnnotation("person_first_nam")),
......
from src.dictionaries.morphosyntactic.ner_file import NERFileMorphosyntacticDictionary
from src.detections import NameDetection, CityDetection, SurnameDetection
from tempfile import NamedTemporaryFile
def test_ner_file_morphosyntactic_dictionary():
    """Replacements should respect morpho tags, with a random fallback otherwise."""
    rows = [
        "name Andrzejowi Andrzej a\n",
        "name Andrzej Andrzej b\n",
        "name Kasia Kasia c\n",
        "city Wrocław Wrocław d\n",
        "city Warszawa Warszawa c\n",
        "city Kraków Kraków d\n",
    ]
    with NamedTemporaryFile() as dict_file:
        dict_file.writelines(row.encode("utf-8") for row in rows)
        dict_file.flush()
        dictionary = NERFileMorphosyntacticDictionary(dict_file.name)

        name_tag_a = NameDetection(morpho_tag="a")
        name_tag_b = NameDetection(morpho_tag="b")
        unsupported_surname = SurnameDetection(morpho_tag="c")

        assert dictionary.get_random_replacement(name_tag_a) == "Andrzejowi"
        assert dictionary.get_random_replacement(name_tag_b) == "Andrzej"
        # If no good replacement is present, it should choose a random one
        every_word = ["Andrzejowi", "Andrzej", "Kasia", "Wrocław", "Warszawa", "Kraków"]
        assert dictionary.get_random_replacement(unsupported_surname) in every_word
\ No newline at end of file
from src.dictionaries.morphosyntactic.ner_file_nkjp import NERFileNKJPMorphosyntacticDictionary
from src.detections import NameDetection, CityDetection, SurnameDetection
from tempfile import NamedTemporaryFile
def test_ner_file_nkjp_morphosyntactic_dictionary():
    """NKJP dictionary lookups should ignore the leading morpho-tag segment."""
    rows = [
        "name Andrzejowi Andrzej sg:dat:m1\n",
        "name Andrzej Andrzej sg:m1:imperf\n",
        "name Kasia Kasia sg:f:imperf\n",
        "city Wrocław Wrocław sg:m2:imperf\n",
        "city Warszawa Warszawa sg:f:imperf\n",
        "city Kraków Kraków sg:m2:imperf\n",
    ]
    with NamedTemporaryFile() as dict_file:
        dict_file.writelines(row.encode("utf-8") for row in rows)
        dict_file.flush()
        dictionary = NERFileNKJPMorphosyntacticDictionary(dict_file.name)

        name_dative = NameDetection(morpho_tag="subst:sg:dat:m1")
        name_nominative = NameDetection(morpho_tag="subst:sg:m1:imperf")
        unsupported_surname = SurnameDetection(morpho_tag="subst:sg:m1:imperf")

        assert dictionary.get_random_replacement(name_dative) == "Andrzejowi"
        assert dictionary.get_random_replacement(name_nominative) in ["Andrzej", "Kasia"]
        # If no good replacement is present, it should choose a random one
        every_word = ["Andrzejowi", "Andrzej", "Kasia", "Wrocław", "Warszawa", "Kraków"]
        assert dictionary.get_random_replacement(unsupported_surname) in every_word
\ No newline at end of file
from src.dictionaries.morphosyntactic.pl_ner import PlNERMorphosyntacticDictionary
from src.detections import NameDetection, CityDetection, SurnameDetection
def test_pl_ner_morphosyntactic_dictionary():
    """In-memory dictionary should match on morpho tag, with a random fallback."""
    entries = [
        (NameDetection, "Andrzejowi", "Andrzej", "subst:sg:dat:m1"),
        (NameDetection, "Andrzej", "Andrzej", "subst:sg:m1:imperf"),
        (NameDetection, "Kasia", "Kasia", "subst:sg:f:imperf"),
        (CityDetection, "Wrocław", "Wrocław", "subst:sg:m2:imperf"),
        (CityDetection, "Warszawa", "Warszawa", "subst:sg:f:imperf"),
        (CityDetection, "Kraków", "Kraków", "subst:sg:m2:imperf"),
    ]
    dictionary = PlNERMorphosyntacticDictionary(list=entries)

    name_dative = NameDetection(morpho_tag="subst:sg:dat:m1")
    name_nominative = NameDetection(morpho_tag="subst:sg:m1:imperf")
    unsupported_surname = SurnameDetection(morpho_tag="subst:sg:m1:imperf")

    assert dictionary.get_random_replacement(name_dative) == "Andrzejowi"
    assert dictionary.get_random_replacement(name_nominative) in ["Andrzej", "Kasia"]
    # If no good replacement is present, it should choose a random one
    every_word = ["Andrzejowi", "Andrzej", "Kasia", "Wrocław", "Warszawa", "Kraków"]
    assert dictionary.get_random_replacement(unsupported_surname) in every_word
\ No newline at end of file
......@@ -54,10 +54,7 @@ example_ccl = """<?xml version="1.0" encoding="UTF-8"?>
def test_ccl_input_parser():
parser = CCLInputParser()
with NamedTemporaryFile() as f:
f.write(example_ccl.encode("utf-8"))
f.flush()
text, annotations = parser.parse(f.name)
text, annotations = parser.parse(example_ccl)
assert text == "Marek Kowalski pojechał do Wrocławia."
assert len(annotations) == 8
......
# from src.annotation_types_old import AnnotationTypes
from src.input_parsers.wiktor_ner import WiktorNERInputParser
from src.annotations import NerAnnotation, MorphosyntacticAnnotation
example_json = """{
"filename": "test_filename",
"text": "Marek Kowalski pojechał do Wrocławia.",
"tokens": [
{
"index": 1,
"position": [0,5],
"orth": "Marek",
"lexemes": [
{
"lemma": "Marek",
"mstag": "subst:sg:nom:m1",
"disamb": true
}
]
},
{
"index": 2,
"position": [6,14],
"orth": "Kowalski",
"lexemes": [
{
"lemma": "Kowalski",
"mstag": "subst:sg:nom:m1",
"disamb": true
}
]
},
{
"index": 3,
"position": [15,23],
"orth": "pojechał",
"lexemes": [
{
"lemma": "pojechać",
"mstag": "praet:sg:m1:perf",
"disamb": true
}
]
},
{
"index": 4,
"position": [24,26],
"orth": "do",
"lexemes": [
{
"lemma": "do",
"mstag": "prep:gen",
"disamb": true
}
]
},
{
"index": 5,
"position": [27,36],
"orth": "Wrocławia",
"lexemes": [
{
"lemma": "Wrocław",
"mstag": "subst:sg:gen:m3",
"disamb": true
}
]
},
{
"index": 6,
"position": [36,37],
"orth": ".",
"lexemes": [
{
"lemma": ".",
"mstag": "interp",
"disamb": true
}
]
}
],
"entities": [
{
"text": "Marek Kowalski",
"type": "nam_liv",
"tokens": [0, 2],
"positions": [0, 14]
},
{
"text": "Wrocławia",
"type": "nam_loc",
"tokens": [4, 5],
"positions": [27, 36]
}
]
}"""
def test_wiktor_ner_input_parser():
    """End-to-end check of WiktorNERInputParser on the example document."""
    parser = WiktorNERInputParser()
    text, annotations = parser.parse(example_json)

    assert text == "Marek Kowalski pojechał do Wrocławia."
    assert len(annotations) == 8

    expected = [
        (0, 14, NerAnnotation("nam_liv")),
        (27, 36, NerAnnotation("nam_loc")),
        (0, 5, MorphosyntacticAnnotation("subst:sg:nom:m1")),
        (6, 14, MorphosyntacticAnnotation("subst:sg:nom:m1")),
        (15, 23, MorphosyntacticAnnotation("praet:sg:m1:perf")),
        (24, 26, MorphosyntacticAnnotation("prep:gen")),
        (27, 36, MorphosyntacticAnnotation("subst:sg:gen:m3")),
        (36, 37, MorphosyntacticAnnotation("interp")),
    ]
    for triple in expected:
        assert triple in annotations
\ No newline at end of file
......@@ -4,9 +4,10 @@ from src.input_parsers.interface import InputParser
from src.detectors.interface import Detector
from src.suppressors.interface import Suppressor
from src.replacers.interface import ReplacerInterface
from tempfile import NamedTemporaryFile
class MockInputParser(InputParser):
def parse(self, input):
def parse(self, content):
return "ala ma kota", {}
class MockDetector(Detector):
......@@ -30,4 +31,5 @@ def test_default_pipeline():
{"mock_replacer": MockReplacer()}
)
assert pipeline.run("/test.txt") == "zbigniew ma kota"
\ No newline at end of file
with NamedTemporaryFile() as f:
assert pipeline.run(f.name) == "zbigniew ma kota"
\ No newline at end of file
from src.pipeline.sequential_jsonl import SequentialJSONLPipeline
from src.detections import NameDetection
from src.input_parsers.interface import InputParser
from src.detectors.interface import Detector
from src.suppressors.interface import Suppressor
from src.replacers.interface import ReplacerInterface
from tempfile import NamedTemporaryFile
class MockInputParser(InputParser):
    # Minimal stand-in parser: ignores the input and returns fixed text
    # with no annotations.
    def parse(self, content):
        """Return a fixed ("ala ma kota", {}) pair regardless of *content*."""
        return "ala ma kota", {}
class MockDetector(Detector):
    # Always "detects" a name spanning the first three characters.
    def detect(self, text, annotations):
        """Return a single fixed NameDetection at offsets (0, 3)."""
        return [(0, 3, NameDetection())]
class MockSuppressor(Suppressor):
    # Pass-through suppressor: keeps every annotation.
    def suppress(self, annotations):
        """Return *annotations* unchanged."""
        return annotations
class MockReplacer(ReplacerInterface):
    # Replaces the whole text with a fixed string, leaving annotations as-is.
    def replace(self, text, annotations):
        """Return ("zbigniew ma kota", annotations) regardless of *text*."""
        return "zbigniew ma kota", annotations
def test_sequential_jsonl_pipeline():
    """The pipeline should anonymize each JSONL line independently."""
    # TODO: Prepare mocks that will better test the pipeline
    pipeline = SequentialJSONLPipeline(
        MockInputParser(),
        {"mock_detector": MockDetector()},
        MockSuppressor(),
        {"mock_replacer": MockReplacer()},
    )
    with NamedTemporaryFile() as tmp:
        tmp.write(b'{"text": "ala ma kota"}\n{"text": "ala ma kota"}')
        tmp.flush()
        assert pipeline.run(tmp.name) == '{"text": "zbigniew ma kota"}\n{"text": "zbigniew ma kota"}'
\ No newline at end of file
from src.replacers.ner_replacer import NERReplacer
from src.detections import NameDetection, SurnameDetection, DateDetection, CityDetection
from src.dictionaries.morphosyntactic.pl_ner import PlNERMorphosyntacticDictionary
from src.dictionaries.morphosyntactic.ner_file import NERFileMorphosyntacticDictionary
from tempfile import NamedTemporaryFile
def test_ner_replacer():
dictionary = PlNERMorphosyntacticDictionary(list=[
(NameDetection, "Andrzej", "Andrzej", "a"),
(NameDetection, "Kasi", "Kasia", "b"),
(SurnameDetection, "Kowalowi", "Kowal", "a"),
(SurnameDetection, "Kowal", "Kowal", "b"),
], always_replace=False)
with NamedTemporaryFile() as file:
file.writelines([
"name Andrzej Andrzej a\n".encode("utf-8"),
"name Kasi Kasia b\n".encode("utf-8"),
"surname Kowalowi Kowal a\n".encode("utf-8"),
"surname Kowal Kowal b\n".encode("utf-8"),
])
file.flush()
dictionary = NERFileMorphosyntacticDictionary(file.name, always_replace=False)
text = "Ala Brzeszczot urodziła sie 05.05.2005 we Wrocławiu"
detections = [
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment