asr-benchmarks · Commits · 95002a46

Commit 95002a46, authored 2 years ago by Marcin Wątroba

Change facebook wav2vec2 model
Parent: c5d1a385

Showing 2 changed files
new_experiment/queue_base.py        +58 −0   (58 additions, 0 deletions)
new_experiment/worker_pipeline.py   +2 −62   (2 additions, 62 deletions)

60 additions and 62 deletions in total.
new_experiment/queue_base.py (new file, mode 100644)   +58 −0
import functools
import threading
from typing import Callable

import pika
from pika.adapters.blocking_connection import BlockingChannel

from new_experiment.utils.param_util import get_param

_RABBIT_URL = get_param('RABBIT_URL',
                        'amqps://rabbit_user:kz6m4972OUHFmtUcPOHx4kF3Lj6yw7lo@rabbit-asr-benchmarks.theliver.pl:5671/')


def ack_message(channel, delivery_tag):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        pass


def process_queue(queue_name: str, process_message: Callable[[bytes], None]):
    def do_work(connection, channel, delivery_tag, body):
        process_message(body)
        cb = functools.partial(ack_message, channel, delivery_tag)
        connection.add_callback_threadsafe(cb)
        print('\n#########################\n')

    def on_message(channel: BlockingChannel, method_frame, header_frame, body, args):
        (connection, threads) = args
        delivery_tag = method_frame.delivery_tag
        t = threading.Thread(target=do_work, args=(connection, channel, delivery_tag, body))
        t.start()
        threads.append(t)

    parameters = pika.URLParameters(_RABBIT_URL)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)

    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(queue_name, on_message_callback)

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for all to complete
    for thread in threads:
        thread.join()

    connection.close()
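For context, here is a minimal sketch of how a job might be published onto a queue that process_queue drains. This is not part of the commit: it reuses the broker URL from queue_base.py and assumes a JSON payload, since the actual message schema is only hinted at by the f"Bad message {message_dict}" error in worker_pipeline.py and the placeholder fields below are invented.

import json

import pika

# _RABBIT_URL is module-private in queue_base.py; imported here only to keep the sketch short.
from new_experiment.queue_base import _RABBIT_URL


def publish_job(queue_name: str, message_dict: dict) -> None:
    # Open a short-lived connection to the same broker the consumer uses.
    connection = pika.BlockingConnection(pika.URLParameters(_RABBIT_URL))
    channel = connection.channel()
    # Publish on the default exchange; the routing key is the queue name.
    # Assumes the queue has already been declared elsewhere with matching settings.
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=json.dumps(message_dict).encode('utf-8'))
    connection.close()


# Placeholder payload; the real keys expected by process_message are not shown in this diff.
publish_job('asr_benchmark_experiments', {'task': 'example', 'dataset': 'example'})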
new_experiment/worker_pipeline.py   +2 −62
import json
import os
import functools
import logging
import pika
import threading
from pika.adapters.blocking_connection import BlockingChannel

from new_experiment.pipeline.pipeline_process_spacy_dep_tag_wer import run_spacy_dep_tag_wer_pipeline
from new_experiment.pipeline.pipeline_process_spacy_ner_wer import run_spacy_ner_wer_pipeline
from new_experiment.pipeline.pipeline_process_spacy_pos_wer import run_spacy_pos_wer_pipeline
from new_experiment.pipeline.pipeline_process_word_classic_wer import run_word_wer_classic_pipeline
from new_experiment.pipeline.pipeline_process_word_embedding_wer import run_word_wer_embedding_pipeline
from new_experiment.utils.param_util import get_param

_RABBIT_URL = get_param('RABBIT_URL',
                        'amqps://rabbit_user:kz6m4972OUHFmtUcPOHx4kF3Lj6yw7lo@rabbit-asr-benchmarks.theliver.pl:5671/')
from new_experiment.queue_base import process_queue


def process_message(body: bytes):
    ...
    ...

@@ -41,54 +30,5 @@ def process_message(body: bytes):
        raise Exception(f"Bad message {message_dict}")


def ack_message(channel, delivery_tag):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        pass


def do_work(connection, channel, delivery_tag, body):
    process_message(body)
    cb = functools.partial(ack_message, channel, delivery_tag)
    connection.add_callback_threadsafe(cb)
    print('\n#########################\n')


def on_message(channel: BlockingChannel, method_frame, header_frame, body, args):
    (connection, threads) = args
    delivery_tag = method_frame.delivery_tag
    t = threading.Thread(target=do_work, args=(connection, channel, delivery_tag, body))
    t.start()
    threads.append(t)


def new_main():
    parameters = pika.URLParameters(_RABBIT_URL)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)

    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume('hf_facebook_wav2vec2_asr', on_message_callback)

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for all to complete
    for thread in threads:
        thread.join()

    connection.close()


if __name__ == '__main__':
    new_main()
    process_queue('asr_benchmark_experiments', process_message)
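After this change the worker module is a thin wrapper around process_queue, so further workers can be built the same way. A minimal sketch of a hypothetical worker that is not part of this commit; the module name, queue name, and payload handling below are placeholders.

# hypothetical_worker.py - illustration only, not part of this commit.
import json

from new_experiment.queue_base import process_queue


def process_message(body: bytes) -> None:
    # Decode the JSON payload; the real worker_pipeline.process_message dispatches
    # to the run_*_pipeline functions, but its body is collapsed in this diff.
    message_dict = json.loads(body)
    print(f'received job: {message_dict}')


if __name__ == '__main__':
    # Blocks until interrupted; each message is processed in a worker thread and
    # ACKed from the connection thread via add_callback_threadsafe when it finishes.
    process_queue('hypothetical_queue_name', process_message)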