Commit 56487d30 authored by Arkadiusz Janz

a new package for CCL document manipulation - it's a convenient wrapper for corpus2

parent cce77df0
from ._base import *
""" A set of methods for operating on ccl files, especially for reading and
writing documents.
"""
import os
import corpus2
ENCODING = "utf-8"
__all__ = [
'read',
'write',
'read_from_directory',
'get_tagset',
]
def _read(tagset, ccl_path, rel_path=None):
""" A standard way to read CCL using corpus2. """
reader = corpus2.CclRelReader(
get_tagset(tagset),
ccl_path,
rel_path if rel_path else ccl_path
)
document = reader.read()
del reader
return document
def read(ccl_path, rel_path=None, tagset='nkjp'):
""" Read the given ccl file and return corpus2.Document.
Notes:
Additionally, a rel-ccl file can be provided. Information about
relations is attached to the document object.
Args:
tagset: the name of the tagset that is used in the document or a tagset
object itself.
ccl_path: a path to CCL file
rel_path: a path to REL file.
Returns:
corpus2.Document: The read document.
"""
if rel_path:
return _read(tagset, ccl_path, rel_path)
return _read(tagset, ccl_path)
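# Illustrative sketch of reading a single CCL document (the paths used here are
# hypothetical and only show the intended call pattern; the corpus2 calls below
# mirror the ones used elsewhere in this module):
def _example_read_usage(ccl_path='document.xml', rel_path='document.rel.xml'):
    # read the CCL file together with its relations, using the default nkjp tagset
    document = read(ccl_path, rel_path=rel_path)
    # iterate the document structure and collect token orths
    orths = []
    for paragraph in document.paragraphs():
        for sentence in paragraph.sentences():
            for token in sentence.tokens():
                orths.append(token.orth_utf8())
    return orths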
def read_from_directory(path, ccl_ext='.xml', rel_ext='.rel.xml',
read_rel_files=False, tagset='nkjp',
rel_required=True):
""" Read CCL files from the given directory. Returns a generator of
corpus2.Document objects .
Notes:
Additionally, REL files can be attached by searching for them in the
same directory. To use this feature set the `read_rel_files` parameter
to True. The information about relations is added to the document
object if the corresponding REL file was found.
If the `rel_required` parameter is set to True, but the function could
not find a corresponding REL file, then both the CCL and REL files are
skipped. If `rel_required` is set to False, then the function just
ignores missing REL files, but still reads the CCLs. The directory is
not searched recursively.
The function returns a generator of documents, because reading long
documents may take some time (and memory space!).
Args:
path: a path to a directory we want to scan.
ccl_ext: we can change the file extension of our CCL files if they
have a different extension than the default (the default is .xml).
rel_ext: we can change the file extension of our REL files if they
have a different extension (the default is .rel.xml).
read_rel_files: read REL files - True, ignore REL files - False.
rel_required: if reading REL files is necessary then set this
parameter to True. This will force the function to read CCL files
only if a corresponding REL file exists (and skip CCLs whose RELs
do not exist).
tagset: str|corpus2.Tagset.
Returns:
a generator of corpus2.Document objects
"""
if not os.path.isdir(path):
raise TypeError("{:} must be a directory".format(path))
files = set([os.path.join(path, file_)
for file_ in os.listdir(path)])
ccl_files = (file_ for file_ in files
if file_.endswith(ccl_ext) and
not file_.endswith(rel_ext))
for ccl_file in ccl_files:
rel_file = None
if read_rel_files:
rel_file = ccl_file.replace(ccl_ext, rel_ext)
if rel_required:
# `rel_file` already contains the directory path (see `files` above),
# so it must not be joined with `path` again.
if not os.path.exists(rel_file):
continue
yield _read(tagset, ccl_file, rel_file)
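# A minimal sketch of how the generator above is meant to be consumed (the
# directory name is hypothetical; documents are read lazily, one at a time):
def _example_read_directory_usage(directory='corpus_dir'):
    count = 0
    for document in read_from_directory(directory, read_rel_files=True,
                                        rel_required=False):
        # each `document` is a corpus2.Document; count its paragraphs
        count += len(list(document.paragraphs()))
    return count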
def _write(document, tagset, ccl_path, rel_path=None):
""" A standard way to save CCL files. """
writer = corpus2.TokenWriter.create_path_writer(
'ccl', ccl_path, get_tagset(tagset))
for chunk in document.paragraphs():
writer.write_chunk(chunk)
writer.finish()
if rel_path:
writer = corpus2.RelationWriter(rel_path)
writer.write(document.relations())
del writer
def write(document, ccl_path, rel_path=None, tagset='nkjp'):
""" Write the document object to the output ccl file.
Notes:
We save the relations from the document if the output path for REL file
is provided.
Args:
document: corpus2.Document object - the document to be saved.
tagset: the name of the tagset that is used in the document or a tagset
object itself.
ccl_path: a path for output CCL file to save the document.
rel_path: a path to REL file to save document relations.
"""
_write(document, tagset, ccl_path, rel_path)
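# A short round-trip sketch combining `read` and `write` (all paths are
# hypothetical; relations are saved only because an output REL path is given):
def _example_write_usage(in_ccl='in.xml', in_rel='in.rel.xml',
                         out_ccl='out.xml', out_rel='out.rel.xml'):
    document = read(in_ccl, rel_path=in_rel)
    write(document, out_ccl, rel_path=out_rel)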
def get_tagset(tagset):
""" Returns a tagset object.
Notes:
It's a wrapper function that creates a tagset object if the input value
is a string. Otherwise it returns the input unchanged.
Args:
tagset: a name of a tagset or a tagset object itself.
Returns:
a corpus2 tagset object.
"""
if isinstance(tagset, str):
tagset = corpus2.get_named_tagset(tagset)
return tagset
""" Helper functions for manipulating token attributes and annotations. """
from builtins import dict
from corpus2 import AnnotatedSentence_wrap_sentence as annotate_sentence
class _RaiseClass(object):
def __repr__(self):
return "<RAISE>"
RAISE = _RaiseClass()
def get_attributes(token, to_unicode=False):
"""
Get attributes of a token.
If token has no metadata, safely returns empty dict.
Args:
token (Corpus2.token)
to_unicode (:obj:`bool`, optional): Cast keys and values to unicode.
(Default value = False)
Returns:
dict
"""
if not token.has_metadata():
return {}
metadata = token.get_metadata()
attributes = dict(metadata.attributes())
if to_unicode:
attributes = {_try_decode(key): _try_decode(value)
for (key, value) in list(attributes.items())}
return attributes
def get_attribute(token, key, default=RAISE, to_unicode=False):
"""
Get named attribute of a token.
If token has no metadata, attribute is treated as not existing.
Args:
token (Corpus2.token)
key (object): Attribute name, automatically casted to string.
default (:obj:`object`, optional): If given, and key not found,
returns this value instead. Raises KeyError otherwise.
to_unicode (:obj:`bool`, optional): Cast value to unicode.
(Default value = False)
Returns:
str
"""
attributes = get_attributes(token, to_unicode)
if to_unicode:
key = _try_decode(key)
if default is not RAISE:
return attributes.get(key, default)
return attributes[key]
def set_attribute(token, key, value):
"""
Set attribute of a token.
If token has no metadata, it is created automatically.
Args:
token (Corpus2.token)
key (object): Attribute name, automatically casted to string.
value (object): Attribute name, automatically casted to string.
"""
if not token.has_metadata():
token.create_metadata()
metadata = token.get_metadata()
metadata.set_attribute(_try_encode(key), _try_encode(value))
def set_attributes(token, items):
"""
Set multiple attributes of a token.
If token has no metadata, it is created automatically.
Args:
token (Corpus2.token)
items (Mapping[object, object]): Dictionary with keys and values,
automatically casted to string.
"""
for (key, value) in list(items.items()):
set_attribute(token, key, value)
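# Illustrative sketch of the attribute helpers above (the attribute names and
# values are hypothetical; `token` is any corpus2.Token taken from a sentence):
def _example_attribute_usage(token):
    # create/overwrite two metadata attributes on the token
    set_attributes(token, {"sense": "1", "source": "manual"})
    # read a single attribute back, falling back to a default when missing
    sense = get_attribute(token, "sense", default=None)
    # read all attributes as a plain dict
    return sense, get_attributes(token)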
def get_annotations(sentence, token, tok_in_sent_index=None):
"""
Get annotations of a token from sentence annotation channel.
Args:
sentence (Corpus2.sentence)
token (Corpus2.token)
tok_in_sent_index (int): Position of the token in the sentence.
If present, the additional work of finding
the token's index is skipped. If the index
is known, it is recommended to provide it
to speed up execution.
Returns:
Dict[str, int]
"""
try:
sentence.all_channels()
except AttributeError:
sentence = annotate_sentence(sentence)
if tok_in_sent_index is not None:
index = tok_in_sent_index
else:
index = _find_token(sentence, token)
# Building a dict straight from all_channels() yields invalid references,
# so each channel is retrieved by name instead.
channels = list(sentence.all_channels())
return {
name: sentence.get_channel(name).get_segment_at(index)
for name in channels
}
def _find_token(sentence, token):
for (index, token_in_sentence) in enumerate(sentence.tokens()):
if token_in_sentence.is_same(token):
return index
raise ValueError("Token does not belong to sentence.")
def get_annotation(sentence, token, key, tok_in_sent_index=None,
default=RAISE):
"""
Get named annotation of a token from sentence annotation channel.
Args:
sentence (Corpus2.sentence)
token (Corpus2.token)
key (str)
tok_in_sent_index (int): Position of the token in the sentence.
If present, the additional work of finding
the token's index is skipped. If the index
is known, it is recommended to provide it
to speed up execution.
default (:obj:`object`, optional): If given, and key not found,
returns this value instead. Raises KeyError otherwise.
Returns:
int
"""
annotations = get_annotations(
sentence, token, tok_in_sent_index=tok_in_sent_index)
if default is not RAISE:
return annotations.get(key, default)
return annotations[key]
def set_annotation_for_token(sentence, token, key, value=None, set_head=False):
"""
Set annotation for a token.
Args:
sentence (Corpus2.Sentence)
token (Corpus2.Token)
key (str): a name for the annotation channel
value (int, bool): annotation (segment) number, convertible to an
integer; if not given, a new segment index is allocated
set_head (bool): if True, mark the token as the head of the segment
"""
ann_sentence = annotate_sentence(sentence)
if key not in ann_sentence.all_channels():
ann_sentence.create_channel(key)
channel = ann_sentence.get_channel(key)
token_index = _find_token(sentence, token)
if value:
try:
segment = int(value)
except (TypeError, ValueError):
raise ValueError("Wrong value type - should be an integer.")
else:
segment = channel.get_new_segment_index()
channel.set_segment_at(token_index, segment)
if set_head:
channel.set_head_at(token_index, True)
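# A sketch showing how the annotation helpers above fit together (the channel
# name "np" is a hypothetical example; `sentence` and `token` are corpus2
# objects, with the token belonging to the sentence):
def _example_annotation_usage(sentence, token):
    # put the token into a fresh segment of the "np" channel and mark it as head
    set_annotation_for_token(sentence, token, "np", set_head=True)
    # read back the segment number assigned to the token in that channel
    return get_annotation(sentence, token, "np", default=0)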
def is_head_of(sentence, token, key):
""" Check whether the token is the head of its annotation segment in the
given channel of the sentence. """
ann_sentence = annotate_sentence(sentence)
if key not in ann_sentence.all_channels():
raise Exception("Channel not found!")
channel = ann_sentence.get_channel(key)
token_index = _find_token(sentence, token)
return channel.is_head_at(token_index)
def _try_decode(value):
try:
value = str(value)
except UnicodeEncodeError:
pass
try:
value = value.decode("utf-8")
except (UnicodeDecodeError, AttributeError):
pass
return value
def _try_encode(value):
try:
value = str(value)
except UnicodeEncodeError:
pass
try:
value = value.encode("utf-8")
except (UnicodeEncodeError, AttributeError):
pass
return value
""" A set of methods for operating on ccl files, especially for reading and
writing documents.
"""
import corpus2
ENCODING = "utf-8"
__all__ = [
'copy_chunk',
'copy_sentence',
'copy_relation'
# todo: add 'copy_token' function
]
def _new_relation_point(relation_id, channel_name, annotation_number):
return corpus2.DirectionPoint(
relation_id, channel_name, annotation_number
)
def _change_point_id(point, point_id):
return _new_relation_point(
point_id, point.channel_name(), point.annotation_number()
)
def copy_relation(relation, new_from_id=None, new_to_id=None):
""" Returns a copy of given relation object and changes its identifiers if
necessary. When no optional parameters are given we just copy the relation
without changing its properties.
Notes:
If `new_from_id` is given then the sentence identifier of `rel_from`
object will be replaced. The same holds for `new_to_id` and `rel_to`
object. If only one of them is provided then the second one will be
a direct copy of the original sentence identifier.
Args:
relation: the relation object to make a copy.
new_from_id: the new identifier for the from-point of the relation.
new_to_id: the new identifier for the to-point of the relation.
Returns:
a new relation copy (corpus2.RelationPtr)
"""
relation_copy = relation.clone_shared()
if new_from_id:
point = _change_point_id(relation_copy.rel_from(), new_from_id)
relation_copy.set_from(point)
if new_to_id:
point = _change_point_id(relation_copy.rel_to(), new_to_id)
relation_copy.set_to(point)
return relation_copy
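# Illustrative use of `copy_relation` (a sketch; the sentence identifier "s2"
# is a hypothetical value, and only the from-point of each copy is rewritten):
def _example_copy_relations(document, new_from_id="s2"):
    return [copy_relation(relation, new_from_id=new_from_id)
            for relation in document.relations()]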
def copy_sentence(sentence, new_id=None):
""" Returns a copy of the given sentence and changes its identifier if
necessary. If the `new_id` is provided then the original identifier of
source sentence is replaced.
Args:
sentence: a sentence object to copy.
new_id: a new identifier for our sentence copy.
Returns:
a copy of the given sentence (corpus2.SentencePtr)
"""
sentence_copy = sentence.clone_shared()
if new_id:
sentence_copy.set_id(new_id)
return sentence_copy
def copy_chunk(chunk, copy_sentences=True, new_id=None):
""" Returns a copy of the given chunk and changes its identifier if
necessary.
Notes:
If `copy_sentences` is set to False then the sentences from source
chunk WON'T be copied, only the attributes. If `new_id` is provided
then the identifier of source chunk will be replaced.
Args:
chunk: a chunk to copy.
copy_sentences: if set to False, the source sentences won't be
copied (only the attributes).
new_id: the new identifier for a copy of the chunk.
Returns:
a new copy of source chunk (corpus2.Chunk).
"""
if not copy_sentences:
new_chunk = corpus2.Chunk().clone_shared()
_copy_chunk_attributes(chunk, new_chunk)
else:
new_chunk = chunk.clone_shared()
if new_id:
new_chunk.set_attribute('id', new_id)
return new_chunk
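# A small sketch of copying document structure with the helper above (the new
# chunk identifier is hypothetical; attributes are kept, sentences are not):
def _example_copy_chunks(document):
    copies = []
    for chunk in document.paragraphs():
        # keep only the chunk attributes and give the copy a new id
        copies.append(copy_chunk(chunk, copy_sentences=False, new_id="ch-copy"))
    return copies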
def _copy_chunk_attributes(source_chunk, target_chunk):
""" Copy all attributes from the source chunk to the target chunk.
Args:
source_chunk: a source chunk.
target_chunk: a target chunk.
"""
for key, value in list(source_chunk.attributes().items()):
target_chunk.set_attribute(key, value)
# todo: move somewhere else!
def sentence2str(sentence, use_bases=False, tagset='nkjp'):
""" Return corpus2.Sentence as a string.
Args:
sentence: a sentence object (corpus2.Sentence).
use_bases: if set to True, then we take base forms (lemmas)
instead of the orths.
tagset: the name of the tagset used in the sentence or a tagset
object itself (the default is 'nkjp').
Returns:
a string representation of the input sentence object.
"""
if isinstance(tagset, str):
tagset = corpus2.get_named_tagset(tagset)
text = []
for token in sentence.tokens():
text.append(" " if token.after_space() else "")
if not use_bases:
token_string = token.orth_utf8()
else:
token_string = token.get_preferred_lexeme(tagset).lemma_utf8()
text.append(token_string)
return "".join(text).strip()
"""
Set of helper methods for creating corpus2 Token, Lexeme and Tag objects.
"""
import corpus2
from cclutils import get_tagset
SIMPLE_TAGSET = get_tagset("simple")
def get_lexeme_strings(document, tagset, delimiter=":", include_fine=False,
lemma_only=False):
"""
Get lexeme strings from the document.
Args:
document: (corpus2.Document)
tagset: (corpus2.Tagset) the tagset used to pick preferred lexemes.
delimiter (:obj:`str`, optional): separator placed between pos and lemma.
include_fine (:obj:`bool`, optional): if False, only verbs, nouns,
adjectives and adverbs are kept.
lemma_only (:obj:`bool`, optional): if True, only the lemma is used
in the output strings.
Returns:
List[str]: List of string representations of tokens consistent
with the include_fine and lemma_only options.
"""
lexemes = (token.get_preferred_lexeme(tagset)
for paragraph in document.paragraphs()
for sentence in paragraph.sentences()
for token in sentence.tokens())
lexemes = (get_coarse_lexeme_pair(lexeme, tagset)
for lexeme in lexemes)
if not include_fine:
lexemes = [pos_lex for pos_lex in lexemes
if pos_lex[0] in ["verb", "noun", "adj", "adv"]]
lexemes = [join_lexeme(pos_lex, delimiter, lemma_only)
for pos_lex in lexemes]
return lexemes
def create_token_split(string, delimiter=":", tagset=SIMPLE_TAGSET):
"""
Create Token object with single Lexeme from single string
with part of speech and lemma separated by delimiter.
Args:
string (str): String of form "{pos}{delimiter}{lemma}".
delimiter (:obj:`str`, optional): Delimiter used for splitting
part of speech and lemma. CAN appear further down in lemma
but NOT in part of speech. Defaults to ":".
tagset (:obj:`corpus2.Tagset`, optional): Tagset for decoding
the string. Defaults to corpus2.get_named_tagset("simple").
Returns:
corpus2.Token: Token object with single Lexeme with given pos and lemma.
"""
(pos, lemma) = split_lexeme(string, delimiter)
return create_token(pos, lemma, tagset)
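# A minimal sketch of building tokens from "{pos}:{lemma}" strings with the
# default "simple" tagset (the pos/lemma values below are hypothetical examples):
def _example_create_tokens():
    strings = ["noun:kot", "verb:biec"]
    # returns a list of corpus2.Token objects, each with a single lexeme
    return [create_token_split(string) for string in strings]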
def create_token(pos, lemma, tagset=SIMPLE_TAGSET):
"""
Create Token object with a single Lexeme from the given part of speech
and lemma.
Args:
pos (str): String specifying tagset's part of speech.
lemma (str)
tagset (:obj:`corpus2.Tagset`, optional): Tagset for decoding
the string. Defaults to corpus2.get_named_tagset("simple").
Returns:
corpus2.Token: Token object with a single Lexeme with given pos and lemma.
"""
lexeme = create_lexeme(pos, lemma, tagset)
if not lexeme:
return None
token = corpus2.Token()
token.add_lexeme(lexeme)
return token
def create_lexeme_split(string, delimiter=":", tagset=SIMPLE_TAGSET):
"""
Create Lexeme object from single string with part of speech and lemma
separated by delimiter.
Args:
string (str): String of form "{pos}{delimiter}{lemma}".
delimiter (:obj:`str`, optional): Delimiter used for splitting
part of speech and lemma. CAN appear further down in lemma
but NOT in part of speech. Defaults to ":".
tagset (:obj:`corpus2.Tagset`, optional): Tagset for decoding
the string. Defaults to corpus2.get_named_tagset("simple").