diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..acac6a8900432063aef129f167a6a0e1f3d86912 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,14 @@ +image: 'clarinpl/python:2.7' + +build_image: + image: 'docker:18.09.7' + only: + - master + services: + - 'docker:18.09.7-dind' + script: + - docker build -t clarinpl/iobber . + - echo $DOCKER_PASSWORD > pass.txt + - cat pass.txt | docker login --username $DOCKER_USERNAME --password-stdin + - rm pass.txt + - docker push clarinpl/iobber diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..1d9bf28e88529299dca8f959b375860d35eb86e3 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +FROM clarinpl/python:2.7 + +RUN apt-get update && apt-get install -y \ + corpus2-python2.7 \ + wccl-python2.7 \ + crfpp \ + locales + +WORKDIR /home/deps +RUN wget https://minio.clarin-pl.eu/public/share/CRF++-0.58.tar.gz && tar -xzf CRF++-0.58.tar.gz +RUN cd CRF++-0.58/python && python setup.py build && python setup.py install + +WORKDIR /home/worker +COPY ./src ./src +COPY ./main.py . +COPY ./requirements.txt . +RUN python2.7 -m pip install -r ./requirements.txt +RUN export PYTHONIOENCODING=UTF-8 +RUN locale-gen en_US.UTF-8 +ENV LANG en_US.UTF-8 +ENV LANGUAGE en_US:en +ENV LC_ALL en_US.UTF-8 +CMD ["python2.7", "main.py", "service"] \ No newline at end of file diff --git a/config.ini b/config.ini index 555ea5b64038feb6a9e15ea581e49ce5a98cd6f3..6c88cc741882fa9ab079cbd295a9d71214ca3ae7 100644 --- a/config.ini +++ b/config.ini @@ -1,19 +1,15 @@ -; PLIK KONFIGURACYJNY WORKERA -; Plik zawiera konfigurację zarówno Api usługi sieciowej jak i narzędzia. 
-; -; Autor: Tomasz Walkowiak -; email: tomasz.walkowiak@pwr.edu.pl - -; --------- CZĘŚĆ DLA Serwisu --------- [service] root = /samba/requests/ tool = iobber -rabbit_host =10.108.19.85 -rabbit_user =clarin -rabbit_password =clarin123 +rabbit_host = rabbitmq +rabbit_user = test +rabbit_password = test -; --------- CZĘŚĆ DLA Narzedzia --------- [tool] -workers_number = 2 -model-dir = /home/macias/repos/iobber/iobber/data/model-kpwr11-H +workers_number = 1 +model-dir = /usr/local/lib/python2.7/dist-packages/iobber/data/model-kpwr11-H iobber-config = kpwr.ini + +[logging] +port = 9995 +local_log_level = INFO \ No newline at end of file diff --git a/iobber_worker.py b/iobber_worker.py deleted file mode 100644 index 98a67811624cd97e8eb7eeb333df42b088bb0783..0000000000000000000000000000000000000000 --- a/iobber_worker.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -from worker import NLPWorker,NLPService -from iobber.chunker import Chunker -from logger import * - -class IobberWorker(NLPWorker): - def init(self): - self.logger.log(INFO, "Starting iobber with "+self.config['iobber-config'] ) - self.chunker = Chunker(self.config['iobber-config'], - self.config['model-dir'], - verbose = False) - self.chunker.load_model() - self.logger.log(INFO, "Iobber model loaded form "+ self.config['model-dir']) - - def process(self, inputFile, taskOptions, outputFile): - - self.chunker.tag_input(inputFile,outputFile, "ccl", "ccl", False) - #,autogen_sent_id - -if __name__ == '__main__': - service= NLPService(); - service.start(IobberWorker); - diff --git a/logger.py b/logger.py deleted file mode 100644 index c094956db2a4862f0e653a7acb3d444f8ef36d49..0000000000000000000000000000000000000000 --- a/logger.py +++ /dev/null @@ -1,139 +0,0 @@ -# -*- encoding: utf-8 -*- - -import os -import logging, logging.handlers -import traceback - -DEBUG = logging.DEBUG -INFO = logging.INFO -WARNING = logging.WARNING -ERROR = logging.ERROR -CRITICAL= logging.CRITICAL - 
-class Logger(object): - """This class is adapter for standard logging library - it has function for both service-wise logging - and tas-wise logging. The interface is ver similliar to standard logging package.""" - def __init__(self, lvl, main_log_path, task_logs_path, log_to_console): - """Constructor - lvl - logging level (string), possible values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" - main_log_path - path to service-wise logfile - task_logs_path - path to directory containing task specific logfiles - """ - self._task_logs_path = task_logs_path - self._main_log_path = main_log_path - self._log_to_console = log_to_console - self.default_logging_lvl = lvl - logger_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - self._default_formatter = logging.Formatter(logger_format) - - self._main_logger = None - - def log(self, lvl, exception): - """Log message in main logfile - lvl - message level, as in constructor - exception - string message or exception to log - If the exception is WARNING, ERROR or CRITICAL, than logs traceback as well. - """ - if self._main_logger is None: - self._main_logger = self._get_main_logger(self._main_log_path, self._log_to_console) - - if lvl in [WARNING, ERROR, CRITICAL]: - self._log_traceback(self._main_logger, lvl, exception) - self._log(self._main_logger, lvl, exception) - - def debug(exception): - self.log(DEBUG, exception) - def info(exception): - self.log(INFO, exception) - def warning(exception): - self.log(WARNING, exception) - def error(exception): - self.log(ERROR, exception) - - def task_log(self, task_token, lvl, exception): - """Log message in task specific logfile - lvl - message level, as in constructor - exception - string message or exception to log - If the exception is WARNING, ERROR or CRITICAL, than logs traceback with main logger. 
- """ - if lvl in [WARNING, ERROR, CRITICAL]: - self._log_traceback(self._main_logger, lvl, exception) - task_logger, task_logger_file_descriptor = self._get_task_logger(task_token) - self._log(task_logger, lvl, exception) - - def task_debug(task_token, exception): - self.task_log(task_token, DEBUG, exception) - def task_info(task_token, exception): - self.task_log(task_token, INFO, exception) - def task_warning(task_token, exception): - self.task_log(task_token, WARNING, exception) - def task_error(task_token, exception): - self.task_log(task_token, ERROR, exception) - - def shutdown(self): - logging.shutdown() - - @staticmethod - def str2logging_lvl(str_level): - """Translates logging level in string into logging module const""" - str_level = str_level.lower() - return {"debug" : DEBUG, - "info" : INFO, - "warning" : WARNING, - "error" : ERROR, - "critical" : CRITICAL}[str_level] - - def _get_main_logger(self, main_log_path, log_to_console): - """Creates /retrieves main logger - global for whole service and a parent to all task loggers, - which writes to console and main logfile.""" - logger, descriptor = self._get_logger(self.default_logging_lvl, 'service', main_log_path) - self.main_logfile_descr = descriptor - - if log_to_console and self._check_logger_inited('service'): - console_handler = logging.StreamHandler() - console_handler.setLevel(self.default_logging_lvl) - console_handler.setFormatter(self._default_formatter) - logger.addHandler(console_handler) - return logger - - def _get_task_logger(self, task_token): - """Creates/retrieves logging.Logger instance with certain level for task with supplied token""" - task_logger_name = 'service.task-' + task_token - task_log_file = os.path.join(self._task_logs_path, task_token) - return self._get_logger(self.default_logging_lvl, task_logger_name, task_log_file, delay=True) - - def _get_logger(self, logging_lvl, name, filepath, delay=False): - """Creates/retreives logging.Logger instance with certain level and 
name that writes into files - given by filepath (rotating).Returns logger and it's file descriptor.""" - logger = logging.getLogger(name) - - if not self._check_logger_inited(name): - handler = logging.handlers.RotatingFileHandler(filepath, delay=delay, maxBytes=1024000, backupCount=10) - handler.setLevel(logging_lvl) - handler.setFormatter(self._default_formatter) - logger.setLevel(logging_lvl) - logger.addHandler(handler) - else: - handler = logger.handlers[0] - - descriptor = handler.stream.fileno() if handler.stream else None - return logger, descriptor - - def _log(self, logger, lvl, exception): - """Logs given exception to given logger with given level.""" - logger.log(lvl, str(exception)) - - def _log_traceback(self, logger, lvl, exception): - """If exception's traceback can be extracted, it logs this traceback.""" - if isinstance(exception, Exception): - if hasattr(exception, 'traceback'): - traceback_str = exception.traceback - else: - traceback_str = traceback.format_exc() - - if traceback_str is not None: - self._log(self._main_logger, lvl, traceback_str) - - def _check_logger_inited(self, name): - logger = logging.getLogger(name) - return len(logger.handlers) != 0 diff --git a/main.py b/main.py new file mode 100644 index 0000000000000000000000000000000000000000..46a80130d7e9c786373fc6459c9cfabc01b0e677 --- /dev/null +++ b/main.py @@ -0,0 +1,34 @@ +"""Implementation of iobber service.""" +import argparse + +import nlp_ws + +from src.iobber_worker import IobberWorker + +def get_args(): + """Gets command line arguments.""" + parser = argparse.ArgumentParser( + description="Iobber implementation" + ) + + subparsers = parser.add_subparsers(dest="algorithm") + subparsers.required = True + + subparsers.add_parser("service", help="Run as a service") + return parser.parse_args() + + +def main(): + """Runs the program.""" + args = get_args() + + generators = { + "service": lambda: nlp_ws.NLPService.main(IobberWorker), + } + + gen_fn = 
generators.get(args.algorithm, lambda: None) + gen_fn() + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..ee9043c90035f8ef54903d3a0b2a4ea5863920cc --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +iobber +nlp_ws \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/iobber_worker.py b/src/iobber_worker.py new file mode 100644 index 0000000000000000000000000000000000000000..3d53b44e81f6cd95b32a77e62d5ea1438aa006d8 --- /dev/null +++ b/src/iobber_worker.py @@ -0,0 +1,32 @@ +import logging +from iobber.chunker import Chunker +import nlp_ws + +_log = logging.getLogger(__name__) +class IobberWorker(nlp_ws.NLPWorker): + + @classmethod + def static_init(cls, config): + _log.info( + "Starting iobber with " + config["tool"]["iobber-config"] + ) + cls.configtool = config["tool"] + return + + def init(self): + """Initialize worker.""" + _log.info("Worker started loading models") + self.chunker = Chunker( + self.configtool["iobber-config"], + self.configtool["model-dir"], + verbose=False, + ) + self.chunker.load_model() + _log.info( + "Iobber model loaded form " + self.configtool["model-dir"] + ) + + def process(self, inputFile, taskOptions, outputFile): + inputFile = inputFile.encode('utf-8') + outputFile = outputFile.encode('utf-8') + self.chunker.tag_input(inputFile, outputFile, "ccl", "ccl", False) \ No newline at end of file diff --git a/src/logger.py b/src/logger.py new file mode 100644 index 0000000000000000000000000000000000000000..c4184edfec8c1260d2e85fd43d4181695733bbc3 --- /dev/null +++ b/src/logger.py @@ -0,0 +1,158 @@ +import os +import logging, logging.handlers +import traceback + +DEBUG = logging.DEBUG +INFO = logging.INFO +WARNING = logging.WARNING +ERROR = logging.ERROR +CRITICAL = logging.CRITICAL + + 
+class Logger(object): + """This class is an adapter for standard logging library - it has function for both service-wise logging + and task-wise logging. The interface is very similar to standard logging package.""" + + def __init__(self, lvl, main_log_path, task_logs_path, log_to_console): + """Constructor + lvl - logging level (string), possible values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" + main_log_path - path to service-wise logfile + task_logs_path - path to directory containing task specific logfiles + """ + self._task_logs_path = task_logs_path + self._main_log_path = main_log_path + self._log_to_console = log_to_console + self.default_logging_lvl = lvl + logger_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + self._default_formatter = logging.Formatter(logger_format) + + self._main_logger = None + + def log(self, lvl, exception): + """Log message in main logfile + lvl - message level, as in constructor + exception - string message or exception to log + If the exception is WARNING, ERROR or CRITICAL, then logs traceback as well. + """ + if self._main_logger is None: + self._main_logger = self._get_main_logger( + self._main_log_path, self._log_to_console + ) + + if lvl in [WARNING, ERROR, CRITICAL]: + self._log_traceback(self._main_logger, lvl, exception) + self._log(self._main_logger, lvl, exception) + + def debug(self, exception): + self.log(DEBUG, exception) + + def info(self, exception): + self.log(INFO, exception) + + def warning(self, exception): + self.log(WARNING, exception) + + def error(self, exception): + self.log(ERROR, exception) + + def task_log(self, task_token, lvl, exception): + """Log message in task specific logfile + lvl - message level, as in constructor + exception - string message or exception to log + If the exception is WARNING, ERROR or CRITICAL, then logs traceback with main logger. 
+ """ + if lvl in [WARNING, ERROR, CRITICAL]: + self._log_traceback(self._main_logger, lvl, exception) + task_logger, _ = self._get_task_logger(task_token) + self._log(task_logger, lvl, exception) + + def task_debug(self, task_token, exception): + self.task_log(task_token, DEBUG, exception) + + def task_info(self, task_token, exception): + self.task_log(task_token, INFO, exception) + + def task_warning(self, task_token, exception): + self.task_log(task_token, WARNING, exception) + + def task_error(self, task_token, exception): + self.task_log(task_token, ERROR, exception) + + def shutdown(self): + logging.shutdown() + + @staticmethod + def str2logging_lvl(str_level): + """Translates logging level in string into logging module const""" + str_level = str_level.lower() + return { + "debug": DEBUG, + "info": INFO, + "warning": WARNING, + "error": ERROR, + "critical": CRITICAL, + }[str_level] + + def _get_main_logger(self, main_log_path, log_to_console): + """Creates/retrieves main logger - global for whole service and a parent to all task loggers, + which writes to console and main logfile.""" + logger, descriptor = self._get_logger( + self.default_logging_lvl, "service", main_log_path + ) + self.main_logfile_descr = descriptor + + if log_to_console and self._check_logger_inited("service"): + console_handler = logging.StreamHandler() + console_handler.setLevel(self.default_logging_lvl) + console_handler.setFormatter(self._default_formatter) + logger.addHandler(console_handler) + return logger + + def _get_task_logger(self, task_token): + """Creates/retrieves logging.Logger instance with certain level for task with supplied token""" + task_logger_name = "service.task-" + task_token + task_log_file = os.path.join(self._task_logs_path, task_token) + return self._get_logger( + self.default_logging_lvl, + task_logger_name, + task_log_file, + delay=True, + ) + + def _get_logger(self, logging_lvl, name, filepath, delay=False): + """Creates/retrieves logging.Logger instance 
with certain level and name that writes into files + given by filepath (rotating).Returns logger and it's file descriptor.""" + logger = logging.getLogger(name) + + if not self._check_logger_inited(name): + handler = logging.handlers.RotatingFileHandler( + filepath, delay=delay, maxBytes=1024000, backupCount=10 + ) + handler.setLevel(logging_lvl) + handler.setFormatter(self._default_formatter) + logger.setLevel(logging_lvl) + logger.addHandler(handler) + else: + handler = logger.handlers[0] + + descriptor = handler.stream.fileno() if handler.stream else None + return logger, descriptor + + def _log(self, logger, lvl, exception): + """Logs given exception to given logger with given level.""" + logger.log(lvl, str(exception)) + + def _log_traceback(self, logger, lvl, exception): + """If exception's traceback can be extracted, it logs this traceback.""" + if isinstance(exception, Exception): + if hasattr(exception, "traceback"): + traceback_str = exception.traceback + else: + traceback_str = traceback.format_exc() + + if traceback_str is not None: + self._log(self._main_logger, lvl, traceback_str) + + def _check_logger_inited(self, name): + logger = logging.getLogger(name) + return len(logger.handlers) != 0 diff --git a/worker.py b/worker.py deleted file mode 100644 index 13a377b6eac2fff2ce21db6c453675529b1b7afb..0000000000000000000000000000000000000000 --- a/worker.py +++ /dev/null @@ -1,129 +0,0 @@ -#!/usr/bin/env python -import pika -import ConfigParser -import os -import json -import shutil,time -from multiprocessing import Process - -from logger import * - -class NLPWorker(object): - """Class for communication with service engine. Has functions for retrieving new task (if available) - and finishing completed tasks.""" - def __init__(self,connection,logger,config): - """Constructor - config - config given in the form of dictionary containing required paths and request data. 
- logger - logger instance (as in logger.py) - """ - self.logger=logger - self.connection=connection; - self.config=config - self.queue_name="nlp_"+self.config["tool"]; - self.init(); - - def init(self): - return - - @staticmethod - def static_init(config,logger): - return - - def start(self): - creditentials = pika.PlainCredentials(self.config['rabbit_user'], self.config['rabbit_password']) - self.connection = pika.BlockingConnection(pika.ConnectionParameters( - host=self.config['rabbit_host'],credentials=creditentials)) - channel = self.connection.channel() - channel.queue_declare(queue=self.queue_name) - #self.channel=channel; - channel.basic_qos(prefetch_count=1) - channel.basic_consume(self.on_request, queue=self.queue_name) - - self.logger.log(INFO, "Worker started with queue: " +self.queue_name ) - - channel.start_consuming() - - def on_request(self,ch, method, props, body): - result={}; - try: - data=json.loads(body); - outputFile=self.config["root"]+self.config["tool"]+"/"+props.correlation_id; - result["file"]=outputFile; - start_time = time.time() - self.process(str(data["file"]),data["options"],outputFile); - stop_time = time.time() - self.logger.log(INFO, "Finished processing of task: " + props.correlation_id+ " in "+str(start_time-stop_time)); - result["error"]=""; - result["time"]=-start_time+stop_time; - - except Exception as e: - self.logger.log(ERROR, "Unable to process a task "+props.correlation_id+" with message: "+body) - result["error"]=str(e); - self.logger.log(ERROR, e) - - ch.basic_publish(exchange='', - routing_key=props.reply_to, - properties=pika.BasicProperties(correlation_id = \ - props.correlation_id), - body=str(json.dumps(result))) - ch.basic_ack(delivery_tag = method.delivery_tag) - self.logger.log(INFO, "Finished task: " + props.correlation_id) - - def process(self, inputFile, taskOptions, outputFile): - """Processing function. 
This is specific to service and should be implemented in the subclass.""" - raise NotImplementedError("`process` method should be implemented in the subclass.") - - - -class NLPService(object): - def __init__(self): - run_as_daemon=False; - self.config = self._read_config("config.ini") - self.connection=None - - logging_lvl = Logger.str2logging_lvl("debug") - logfile_path = os.path.join("", 'service.log') - self.logger = Logger(logging_lvl, logfile_path, "", True) - - path=self.config["root"]+self.config["tool"]+'/'; - d = os.path.dirname(path) - if not os.path.exists(d): - os.makedirs(d) - - def p(self,workerClass): - worker=workerClass(self.connection,self.logger,self.config); - worker.start() - - def start(self,workerClass): - self.logger.log(INFO, "Starting "+self.config["tool"] ); - - - workerClass.static_init(self.config,self.logger); - processes=[]; - for id in range(self.config['workers']): - p = Process(target=self.p, args=(workerClass,)) - p.start() - processes.append(p) - for p in processes: - p.join() - - - def _read_config(self, config_path): - """Read config file and create dictionary of option values""" - S_SERVICE = 'service' - S_TOOL = 'tool' - - config = dict() - with open(config_path) as config_file: - config_parser = ConfigParser.RawConfigParser() - config_parser.readfp(config_file) - for name, value in config_parser.items(S_SERVICE): - config[name] = value - for name, value in config_parser.items(S_TOOL): - config[name] = value - - config['workers'] = int(config['workers_number']) if 'workers_number' in config else 1 - - return config -#worker= Worker(); -#worker.start();