"""RabbitMQ worker that transcribes dataset audio with a Whisper ASR processor.

For every message consumed from the ``asr__whisper_<model>`` queue the worker
downloads the referenced WAV file from the ``dataset-audio`` MinIO bucket, runs
the Whisper processor on it and stores the result as a property of the record
in the experiment repository (MongoDB database named after the dataset).
"""
import functools
import json
import os
import uuid

import pika
from minio import Minio
from pymongo import MongoClient
from urllib3 import HTTPResponse

from new_datasets.whisper_processor import WhisperAsrProcessor
from sziszapangma.integration.repository.mongo_experiment_repository import MongoExperimentRepository


def get_param(name: str, default: str) -> str:
    """Return the environment variable *name*, or *default* when it is unset."""
    return os.environ.get(name, default)


# NOTE(review): the fallback values below look like real credentials committed
# to source control. They are kept byte-for-byte so existing deployments keep
# working, but they should be rotated and supplied via the environment only.
_MINIO_HOST = get_param('MINIO_HOST', 'minio-asr-benchmarks.theliver.pl')
_MINIO_USER = get_param('MINIO_USER', 'minio_user')
_MINIO_PASSWORD = get_param('MINIO_PASSWORD', 'eUxzEQbyYPdzrLxuvvethSbk18kB2s7G')
_MONGO_URL = get_param('MONGO_URL', 'mongodb://root:example@mongo-asr-benchmarks.theliver.pl:27021/')
_RABBIT_URL = get_param('RABBIT_URL',
                        'amqps://rabbit_user:kz6m4972OUHFmtUcPOHx4kF3Lj6yw7lo@rabbit-asr-benchmarks.theliver.pl:5671/')
_WHISPER_MODEL = get_param('WHISPER_MODEL', 'small')

# Scratch file for the audio being processed; the random prefix is generated
# once at import time, so each worker process uses its own file name.
_TEMP_WAV = f'{uuid.uuid4()}_temp.wav'


def get_minio_client() -> Minio:
    """Build a MinIO client from the configured host, user and password."""
    return Minio(_MINIO_HOST, _MINIO_USER, _MINIO_PASSWORD)


def download_file(dataset_name: str, item_id: str) -> None:
    """Download ``<dataset_name>/<item_id>.wav`` from MinIO into ``_TEMP_WAV``.

    Args:
        dataset_name: Prefix (folder) of the object inside the
            ``dataset-audio`` bucket.
        item_id: Record identifier; the object name is ``<item_id>.wav``.
    """
    response: HTTPResponse = get_minio_client().get_object(
        'dataset-audio', f'{dataset_name}/{item_id}.wav')
    try:
        with open(_TEMP_WAV, 'wb') as f:
            f.write(response.data)
    finally:
        # The response holds a pooled HTTP connection; it must be closed and
        # released explicitly or the connection leaks on every download.
        response.close()
        response.release_conn()


@functools.lru_cache(maxsize=1)
def _get_mongo_client() -> MongoClient:
    """Return the process-wide MongoDB client, creating it on first use."""
    return MongoClient(_MONGO_URL, ssl=True)


def get_experiment_repo(dataset_name: str) -> MongoExperimentRepository:
    """Return an experiment repository bound to the *dataset_name* database.

    The underlying ``MongoClient`` is shared between calls, so handling a
    message no longer opens a new client that is never closed.
    """
    return MongoExperimentRepository(mongo_client=_get_mongo_client(),
                                     database_name=dataset_name)


def _remove_temp_wav() -> None:
    """Delete the scratch WAV file if it exists."""
    try:
        os.remove(_TEMP_WAV)
    except FileNotFoundError:
        pass


def _process_message(body: bytes, whisper_processor: WhisperAsrProcessor) -> None:
    """Handle one queue message: transcribe the record unless already done.

    The message body is UTF-8 encoded JSON with the keys ``dataset`` and
    ``item_id``. The result is stored under the property
    ``whisper_<model>__result``; records that already have this property are
    skipped and the stored value is printed instead.
    """
    message_dict = json.loads(body.decode('utf-8'))
    print(message_dict)
    dataset = message_dict['dataset']
    experiment_repository = get_experiment_repo(dataset)
    record_id = message_dict['item_id']
    exp_property = f'whisper_{_WHISPER_MODEL}__result'

    if experiment_repository.property_exists(record_id, exp_property):
        print('skip', experiment_repository.get_property_for_key(record_id, exp_property))
        return

    try:
        download_file(dataset_name=dataset, item_id=record_id)
        asr_result = whisper_processor.call_recognise(_TEMP_WAV)
    finally:
        # Do not leave the downloaded audio behind, even when recognition fails.
        _remove_temp_wav()
    print(asr_result)
    print(asr_result['full_text'])
    experiment_repository.update_property_for_key(record_id=record_id,
                                                  property_name=exp_property,
                                                  property_value=asr_result)


def main():
    """Consume the Whisper queue and process messages one at a time.

    A message is acknowledged only after it was processed (or skipped)
    successfully. The consumer is cancelled and the connection closed on the
    way out, including when processing raises or the worker is interrupted.
    """
    parameters = pika.URLParameters(_RABBIT_URL)
    connection = pika.BlockingConnection(parameters=parameters)
    try:
        channel = connection.channel()
        # Take a single unacknowledged message at a time from the broker.
        channel.basic_qos(prefetch_count=1)
        queue_name = f'asr__whisper_{_WHISPER_MODEL}'
        whisper_processor = WhisperAsrProcessor(_WHISPER_MODEL)

        try:
            for method_frame, properties, body in channel.consume(queue_name):
                print(method_frame, properties, body)
                _process_message(body, whisper_processor)
                channel.basic_ack(method_frame.delivery_tag)
                print('\n########################################################\n')
        finally:
            # Cancelling is only possible while the channel is still usable.
            if channel.is_open:
                requeued_messages = channel.cancel()
                print(f'Requeued {requeued_messages} messages')
    finally:
        if connection.is_open:
            connection.close()


if __name__ == '__main__':
    main()