Skip to content
Snippets Groups Projects
Commit f6bb7711 authored by Adam Pawlaczek's avatar Adam Pawlaczek
Browse files

Added oracle

parent afca5936
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python
#-*- coding: utf-8 -*-
'''
Created on 19-02-2013
@author: jezozwierzak
'''
from optparse import OptionParser
import sys, os
import corpus2
from chunker_scripts import tools
descr = """%prog [options] [in_dir] [out_dir]
in_dir has to contain subdirs with folds chunked by individual chunkers.
Subdir should be named as chunker which chunked files in it.
"""
def go():
    """Parse command-line options, validate them, and dispatch to main().

    Expects exactly three positional arguments (in_dir, ref_dir, out_dir)
    plus non-empty --chunk-names and --chunkers options.
    """
    parser = OptionParser(usage=descr)
    parser.add_option('-i', '--input-format', type='string', action='store',
                      dest='input_format', default='ccl',
                      help='set the input format; default: ccl')
    parser.add_option('-o', '--output-format', type='string', action='store',
                      dest='output_format', default='ccl',
                      help='set the output format; default: ccl')
    parser.add_option('-c', '--chunk-names', type='string', action='store',
                      dest='chunk_names', default='',
                      help='set chunk_names to eval')
    parser.add_option('--chunkers', type='string', action='store',
                      dest='chunkers', default='',
                      help='set chunkers to eval')
    parser.add_option('-f', '--folds', type="int", action='store',
                      dest='folds', default=1,
                      help='Number of folds')
    parser.add_option('-t', '--tagset', type='string', action='store',
                      dest='tagset', default='nkjp',
                      help='set the tagset used in input; default: nkjp')
    (options, args) = parser.parse_args()
    # Bug fix: the original joined these conditions with 'and', so the usage
    # error fired only when *every* input was missing; any single missing
    # input must abort here, before the 3-way unpacking below raises.
    if len(args) != 3 or options.chunk_names == '' or options.chunkers == '':
        sys.stderr.write('You need to provide a in_dir, ref_dir and out_dir and chunk_names and chunkers.\n')
        sys.stderr.write('See %s --help\n' % sys.argv[0])
        sys.exit(1)
    in_path, ref_path, out_path = args
    main(in_path, ref_path, out_path, options.input_format, options.output_format,
         options.chunk_names, options.chunkers, options.folds, options.tagset)
def get_ref_paths(in_path, folds, input_format):
    """Return the list of reference (gold) corpus paths, one per fold.

    For folds > 1, paths are synthesized as 'ccl-testNN.xml' (ccl) or
    'testNN.xml' (xces) inside in_path. For a single fold, in_path is either
    a directory (all files under it are returned) or a single file path.
    """
    input_paths = []
    if folds > 1:
        for fold in range(1, folds + 1):
            if input_format == "ccl":
                input_paths.append(os.path.join(in_path, 'ccl-test' + str(fold).zfill(2) + '.xml'))
            elif input_format == "xces":
                input_paths.append(os.path.join(in_path, 'test' + str(fold).zfill(2) + '.xml'))
    else:
        if os.path.isdir(in_path):
            for (path, dirs, files) in os.walk(in_path):
                # Sort for a deterministic result: os.walk yields files in
                # arbitrary order, and callers index this list positionally.
                # Also avoid shadowing the builtin 'file'.
                for file_name in sorted(files):
                    input_paths.append(os.path.join(path, file_name))
        else:
            input_paths.append(in_path)
    return input_paths
def get_input_paths(in_path, folds, input_format, chunkers):
    """Return one dict per fold mapping chunker name -> its fold output file.

    Each chunker is expected to have a subdirectory of in_path named after it;
    a missing subdirectory is reported and that chunker is skipped for the fold.
    """
    input_paths = []
    for fold in range(1, folds + 1):
        fold_inputs = {}
        for chunker in chunkers:
            chunker_dir = os.path.join(in_path, chunker)
            if os.path.isdir(chunker_dir):
                if input_format == "ccl":
                    fold_inputs[chunker] = os.path.join(chunker_dir, 'ccl-test' + str(fold).zfill(2) + '.xml')
                elif input_format == "xces":
                    fold_inputs[chunker] = os.path.join(chunker_dir, 'test' + str(fold).zfill(2) + '.xml')
            else:
                # Report on stderr (consistent with go()'s error reporting);
                # the original used a Python-2-only print statement on stdout.
                sys.stderr.write("%s dir doesn't exist\n" % chunker_dir)
        input_paths.append(fold_inputs)
    return input_paths
def get_writer(out_path, output_format, tagset, fold):
    """Create a corpus2 token writer targeting the given fold's output file."""
    target = get_output_path(out_path, fold, output_format)
    return corpus2.TokenWriter.create_path_writer(output_format, target, tagset)
