Skip to content
Snippets Groups Projects

Change acknowledge order based on prod/test

2 files
+ 16
6
Compare changes
  • Side-by-side
  • Inline

Files

+ 15
5
@@ -94,6 +94,7 @@ class NLPService(object):
@@ -94,6 +94,7 @@ class NLPService(object):
CFG_S_SERV_O_RHOST = 'rabbit_host'
CFG_S_SERV_O_RHOST = 'rabbit_host'
CFG_S_SERV_O_RUSER = 'rabbit_user'
CFG_S_SERV_O_RUSER = 'rabbit_user'
CFG_S_SERV_O_RPSWD = 'rabbit_password'
CFG_S_SERV_O_RPSWD = 'rabbit_password'
 
CFG_S_SERV_O_PROD = 'is_production'
CFG_S_TOOL = 'tool'
CFG_S_TOOL = 'tool'
CFG_S_TOOL_O_NUMW = 'workers_number'
CFG_S_TOOL_O_NUMW = 'workers_number'
@@ -172,7 +173,8 @@ class NLPService(object):
@@ -172,7 +173,8 @@ class NLPService(object):
rabbit_host,
rabbit_host,
rabbit_user,
rabbit_user,
rabbit_passwd,
rabbit_passwd,
queue_name):
queue_name,
 
is_production):
logh = (SocketHandler(*log_address)
logh = (SocketHandler(*log_address)
if log_address is not None
if log_address is not None
@@ -194,7 +196,7 @@ class NLPService(object):
@@ -194,7 +196,7 @@ class NLPService(object):
chan.queue_declare(queue_name)
chan.queue_declare(queue_name)
chan.basic_qos(prefetch_count=1)
chan.basic_qos(prefetch_count=1)
chan.basic_consume(
chan.basic_consume(
_Consumer(wrk, worker_path, chan),
_Consumer(wrk, worker_path, chan, is_production),
queue_name
queue_name
)
)
_log.info('Starting worker %r with queue: %s', wrk, queue_name)
_log.info('Starting worker %r with queue: %s', wrk, queue_name)
@@ -247,6 +249,10 @@ class NLPService(object):
@@ -247,6 +249,10 @@ class NLPService(object):
self._r_user = cfg_serv[self.CFG_S_SERV_O_RUSER]
self._r_user = cfg_serv[self.CFG_S_SERV_O_RUSER]
self._r_passwd = cfg_serv[self.CFG_S_SERV_O_RPSWD]
self._r_passwd = cfg_serv[self.CFG_S_SERV_O_RPSWD]
 
self._is_production = cfg_serv.get(
 
self.CFG_S_SERV_O_PROD, "true"
 
).lower() == "true"
 
if not os.path.exists(self._wrk_path):
if not os.path.exists(self._wrk_path):
os.makedirs(self._wrk_path)
os.makedirs(self._wrk_path)
@@ -294,6 +300,7 @@ class NLPService(object):
@@ -294,6 +300,7 @@ class NLPService(object):
rabbit_user=self._r_user,
rabbit_user=self._r_user,
rabbit_passwd=self._r_passwd,
rabbit_passwd=self._r_passwd,
queue_name=self._q_name,
queue_name=self._q_name,
 
is_production=self._is_production,
),
),
name='worker-{}'.format(i),
name='worker-{}'.format(i),
)
)
@@ -368,17 +375,19 @@ class NLPService(object):
@@ -368,17 +375,19 @@ class NLPService(object):
class _Consumer(object):
class _Consumer(object):
def __init__(self, worker, worker_path, chan):
def __init__(self, worker, worker_path, chan, is_production):
self._wrk = worker
self._wrk = worker
self._pth = worker_path
self._pth = worker_path
self._chan = chan
self._chan = chan
self.last_progress = 0
self.last_progress = 0
self.last_time = 0
self.last_time = 0
 
self.is_production = is_production
def __call__(self, chan, meth_frame, props, body):
def __call__(self, chan, meth_frame, props, body):
out_file = os.path.join(self._pth, props.correlation_id)
out_file = os.path.join(self._pth, props.correlation_id)
result = {}
result = {}
# chan.basic_ack(delivery_tag=meth_frame.delivery_tag)
if self.is_production:
 
chan.basic_ack(delivery_tag=meth_frame.delivery_tag)
try:
try:
self._process_task(result, out_file, props, body)
self._process_task(result, out_file, props, body)
except Exception as e:
except Exception as e:
@@ -395,7 +404,8 @@ class _Consumer(object):
@@ -395,7 +404,8 @@ class _Consumer(object):
),
),
body=json.dumps(result)
body=json.dumps(result)
)
)
chan.basic_ack(delivery_tag=meth_frame.delivery_tag)
if not self.is_production:
 
chan.basic_ack(delivery_tag=meth_frame.delivery_tag)
_log.info('Done with task %s', props.correlation_id)
_log.info('Done with task %s', props.correlation_id)
def update_progress(self, progress):
def update_progress(self, progress):
Loading