# corpio.py
# -*- coding: utf-8 -*-

# Copyright (C) 2011 Adam Radziszewski. Part of WMBT.
# This program is free software; you can redistribute and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the LICENCE and COPYING files for more details

# SWIG bug workaround: loading multiple SWIG modules brought unwrapped
# swig::stop_iteration exceptions.
# RTLD_GLOBAL is OR-ed into the interpreter's dlopen flags *before* corpus2
# and wccl are imported below, so the new flags are already in effect when
# those modules get loaded. The order of these statements must be kept.
import ctypes, sys
sys.setdlopenflags(sys.getdlopenflags() | ctypes.RTLD_GLOBAL)

# NOTE(review): presumably both are SWIG-generated extension modules (see the
# workaround above) -- confirm.
import corpus2, wccl
# TODO: get back to default dlopen policy?

import config
import codecs, os

# Help text listing the reader and writer types reported by corpus2, each
# list followed by a line with the corresponding help strings. Built once,
# at import time.
format_help = ''.join((
	"\nAvailable input formats: ",
	' '.join(corpus2.TokenReader.available_reader_types()),
	"\n",
	' '.join(corpus2.TokenReader.available_reader_types_help()),
	"\nAvailable output formats: ",
	' '.join(corpus2.TokenWriter.available_writer_types()),
	"\n",
	' '.join(corpus2.TokenWriter.available_writer_types_help()),
))

def f_name(model_name, subdir, ext, suff = ''):
	"""Builds the path of a model-related file placed in subdir.
	The file name is model_name with the extension ext appended after a dot;
	if a non-empty suff is given, it is inserted right after the model name,
	separated from it with a hyphen."""
	stem = model_name
	if suff:
		stem = stem + '-' + suff
	return os.path.join(subdir, stem + '.' + ext)

def get_tagset(conf):
	"""Returns the corpus2 tagset whose name is read from the config
	(option config.O_TAGSET of section config.S_GLOBAL)."""
	tagset_name = conf.get(config.S_GLOBAL, config.O_TAGSET)
	return corpus2.get_named_tagset(tagset_name)

def get_reader(in_path, tagset, input_format):
	"""Returns a corpus2 token reader for the given format and tagset.
	When in_path evaluates to False, the reader is attached to stdin;
	otherwise it reads from the file at in_path."""
	if not in_path:
		return corpus2.TokenReader.create_stdin_reader(input_format, tagset)
	return corpus2.TokenReader.create_path_reader(
		input_format, tagset, in_path)

def get_writer(out_path, tagset, output_format):
	"""Returns a corpus2 token writer for the given format and tagset.
	When out_path evaluates to False, the writer is attached to stdout;
	otherwise it writes to the file at out_path."""
	if not out_path:
		return corpus2.TokenWriter.create_stdout_writer(output_format, tagset)
	return corpus2.TokenWriter.create_path_writer(
		output_format, out_path, tagset)

def op_list(wccl_file, sec_name):
	"""Collects the operators of the untyped section named sec_name from the
	given WCCL file, in the order they are stored in the section. An empty
	list is returned when the file has no such section."""
	if not wccl_file.has_untyped_section(sec_name):
		return []
	section = wccl_file.get_untyped_section(sec_name)
	return [section.get_ptr(idx) for idx in range(section.size())]

def get_wccl_ops(conf, model_name, wccl_dir, lex_dir, attr_names):
	"""Parses the WCCL file of the model and returns a pair:
	(WCCL op list, tag_rules).
	The WCCL file is looked up in wccl_dir under the model name with the
	config.EXT_WCCL extension; lex_dir is handed over to the WCCL parser.
	WCCL op list contains one list of operators per each name in attr_names,
	in the same order. Each of these lists may consist of two parts: the
	default operators (section config.DEFAULT_OPS) followed by the
	attribute-specific operators (theoretically both may be empty).
	The tag_rules are None if the file defines no tag rules; otherwise it is
	the object returned by get_tag_rules_ptr() of the parsed file."""
	wccl_path = f_name(model_name, wccl_dir, config.EXT_WCCL)
	tagset_name = conf.get(config.S_GLOBAL, config.O_TAGSET)
	tagset = corpus2.get_named_tagset(tagset_name)
	parser = wccl.Parser(tagset)
	parsed = parser.parseWcclFileFromPath(wccl_path, lex_dir)
	# the default operators are shared by (prepended to) every attribute
	default_ops = op_list(parsed, config.DEFAULT_OPS)
	ops_per_attr = []
	for name in attr_names:
		ops_per_attr.append(default_ops + op_list(parsed, name))
	if parsed.has_tag_rules():
		rules = parsed.get_tag_rules_ptr()
	else:
		rules = None
	return (ops_per_attr, rules)