def get_output_path(out_path, fold, output_format):
    """Return the output file path for the given fold.

    Bug fix: the xces branch tested the undefined name 'input_format',
    raising NameError on every xces run; it now tests output_format.
    Raises ValueError for an unsupported format instead of silently
    returning None (which would only crash later in the writer).
    """
    suffix = str(fold).zfill(2) + '.xml'
    if output_format == "ccl":
        return os.path.join(out_path, 'ccl-test' + suffix)
    elif output_format == "xces":
        return os.path.join(out_path, 'test' + suffix)
    raise ValueError("unsupported output format: %s" % output_format)
def get_readers(in_paths, input_format, tagset):
    """Open a reader per chunker; returns a dict {chunker_name: reader}."""
    return dict((name, tools.get_reader(path, input_format, tagset))
                for name, path in in_paths.items())
def get_next_sents(readers):
    """Advance every reader by one sentence; returns {chunker_name: sentence}."""
    return dict((name, reader.get_next_sentence())
                for name, reader in readers.items())
def clone_sent(sent):
    """Return a new sentence (same id) containing clones of only those
    tokens that carry at least one disambiguated lexeme.

    Cleanup: the original enumerated the tokens and then redundantly
    re-fetched each token by index, discarding the enumerated value.
    """
    new_sent = corpus2.Sentence.create_sent(sent.id())
    for tok in sent.tokens():
        if any(lex.is_disamb() for lex in tok.lexemes()):
            new_sent.append(tok.clone())
    return new_sent
def main(in_path, ref_path, out_path, input_format, output_format, chunk_names, chunkers, folds, tagset):
    """Build an 'oracle' chunking per fold: for each reference sentence, keep
    only those chunk annotations that at least one chunker reproduced exactly
    (identical token index set), and write the resulting sentences out.

    chunk_names and chunkers arrive as comma-separated strings and are split
    here. One output file is written per fold.
    """
    tagset = corpus2.get_named_tagset(tagset)
    chunk_names = chunk_names.split(",")
    chunkers = chunkers.split(",")
    # One reference path and one {chunker: path} dict per fold, index = fold-1.
    ref_paths = get_ref_paths(ref_path, folds, input_format)
    input_paths = get_input_paths(in_path, folds, input_format, chunkers)
    for fold in range(1, folds+1):
        writer = get_writer(out_path, output_format, tagset, fold)
        readers = get_readers(input_paths[fold-1], input_format, tagset)
        sents = get_next_sents(readers)
        ref_reader = tools.get_reader(ref_paths[fold-1], input_format, tagset)
        # Python 2 idiom: loop while the first chunker's sentence is truthy,
        # i.e. until the streams are exhausted. All chunker streams and the
        # reference stream are assumed to advance in lock-step — TODO confirm.
        while sents.itervalues().next():
            ref_sent = ref_reader.get_next_sentence()
            ref_asent = corpus2.AnnotatedSentence.wrap_sentence(ref_sent)
            # Start the result from the reference tokens (disamb-bearing only).
            result_sent = clone_sent(ref_asent)
            result_asent = corpus2.AnnotatedSentence.wrap_sentence(result_sent)
            for chunk_name in ref_asent.all_channels():
                if chunk_name in chunk_names:
                    right_annots = []
                    ref_annots = ref_asent.get_channel(chunk_name).make_annotation_vector()
                    # Key annotations by their smallest token index so reference
                    # and chunker annotations can be matched by start position.
                    ref = dict([(min(ann.indices), ann) for ann in ref_annots])
                    for chunker in chunkers:
                        ch_asent = corpus2.AnnotatedSentence.wrap_sentence(sents[chunker])
                        if ch_asent.has_channel(chunk_name):
                            ch_annots = ch_asent.get_channel(chunk_name).make_annotation_vector()
                            ch = dict([(min(ann.indices), ann) for ann in ch_annots])
                            # Candidate hits: annotations starting at the same token.
                            maybe_hits = set(ch).intersection(ref)
                            for idx in maybe_hits:
                                # A hit counts only on an exact span match, and is
                                # recorded once even if several chunkers found it.
                                if list(ch[idx].indices) == list(ref[idx].indices) and [i for i in ch[idx].indices] not in right_annots:
                                    right_annots.append([i for i in ch[idx].indices])
                    #add right chunks
                    result_asent.create_channel(chunk_name)
                    chan = result_asent.get_channel(chunk_name)
                    for ann in right_annots:
                        seg_no = chan.get_new_segment_index()
                        for idx in ann:
                            chan.set_segment_at(idx, seg_no)
            result_sent = corpus2.AnnotatedSentence.cast_as_sentence(result_asent)
            writer.write_sentence(result_sent)
            sents = get_next_sents(readers)
# Script entry point: parse CLI arguments and run the oracle build.
if __name__ == '__main__':
    go()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment