Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

rpc process data events#348

Opperessor started this conversation inGeneral
Discussion options

producer.py
`import json
import time

import pika

class RabbitMqService:
queue_name = None
callback_queue = None
channel = None
connection = None
corr_id = None
response = None

def __init__(self, queue_name):    self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    self.channel = self.connection.channel()    self.target_queue = "target_queue"    self.channel.queue_declare(queue=self.target_queue, durable=True, arguments={"x-queue-type": "quorum"})    self.queue_name = queue_name    result = self.channel.queue_declare(queue=self.queue_name, auto_delete=True)    self.callback_queue = result.method.queue    self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)    self.response = None    self.corr_id = Nonedef on_response(self, ch, method, props, body):    if self.corr_id == props.correlation_id:        self.response = bodydef call(self, n):    self.response = None    self.corr_id = self.queue_name    for i in n:        temp = json.dumps(i)        temp_bytes = bytes(temp, 'UTF-8')        self.channel.basic_publish(exchange='', routing_key=self.target_queue,                                   properties=pika.BasicProperties(reply_to=self.queue_name,                                                                   correlation_id=self.queue_name,                                                                   delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE),                                   body=temp_bytes)    self.connection.process_data_events(time_limit=None)    return self.responsedef value(self):    n = [[35, 38, 25, 35], [31, 32, 33]]    print("result - ",RabbitMqService.call(self, n))

ob = RabbitMqService("test_queue")
ob.value()

consumer.py
import json
import pika

class RabbitMqService:
definit(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
self.input_queue = "target_queue"
self.channel.queue_declare(queue=self.input_queue, durable=True, arguments={"x-queue-type": "quorum"})
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(queue=self.input_queue, on_message_callback=self.on_request)

def fib(self, n):    if n < 0:        print("Incorrect input")    elif n == 0:        return 0    elif n == 1 or n == 2:        return 1    else:        return self.fib(n - 1) + self.fib(n - 2)def on_request(self, ch, method, props, body):    temp = []    n = json.loads(body.decode('utf-8'))    for i in n:        response = self.fib(i)        temp.append(response)    print(temp)    ch.basic_publish(exchange='', routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(temp))    ch.basic_ack(delivery_tag=method.delivery_tag)def _consumption(self):    self.channel.start_consuming()

ob = RabbitMqService()
ob._consumption()
`

When i publish all the values into the queue and process_data_events, im only able to get the output for the first value(of n in producer.py). how can i get for all the values. I know it can be done with process _data_events in an infinite while loop until the response is not none but i dont want an infinite loop. Is there a different way or a method that can be used to get the result for all the values, Have a minimal knowledge on RMQ, can someone pls Help me out.

You must be logged in to vote

Replies: 0 comments

Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment
Category
General
Labels
None yet
1 participant
@Opperessor

[8]ページ先頭

©2009-2025 Movatter.jp