class Layers:
	"""The definition of the employed list of tagging layers. Each layer is
	described by the attribute name, mask of the attribute (for fast extraction
	of values of the attribute from tags and tokens) and a list of features
	assigned to the layer (WCCL parsed expressions). A Layers object also
	contains tagging rules from the underlying WCCL file (if provided).

	Attributes set by the constructor:
	layers -- list of (attr_name, mask, ops) tuples, wordclass layer first
	tag_rules -- tagging rules taken from the WCCL file or None if the file
	defines none"""
	
	def __init__(self, conf, model_name, conf_dir, data_dir):
		"""Reads the tagset name and the comma-separated attribute list from
		the config, then parses the WCCL file of the model. conf_dir is the
		directory where the WCCL file is looked up and data_dir is passed
		along as the lexicon directory for the WCCL parser."""
		tagset = corpus2.get_named_tagset(conf.get(config.S_GLOBAL, config.O_TAGSET))
		attr_names = conf.get(config.S_GLOBAL, config.O_ATTRS).split(',')
		# empty string for wordclass attribute, will be translated to a mask
		# for all the wordclasses by corpus2
		attr_names = [''] + attr_names
		attr_masks = [corpus2.get_attribute_mask(tagset, name) for name in attr_names]
		# now replace '' with config.WORDCLASS
		attr_names[0] = config.WORDCLASS
		# parse the WCCL file defining features for the tagger
		attr_ops, self.tag_rules = get_wccl_ops(conf, model_name, conf_dir,
			data_dir, attr_names)
		
		# a sequence of (attr_name, mask, ops) tuples for the wordclass and all
		# tagset attributes defined in the config file. Wordclass is always
		# first, while attributes are present in the order specified in the
		# config. Each mask is a corpus2 Tag structure to be used for attribute
		# masking from tokens and tags.
		# The last tuple element is a list of WCCL operators to be used at the
		# layer, as defined in the .ccl file referenced by the config.
		# Materialised as a list explicitly: where zip() yields a one-shot
		# iterator (Python 3), the layers could otherwise be traversed only
		# once and could not be indexed or measured with len().
		self.layers = list(zip(attr_names, attr_masks, attr_ops))

def create_context(sent):
	"""Returns a wccl.SentenceContext built around the given sentence, i.e.
	the form in which the sentence is to be used with WCCL."""
	context = wccl.SentenceContext(sent)
	return context

def mask2text(tagset, mask):
	"""Converts the given mask (a set of tagset symbols) into text suitable
	for writing to an example file for the classifier. The symbol names are
	joined with hyphens; a null mask is rendered as a single hyphen."""
	if not mask.is_null():
		symbols = tagset.tag_to_symbol_string_vector(mask)
		return '-'.join(symbols)
	return '-'

def text2mask(tagset, text):
	"""Turns the text representation of a mask back into a corpus2.Tag
	object. Hyphens in the text are replaced with commas before the symbols
	are parsed by the tagset. An empty text or a single hyphen stands for the
	empty mask."""
	if text and text != '-':
		symbols = text.replace('-', ',')
		return tagset.parse_symbol_string(symbols)
	return corpus2.Tag() # empty mask

def value2text(tagset, value):
	"""Renders the given WCCL value as text by calling its to_string method
	with the tagset."""
	rendered = value.to_string(tagset)
	return rendered
# TODO: string sets and other stuff from WCCL