diff --git a/nlp_ws/_service.py b/nlp_ws/_service.py index b14958007b96116177a31215a2e364e51b299895..ef994e05d7effbb35387df906b83163c0b4fa610 100644 --- a/nlp_ws/_service.py +++ b/nlp_ws/_service.py @@ -94,6 +94,7 @@ class NLPService(object): CFG_S_SERV_O_RHOST = 'rabbit_host' CFG_S_SERV_O_RUSER = 'rabbit_user' CFG_S_SERV_O_RPSWD = 'rabbit_password' + CFG_S_SERV_O_PROD = 'is_production' CFG_S_TOOL = 'tool' CFG_S_TOOL_O_NUMW = 'workers_number' @@ -172,7 +173,8 @@ class NLPService(object): rabbit_host, rabbit_user, rabbit_passwd, - queue_name): + queue_name, + is_production): logh = (SocketHandler(*log_address) if log_address is not None @@ -194,7 +196,7 @@ class NLPService(object): chan.queue_declare(queue_name) chan.basic_qos(prefetch_count=1) chan.basic_consume( - _Consumer(wrk, worker_path, chan), + _Consumer(wrk, worker_path, chan, is_production), queue_name ) _log.info('Starting worker %r with queue: %s', wrk, queue_name) @@ -247,6 +249,10 @@ class NLPService(object): self._r_user = cfg_serv[self.CFG_S_SERV_O_RUSER] self._r_passwd = cfg_serv[self.CFG_S_SERV_O_RPSWD] + self._is_production = cfg_serv.get( + self.CFG_S_SERV_O_PROD, "true" + ).lower() == "true" + if not os.path.exists(self._wrk_path): os.makedirs(self._wrk_path) @@ -294,6 +300,7 @@ class NLPService(object): rabbit_user=self._r_user, rabbit_passwd=self._r_passwd, queue_name=self._q_name, + is_production=self._is_production, ), name='worker-{}'.format(i), ) @@ -368,17 +375,19 @@ class NLPService(object): class _Consumer(object): - def __init__(self, worker, worker_path, chan): + def __init__(self, worker, worker_path, chan, is_production): self._wrk = worker self._pth = worker_path self._chan = chan self.last_progress = 0 self.last_time = 0 + self.is_production = is_production def __call__(self, chan, meth_frame, props, body): out_file = os.path.join(self._pth, props.correlation_id) result = {} - # chan.basic_ack(delivery_tag=meth_frame.delivery_tag) + if self.is_production: + chan.basic_ack(delivery_tag=meth_frame.delivery_tag) try: self._process_task(result, out_file, props, body) except Exception as e: @@ -395,7 +404,8 @@ class _Consumer(object): ), body=json.dumps(result) ) - chan.basic_ack(delivery_tag=meth_frame.delivery_tag) + if not self.is_production: + chan.basic_ack(delivery_tag=meth_frame.delivery_tag) _log.info('Done with task %s', props.correlation_id) def update_progress(self, progress): diff --git a/setup.py b/setup.py index 96c6f2ffd5ea407d46b7e881d7da3045f189bf88..d6fc2630f3dd51bffbe413379d46e2d69b91e1a0 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages if __name__ == '__main__': setup( name='nlp_ws', - version='0.6', + version='0.7', description='Interface for applications to connect to G4.19 WS API', author='Tomasz Walkowiak, Michał Kaliński, Igor Danielewicz',