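"""Process a set of plain text files with PolDeepNer2.

Reads every *.txt file from an input folder, runs named entity recognition on
it and writes the recognized annotations to a file of the same name in an
output folder.
"""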
from __future__ import absolute_import, division, print_function
import codecs
import logging
import argparse
import time
import glob
import os
from pathlib import Path
# import tqdm F811 redefinition of unused 'tqdm'
from tqdm import tqdm
from poldeepner2.models import PolDeepNer2
from poldeepner2.pipeline import tokenization
from poldeepner2.utils.data_utils import wrap_annotations
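
# Example invocation (illustrative; the model name and paths are placeholders):
#   python process_texts.py --input data/texts --output data/ner \
#       --model pdn2_v2_base --device cuda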


def flatten(list_of_lists):
    """Flatten a list of lists into a single list wrapped in an outer list."""
    flat_list = []
    for sublist in list_of_lists:
        flat_list.extend(sublist)
    return [flat_list]


def read_content_autobom(path: str) -> str:
    """Read a UTF-8 text file, handling an optional BOM at its beginning."""
    head_size = min(32, os.path.getsize(path))
    with open(path, 'rb') as file:
        head = file.read(head_size)
    encoding = 'utf-8-sig' if head.startswith(codecs.BOM_UTF8) else 'utf-8'
    with open(path, "r", encoding=encoding) as file:
        return file.read()


def main(args):
    print("Loading the NER model ...")
    t0 = time.time()
    if args.pretrained_path:
        tokenizer = tokenization.load(args.tokenization)
        ner = PolDeepNer2(args.model, args.pretrained_path,
                          device=args.device,
                          max_seq_length=args.max_seq_length,
                          squeeze=args.squeeze, seed=args.seed,
                          tokenizer=tokenizer)
    else:
        ner = PolDeepNer2.load(args.model, device=args.device,
                               resources_path=".models")
        if args.max_seq_length:
            ner.max_seq_length = args.max_seq_length
        # Check the CLI argument, not the imported tokenization module
        # (the module object is always truthy).
        if args.tokenization:
            ner.tokenizer = tokenization.load(args.tokenization)

    time_load = time.time() - t0
    time_preprocess = 0
    time_ner = 0
    data_size = 0

    for path in tqdm(glob.glob(args.input + "/*.txt")):
        content = read_content_autobom(path)
        data_size += len(content)
        texts = content.split('\n')

        # Sentence tokenization (preprocessing)
        t0 = time.time()
        tokenized_sentences = ner.tokenizer.tokenize(texts)
        time_preprocess += (time.time() - t0)

        # Named entity recognition
        t0 = time.time()
        predictions = ner.process(tokenized_sentences)
        predictions = flatten(predictions)
        tokenized_sentences = flatten(tokenized_sentences)
        annotations = wrap_annotations(predictions)
        time_ner += (time.time() - t0)
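
        # Write one annotation per line: label, start and end token index,
        # and the annotation text, separated by tabs.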
        output = Path(args.output) / Path(path).name
        with open(output, "w", encoding="utf-8") as fout:
            for an in annotations:
                text = " ".join(
                    [tokenized_sentences[0][n] for n in an.token_ids])
                token_start = min(an.token_ids)
                token_end = max(an.token_ids)
                fout.write(
                    f"{an.annotation}\t{token_start}\t{token_end}\t{text}\n")
print(f"Model loading time : {time_load:8.4} second(s)")
print(f"Data preprocessing time : {time_preprocess:8.4} second(s)")
print(f"Data NE recognition time : {time_ner:8.4} second(s)")
print(f"Total time : "
f"{time_load+time_preprocess+time_ner:8.4} second(s)")
print(f"Data size: : {data_size/1000000:8.4}M characters")


def parse_args():
    parser = argparse.ArgumentParser(
        description='Process a set of plain text files from a given folder. '
                    'The output is saved to another folder.')
    parser.add_argument('--input', required=True, metavar='PATH',
                        help='path to an input folder with texts')
    parser.add_argument('--output', required=True, metavar='PATH',
                        help='path to an output folder')
    parser.add_argument('--model', required=True, metavar='PATH',
                        help='model name or path to a model')
    # --tokenization must be set when --pretrained_path is given (see main()).
    parser.add_argument('--pretrained_path', required=False, metavar='PATH',
                        help='pretrained XLM-Roberta model path')
    parser.add_argument('--max_seq_length', required=False, default=None,
                        metavar='N', type=int,
                        help='the maximum total input sequence length after '
                             'WordPiece tokenization.')
    parser.add_argument('--device', required=False, default="cpu",
                        metavar='cpu|cuda',
                        help='device type used for processing')
    parser.add_argument('--tokenization', required=False, default=None,
                        choices=tokenization.names,
                        help='Tokenization method')
    parser.add_argument('--squeeze', required=False, default=False,
                        action="store_true",
                        help='try to squeeze multiple examples into one '
                             'Input Feature')
    parser.add_argument('--seed', required=False, default=377,
                        metavar='N', type=int,
                        help='a seed used to initialize the random number '
                             'generator')
    return parser.parse_args()


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, filemode="w")
    args = parse_args()
    try:
        main(args)
    except ValueError as er:
        print("[ERROR] %s" % er)