Skip to content
Snippets Groups Projects

Plwordnet with basegraph

Files

+ 209
0
"""Implementation of plwordnet_worker."""
import json
import logging
import traceback
import plwn
from basegraph import BaseGraph
_log = logging.getLogger(__name__)
def syn2str(synset):
    """Return ``lemma:variant`` built from the first lexical unit of *synset*."""
    first_unit = synset.lexical_units[0]
    return ":".join((first_unit.lemma, str(first_unit.variant)))
def tuples2dict(tuples):
    """Turn ``(from, to)`` tuples into a ``{"nodes": ..., "links": ...}`` dict.

    Every distinct endpoint name receives a consecutive integer id in order
    of first appearance (the source of a pair is registered before its
    target).  Each pair then yields one link that refers to those ids.
    """
    index_of = {}
    nodes = []
    links = []

    def _register(name):
        # Hand out the next free id the first time a name is seen.
        idx = index_of.get(name)
        if idx is None:
            idx = len(index_of)
            index_of[name] = idx
            nodes.append({"id": idx, "name": name})
        return idx

    for from_, to in tuples:
        source_id = _register(from_)
        target_id = _register(to)
        links.append({"source_id": source_id, "target_id": target_id})

    return {"nodes": nodes, "links": links}
class PlWordnetService:
    """PLWordnet service.

    This service provides functions to interact with wordnet database
    structure.  To provide high performance, operations on graph structure
    are handled by graphtool library.

    Required are two models in sync:
        1. model for graphtool in form of gz xml file
        2. model for sqlite in form of database file

    Args:
        db_model (str): path to sqlite database file
        graph_model (str): path to graphtool model file
    """

    # Language codes accepted in the ``lang`` field of an element.
    _SUPPORTED_LANGS = ("pl", "en")

    def __init__(self, db_model, graph_model):
        """Load the sqlite wordnet and the graph model, then index lemmas."""
        _log.info("Initializing models models ...")
        self.wn = plwn.load(db_model, "sqlite3")
        self.bg = BaseGraph()
        # NOTE(review): presumably this deserializes with pickle - only load
        # graph models from a trusted location.
        self.bg.unpickle(graph_model)
        # Build the lemma -> nodes index once; every lookup below reads it.
        self.bg._generate_lemma_to_nodes_dict()
        _log.info("Loading models complete.")

    def process(self, input):
        """Execute a request and return its result.

        A request carrying a ``"function"`` key is routed to
        ``_evaluate_function``; any other request is treated as a task and
        routed to ``_evaluate_operation`` (a missing ``"task"`` behaves like
        ``"all"``).

        Accepts input as json:
        To fetch all
        {
            "task":"all",
            "lexeme":"zdrowie",
            "tool":"plwordnet"
        }
        To list element
        {
            "function": "list",
            "element": {
                "val": "dom",
                "path": "/",
                "lemma": "dom",
                "lang": "pl",
                "type": "lemma"
            }
        }
        """
        _log.info("Doing work!")
        if "function" in input:
            res = self._evaluate_function(input["function"], input)
        else:
            # .get() so that a request without "task" reaches the "all"
            # fallback instead of failing with KeyError here.
            res = self._evaluate_operation(input.get("task"), input)
        _log.info("Work done!")
        return res

    def _synsets_for_lemma(self, lemma):
        """Return wordnet synsets for every graph node indexed under *lemma*.

        An unknown lemma yields an empty list.
        """
        # .get() keeps an unknown lemma from raising KeyError and, should the
        # index be a defaultdict, from inserting an empty entry on lookup.
        nodes = self.bg._lemma_to_nodes_dict.get(lemma, ())
        return [self.wn.synset_by_id(node.synset.synset_id) for node in nodes]

    def _evaluate_operation(self, operation_type, input):
        """Run a task request.

        Args:
            operation_type: ``"synset"``, ``"all"`` or a falsy value (treated
                as ``"all"``).
            input: the request; ``"synset"`` reads ``input["id"]``, ``"all"``
                reads ``input["lexeme"]``.

        Returns:
            For ``"synset"`` the dict form of the synset; for ``"all"`` a JSON
            string with the ``synsets`` list and a ``href``.

        Raises:
            ValueError: if the task type is not supported.
        """
        if operation_type == "synset":
            return self.wn.synset_by_id(input["id"]).to_dict()
        if operation_type == "all" or not operation_type:
            synsets = [
                synset.to_dict()
                for synset in self._synsets_for_lemma(input["lexeme"])
            ]
            return json.dumps(
                {"synsets": synsets,
                 "href": "http://plwordnet.pwr.wroc.pl/wordnet/"},
                ensure_ascii=False)
        raise ValueError(
            "Unsupported task type. Possible tasks: all, synset")

    def _get_lang(self, synset):
        """Detect the language of a synset dict and store it under ``"lang"``.

        The synset counts as ``"en"`` as soon as one unit has a ``pos`` value
        containing an underscore, otherwise as ``"pl"``.  The dict is
        annotated in place and the language is returned.
        """
        lang = "pl"
        if any("_" in unit["pos"] for unit in synset["units"]):
            lang = "en"
        synset["lang"] = lang
        return lang

    def _filter_by_lang(self, synsets, lang):
        """Convert *synsets* to dicts and keep those detected as *lang*."""
        as_dicts = [synset.to_dict() for synset in synsets]
        return [item for item in as_dicts if self._get_lang(item) == lang]

    def _get_path_to_highest_hiperonym(self, synset, path=None):
        """Collect edges reachable from *synset* through ``'hipo'`` relations.

        Args:
            synset: synset to start from.
            path: set of already collected ``(source, target)`` string pairs;
                a fresh set is created when omitted.

        Returns:
            The set of ``(syn2str(source), syn2str(target))`` pairs.
        """
        # "is None" rather than "or": a caller-supplied empty set must be
        # filled, not replaced by a new one.
        if path is None:
            path = set()
        for _, target in synset.related_pairs('hipo'):
            edge = (syn2str(synset), syn2str(target))
            # Already recorded edges are skipped, which also stops cycles.
            if edge not in path:
                path.add(edge)
                self._get_path_to_highest_hiperonym(target, path)
        return path

    def _get_with_relations(self, synset):
        """Return the synset dict extended with its path and related synsets.

        ``"path"`` holds the nodes/links form of the ``'hipo'`` edges.  Every
        entry of ``"related"`` becomes a list whose last item is the dict of
        the related synset, when that synset can be resolved.
        """
        res = synset.to_dict()
        related = res["related"]
        res["path"] = tuples2dict(self._get_path_to_highest_hiperonym(synset))
        if related is not None:
            for key, pairs in related.items():
                # Entries become lists so the synset dict can be appended.
                entries = [list(pair) for pair in pairs]
                related[key] = entries
                for entry in entries:
                    try:
                        relsynset = self.wn.synset_by_id(int(entry[0]))
                        entry.append(relsynset.to_dict())
                    except Exception:
                        # Best effort: keep the entry without details, but
                        # record the actual exception.
                        _log.exception(
                            "Could not resolve related synset for entry %r",
                            entry)
        return res

    def _list_element(self, element):
        """Describe what is available for *element* (``"list"`` function).

        Returns ``{"formats": [...], "url": ...}`` when the lemma has synsets
        in the requested language or the given plwordnet synset id resolves;
        otherwise an empty dict.
        """
        response = {}
        if "lemma" in element:
            if element.get("lang") not in self._SUPPORTED_LANGS:
                return response
            # The index is queried with spaces in place of underscores.
            synsets = self._synsets_for_lemma(
                element["lemma"].replace("_", " "))
            if self._filter_by_lang(synsets, element["lang"]):
                response = {
                    "formats": ["json"],
                    "url": "http://plwordnet.pwr.wroc.pl/wordnet/lemma/"
                           + element["lemma"],
                }
        elif "typeOfSynset" in element and \
                "plwordnet" in element["typeOfSynset"]:
            try:
                # Only the existence of the synset matters here.
                self.wn.synset_by_id(int(element["synsetid"]))
            except Exception:
                # The wordnet storage raises its own exception types, so the
                # catch stays broad; the failure is logged, not printed.
                _log.exception("Could not look up synset for %r", element)
            else:
                response = {
                    "formats": ["json"],
                    "url": "http://plwordnet.pwr.wroc.pl/",
                }
        return response

    def _get_element(self, element):
        """Fetch the data for *element* (``"get"`` function).

        Returns ``{'synsets': [...]}`` for a lemma with synsets in the
        requested language, the relation-enriched synset dict for a
        plwordnet synset id, or an empty dict when nothing is found.
        """
        if "lemma" in element:
            if element.get("lang") not in self._SUPPORTED_LANGS:
                return {}
            synsets = self._filter_by_lang(
                self._synsets_for_lemma(element["lemma"].replace("_", " ")),
                element["lang"])
            if synsets:
                return {'synsets': synsets}
        elif "typeOfSynset" in element and \
                "plwordnet" in element["typeOfSynset"]:
            try:
                # int() mirrors the "list" function's handling of synsetid.
                synset = self.wn.synset_by_id(int(element["synsetid"]))
                return self._get_with_relations(synset)
            except Exception:
                _log.exception("Could not fetch synset for %r", element)
        return {}

    def _evaluate_function(self, function_type, input):
        """Run a function request (``"list"``, ``"get"`` or ``"getInfo"``).

        ``"list"`` and ``"get"`` read ``input["element"]``; ``"getInfo"``
        returns the parsed content of ``info.json``.
        """
        if function_type == "list":
            return self._list_element(input["element"])
        if function_type == 'get':
            return self._get_element(input["element"])
        if function_type == "getInfo":
            with open("info.json", "rt", encoding="utf8") as f:
                return json.load(f)
        # NOTE(review): other function types yield None here - confirm that
        # callers do not expect an empty dict instead.
        return None
Loading