#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Chunk-evaluation utilities: a small CSV table writer plus the eval script.

Reconstructed from a collapsed patch that added utils/CSVWriter.py and
reworked utils/chunk_eval.py (Copyright (C) 2012 Adam Pawlaczek, LGPL v3+).

CSVWriter builds a table of columns (optionally with sub-columns) and renders
it as separator-delimited text; the script part compares chunker output
against a gold standard and reports P/R/F per fold plus an average row.
"""

descr = """%prog [options] CHUNKED REF

Reads the two chunk-annotated corpora: CHUNKED (chunker output) and REF
(reference annotation / gold standard). Outputs precision and recall values.

NOTE: this script treats discontinuous chunks as whole annotations.
"""

import sys
from optparse import OptionParser


class CSVWriter:
    """Accumulates named columns / sub-columns and data rows, then renders them.

    Internal representation: ``self.list`` holds one list per top-level column;
    a plain column is ``[name, cell, cell, ...]`` while a column with
    sub-columns is ``[name, [subname, cell, ...], [subname, cell, ...], ...]``.
    ``self.widths`` tracks the display width of each top-level column.
    """

    def __init__(self, separator=';'):
        self.widths = []        # display width per top-level column
        self.list = []          # column data (see class docstring)
        self.rows = 0           # number of data rows added so far
        self.columns = 0        # total leaf-column count (sub-columns included)
        self.separator = separator

    def addSubColumn(self, parentIndex, name):
        """Attach sub-column *name* under the column at *parentIndex*."""
        parentColumn = self.list[parentIndex]
        parentColumn.append([name])
        # Recompute the header width needed for all sub-column names.
        header = ''.join(sub[0] + self.separator for sub in parentColumn[1:])
        if len(header) > self.widths[parentIndex]:
            self.widths[parentIndex] = len(header) - 1
        # The first sub-column replaces the parent as a leaf, so the leaf
        # count only grows from the second sub-column onwards (as original).
        if len(parentColumn) - 1 > 1:
            self.columns += 1

    def addSubColumnByName(self, parentName, name):
        """Attach sub-column *name* under the column called *parentName*."""
        assert self.rows == 0, 'You have to add all Column names before adding rows'
        self.addSubColumn(self.columnIndex(parentName), name)

    def addSubColumnsByName(self, parentName, names=None):
        """Attach each of *names* as a sub-column of *parentName*."""
        # Fixed: mutable default argument replaced with None sentinel.
        for name in (names or []):
            self.addSubColumnByName(parentName, name)

    def addSubColumns(self, parentIndex, names=None):
        """Attach each of *names* as a sub-column of the column at *parentIndex*."""
        for name in (names or []):
            self.addSubColumn(parentIndex, name)

    def addColumn(self, name):
        """Append a new top-level column called *name*."""
        assert self.rows == 0, 'You have to add all Column names before adding rows'
        self.list.append([name])
        self.widths.append(len(name))
        self.columns += 1

    def addColumns(self, names=None):
        """Append each of *names* as a top-level column."""
        for name in (names or []):
            self.addColumn(name)

    def addRow(self, row=None):
        """Append one data row; sub-column cells are given as nested lists."""
        row = [] if row is None else row
        assert len(row) == len(self.list), 'Wrong number of columns in row'
        for i, column in enumerate(self.list):
            if self.hasColumnSubColumns(i):
                assert len(row[i]) == len(column) - 1, \
                    'Wrong number of subColumns in column ' + column[0]
                for j, value in enumerate(row[i]):
                    column[j + 1].append(value)
                # Width needed to render all sub-cells as '%.4f' side by side.
                rendered = ''.join('{0:.4f}'.format(v) + self.separator + ' '
                                   for v in row[i])
                if len(rendered) > self.widths[i]:
                    self.widths[i] = len(rendered) - 1
            else:
                column.append(row[i])
                if len(str(row[i])) > self.widths[i]:
                    self.widths[i] = len(str(row[i]))
        self.rows += 1

    def allWidth(self):
        """Return the total width of all columns.

        Fixed: the original accumulated into ``sum`` but returned the loop
        variable ``width`` (i.e. only the last column's width).
        """
        return sum(self.widths)

    def columnIndex(self, name):
        """Return the index of the top-level column called *name*, or None."""
        for index, column in enumerate(self.list):
            if column[0] == name:
                return index
        return None

    def hasSubColumns(self):
        """True if any top-level column has sub-columns."""
        return any(self.hasColumnSubColumns(i) for i in range(len(self.list)))

    def hasColumnSubColumns(self, index):
        """True if the column at *index* has sub-columns."""
        column = self.list[index]
        # isinstance instead of type(x).__name__ string comparison.
        return len(column) > 1 and isinstance(column[1], list)

    def repeat_to_length(self, string_to_expand, length):
        """Tile *string_to_expand* and truncate to exactly *length* chars."""
        # Fixed: explicit floor division (the original relied on Py2 '/').
        return (string_to_expand * ((length // len(string_to_expand)) + 1))[:length]

    def count_avg(self):
        """Append an 'AVG' row averaging every data column over all rows."""
        assert self.rows > 0, 'No data rows to average'
        results = []
        for i, column in enumerate(self.list):
            if self.hasColumnSubColumns(i):
                results.append([sum(sub[1:1 + self.rows]) / self.rows
                                for sub in column[1:]])
            else:
                results.append(sum(column[1:1 + self.rows]) / self.rows)
        # The first column's average is replaced by the 'AVG' label,
        # matching the original behaviour.
        results = results[1:]
        results[:0] = ['AVG']
        self.addRow(results)

    def _formatCell(self, value, width):
        """Render one cell (type-appropriate format) followed by the separator."""
        if type(value) is int:
            base = 'd'
        elif type(value) is float:
            base = '.4f'
        else:
            base = 's'
        return '{0:{width}{base}}'.format(value, base=base, width=width) + self.separator

    def __str__(self):
        result = ''
        if not self.hasSubColumns():
            # Simple table: header row (j == 0) then data rows.
            for j in range(0, 1 + self.rows):
                for i in range(len(self.list)):
                    result += self._formatCell(self.list[i][j], self.widths[i])
                result += '\n'
            return result
        # --- Header row ---
        # Fixed: the original used zip(*self.list)[0], which breaks on Py3.
        for i, column in enumerate(self.list):
            if self.hasColumnSubColumns(i):
                n_sub = len(column) - 1
                result += '{0:{width}s}'.format(
                    column[0], width=self.widths[i] - n_sub + 1) + self.separator
                result += self.separator * (n_sub - 1)  # pad under sub-columns
            else:
                result += '{0:{width}s}'.format(
                    column[0], width=self.widths[i]) + self.separator
        result += '\n'
        # --- Sub-header row ---
        for i, column in enumerate(self.list):
            if self.hasColumnSubColumns(i):
                n_sub = len(column) - 1
                for sub in column[1:]:
                    result += '{0:{width}s}'.format(
                        sub[0], width=self.widths[i] // n_sub) + self.separator
            else:
                result += '{0:{width}s}'.format(
                    '', width=self.widths[i]) + self.separator
        result += '\n'
        # --- Data rows ---
        for j in range(1, 1 + self.rows):
            for i, column in enumerate(self.list):
                if self.hasColumnSubColumns(i):
                    # Fixed: the original reused a stale numberOfColumns from
                    # the header loop; compute it per column here.
                    n_sub = len(column) - 1
                    for sub in column[1:]:
                        result += self._formatCell(sub[j], self.widths[i] // n_sub)
                else:
                    result += self._formatCell(column[j], self.widths[i])
            result += '\n'
        return result


class Stats:
    """Accumulates chunk / head hit counts between chunker output and reference."""

    def __init__(self):
        self.ch_chunks = 0    # annotations seen in chunker output
        self.ref_chunks = 0   # annotations seen in the gold standard
        self.chunk_hits = 0   # exact index-set matches
        self.head_hits = 0    # head-index matches
        self.both_hits = 0    # exact matches with matching heads

    def update(self, ch_annots, ref_annots):
        """Fold one sentence's annotation pair into the counters."""
        self.ch_chunks += len(ch_annots)
        self.ref_chunks += len(ref_annots)
        # Index each side by its leftmost token position; only annotations
        # sharing a left border can possibly be exact matches.
        ch = dict((min(ann.indices), ann) for ann in ch_annots)
        ref = dict((min(ann.indices), ann) for ann in ref_annots)
        for idx in set(ch).intersection(ref):
            if list(ch[idx].indices) == list(ref[idx].indices):
                self.chunk_hits += 1
                if ch[idx].head_index == ref[idx].head_index:
                    self.both_hits += 1
        # Head hits are compared on head indices alone.
        ch_heads = set(ann.head_index for ann in ch_annots)
        ref_heads = set(ann.head_index for ann in ref_annots)
        self.head_hits += len(ch_heads.intersection(ref_heads))

    def getPRF(self, hits):
        """Return [precision, recall, F1] (in percent) for *hits*."""
        p = 0.0 if self.ch_chunks == 0 else 100.0 * hits / self.ch_chunks
        r = 0.0 if self.ref_chunks == 0 else 100.0 * hits / self.ref_chunks
        f = 0.0 if p + r == 0.0 else 2.0 * p * r / (p + r)
        return [p, r, f]

    def getStats(self):
        """Return the rows reported per fold (currently only chunk-level PRF)."""
        return [self.getPRF(self.chunk_hits)]


def get_annots(sent, chan_name):
    """Return the annotations of channel *chan_name* in *sent* (may be empty)."""
    # Wrap the sentence as an AnnotatedSentence to access its channels.
    annots = []
    import corpus2  # project dependency; kept local so the module imports without it
    asent = corpus2.AnnotatedSentence.wrap_sentence(sent)
    if asent.has_channel(chan_name):
        chan = asent.get_channel(chan_name)
        for ann in chan.make_annotation_vector():
            assert ann.head_index in ann.indices
            annots.append(ann)
    return annots


def go():
    """Parse command-line options and run the evaluation."""
    parser = OptionParser(usage=descr)
    parser.add_option('-i', '--input-format', type='string', action='store',
                      dest='input_format', default='ccl',
                      help='set the input format; default: ccl')
    parser.add_option('-O', '--output-file', type='string', action='store',
                      dest='out_path', default='',
                      help='set output filename (do not write to stdout)')
    parser.add_option('-t', '--tagset', type='string', action='store',
                      dest='tagset', default='nkjp',
                      help='set the tagset used in input; default: nkjp')
    parser.add_option('-c', '--chunk-names', type='string', action='store',
                      dest='chunk_names', default='',
                      help='set chunk_names to eval')
    parser.add_option('-f', '--folds', type="int", action='store',
                      dest='folds', default=1,
                      help='Number of folds')
    parser.add_option('-q', '--quiet', action='store_false',
                      default=True, dest='verbose')
    (options, args) = parser.parse_args()

    if len(args) != 2:
        # Fixed: the original said 'No args' even when the count was wrong.
        sys.stderr.write('Expected exactly 2 args: CHUNKED REF. See --help\n')
        sys.exit(1)

    ch_path, ref_path = args
    main(ch_path, ref_path, options.chunk_names, options.input_format,
         options.out_path, options.tagset, options.verbose, options.folds)


def main(ch_path, ref_path, chan_name, input_format, out_path, tagset, verbose, folds):
    """Evaluate CHUNKED against REF (optionally over *folds* fold files) as CSV.

    NOTE(review): *out_path* and *verbose* are accepted but unused, exactly as
    in the original — presumably reserved for file output / quiet mode.
    """
    import os
    import corpus2  # project dependency; imported lazily (see module docstring)

    csvWriter = CSVWriter(",")
    csvWriter.addColumns(["Nr ", "Chunk"])
    csvWriter.addSubColumnsByName("Chunk", ["P", "R", "F"])

    # Renamed local: the original rebound the 'tagset' parameter itself.
    tagset_obj = corpus2.get_named_tagset(tagset)

    for fold in range(1, folds + 1):
        if folds > 1:
            # Fold inputs follow the ccl-testNN.xml naming convention.
            fold_file = 'ccl-test' + str(fold).zfill(2) + '.xml'
            ch_path_fold = os.path.join(ch_path, fold_file)
            ref_path_fold = os.path.join(ref_path, fold_file)
        else:
            ch_path_fold = ch_path
            ref_path_fold = ref_path

        ch_rdr = corpus2.TokenReader.create_path_reader(
            input_format, tagset_obj, ch_path_fold)
        ref_rdr = corpus2.TokenReader.create_path_reader(
            input_format, tagset_obj, ref_path_fold)

        stats = Stats()
        while True:
            # Iterate over paragraphs (note that they are called "chunks" here).
            ref_chunk = ref_rdr.get_next_chunk()
            ch_chunk = ch_rdr.get_next_chunk()
            assert (not ref_chunk) == (not ch_chunk), 'corpora of different length'
            if not ref_chunk:
                break  # end of input
            # Process each sentence separately.
            for ch_sent, ref_sent in zip(ch_chunk.sentences(), ref_chunk.sentences()):
                assert ch_sent.size() == ref_sent.size()
                stats.update(get_annots(ch_sent, chan_name),
                             get_annots(ref_sent, chan_name))

        results = stats.getStats()
        results[:0] = [fold]          # prepend the fold number column
        csvWriter.addRow(results)

    csvWriter.count_avg()             # append the AVG row over all folds
    print(csvWriter)


if __name__ == '__main__':
    go()