From 848dd7ce9065495b6534286938763a5138d6c5f6 Mon Sep 17 00:00:00 2001 From: szymekc <szymekc98@gmail.com> Date: Mon, 15 Nov 2021 14:56:12 +0100 Subject: [PATCH] change acknowledge order based on prod/test --- nlp_ws/_service.py | 20 +++++++++++++++----- setup.py | 2 +- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/nlp_ws/_service.py b/nlp_ws/_service.py index b149580..ef994e0 100644 --- a/nlp_ws/_service.py +++ b/nlp_ws/_service.py @@ -94,6 +94,7 @@ class NLPService(object): CFG_S_SERV_O_RHOST = 'rabbit_host' CFG_S_SERV_O_RUSER = 'rabbit_user' CFG_S_SERV_O_RPSWD = 'rabbit_password' + CFG_S_SERV_O_PROD = 'is_production' CFG_S_TOOL = 'tool' CFG_S_TOOL_O_NUMW = 'workers_number' @@ -172,7 +173,8 @@ class NLPService(object): rabbit_host, rabbit_user, rabbit_passwd, - queue_name): + queue_name, + is_production): logh = (SocketHandler(*log_address) if log_address is not None @@ -194,7 +196,7 @@ class NLPService(object): chan.queue_declare(queue_name) chan.basic_qos(prefetch_count=1) chan.basic_consume( - _Consumer(wrk, worker_path, chan), + _Consumer(wrk, worker_path, chan, is_production), queue_name ) _log.info('Starting worker %r with queue: %s', wrk, queue_name) @@ -247,6 +249,10 @@ class NLPService(object): self._r_user = cfg_serv[self.CFG_S_SERV_O_RUSER] self._r_passwd = cfg_serv[self.CFG_S_SERV_O_RPSWD] + self._is_production = cfg_serv.get( + self.CFG_S_SERV_O_PROD, "true" + ).lower() == "true" + if not os.path.exists(self._wrk_path): os.makedirs(self._wrk_path) @@ -294,6 +300,7 @@ class NLPService(object): rabbit_user=self._r_user, rabbit_passwd=self._r_passwd, queue_name=self._q_name, + is_production=self._is_production, ), name='worker-{}'.format(i), ) @@ -368,17 +375,19 @@ class NLPService(object): class _Consumer(object): - def __init__(self, worker, worker_path, chan): + def __init__(self, worker, worker_path, chan, is_production): self._wrk = worker self._pth = worker_path self._chan = chan self.last_progress = 0 self.last_time = 0 + self.is_production = is_production def __call__(self, chan, meth_frame, props, body): out_file = os.path.join(self._pth, props.correlation_id) result = {} - # chan.basic_ack(delivery_tag=meth_frame.delivery_tag) + if self.is_production: + chan.basic_ack(delivery_tag=meth_frame.delivery_tag) try: self._process_task(result, out_file, props, body) except Exception as e: @@ -395,7 +404,8 @@ class _Consumer(object): ), body=json.dumps(result) ) - chan.basic_ack(delivery_tag=meth_frame.delivery_tag) + if not self.is_production: + chan.basic_ack(delivery_tag=meth_frame.delivery_tag) _log.info('Done with task %s', props.correlation_id) def update_progress(self, progress): diff --git a/setup.py b/setup.py index 96c6f2f..d6fc263 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages if __name__ == '__main__': setup( name='nlp_ws', - version='0.6', + version='0.7', description='Interface for applications to connect to G4.19 WS API', author='Tomasz Walkowiak, Michał Kaliński, Igor Danielewicz', -- GitLab