producer.py `import json import time import pika class RabbitMqService: queue_name = None callback_queue = None channel = None connection = None corr_id = None response = None def __init__(self, queue_name): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() self.target_queue = "target_queue" self.channel.queue_declare(queue=self.target_queue, durable=True, arguments={"x-queue-type": "quorum"}) self.queue_name = queue_name result = self.channel.queue_declare(queue=self.queue_name, auto_delete=True) self.callback_queue = result.method.queue self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) self.response = None self.corr_id = Nonedef on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = bodydef call(self, n): self.response = None self.corr_id = self.queue_name for i in n: temp = json.dumps(i) temp_bytes = bytes(temp, 'UTF-8') self.channel.basic_publish(exchange='', routing_key=self.target_queue, properties=pika.BasicProperties(reply_to=self.queue_name, correlation_id=self.queue_name, delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE), body=temp_bytes) self.connection.process_data_events(time_limit=None) return self.responsedef value(self): n = [[35, 38, 25, 35], [31, 32, 33]] print("result - ",RabbitMqService.call(self, n))
ob = RabbitMqService("test_queue") ob.value() consumer.py import json import pika
class RabbitMqService: definit(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() self.input_queue = "target_queue" self.channel.queue_declare(queue=self.input_queue, durable=True, arguments={"x-queue-type": "quorum"}) self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume(queue=self.input_queue, on_message_callback=self.on_request) def fib(self, n): if n < 0: print("Incorrect input") elif n == 0: return 0 elif n == 1 or n == 2: return 1 else: return self.fib(n - 1) + self.fib(n - 2)def on_request(self, ch, method, props, body): temp = [] n = json.loads(body.decode('utf-8')) for i in n: response = self.fib(i) temp.append(response) print(temp) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(temp)) ch.basic_ack(delivery_tag=method.delivery_tag)def _consumption(self): self.channel.start_consuming()
ob = RabbitMqService() ob._consumption() ` When i publish all the values into the queue and process_data_events, im only able to get the output for the first value(of n in producer.py). how can i get for all the values. I know it can be done with process _data_events in an infinite while loop until the response is not none but i dont want an infinite loop. Is there a different way or a method that can be used to get the result for all the values, Have a minimal knowledge on RMQ, can someone pls Help me out. |