Posted on • Originally published atcarlosmv.hashnode.dev on
Integrating Robyn with RabbitMQ | Python
In this article, we are going to learn how to integrate RabbitMQ into a Robyn server.
This article will show the code of the server that consumes messages from a RabbitMQ queue. And the code of a server that publishes messages to a queue.
RabbitMQ
RabbitMQ is a messaging broker - an intermediary for messaging. It gives your applications a common platform to send and receive messages, and your messages a safe place to live until received.
Requirements
Python installed
Pip installed
RabbitMQ installed(Download it fromhere)
Consumer
In this section, we are going to build a server that consumes messages from a queue.
First, we have to build a program that sends messages to the queue. So, that is whatsender.py
does. And in theapp.py
file, we write the server that consumes the messages.
sender.py
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='', routing_key='hello', body="Hello World!")print(" [x] Sent 'Hello'")connection.close()
In thesender.py
file, we import the RabbitMQ library, pika. We create a connection to the RabbitMQ server that runs on localhost. Then, we open a channel. Channels are where messages are routed and queued.
We declare a queue named "hello", where messages will be published and consumed from. We publish the "Hello World!" message to the "hello" queue. When the program publishes the message, it will the message " [x] Sent 'Hello' " in the console. Then we close the connection with the RabbitMQ server.
We run this program and we should see the following message in our terminal.
app.py
from robyn import Robyn, status_codesfrom robyn.robyn import Responseimport asyncioimport aio_pikaapp = Robyn( __file__ )@app.get("/")async def hello(): return Response(status_code=status_codes.HTTP_200_OK, headers={}, body="Hello, World!")async def startup(): loop = asyncio.get_event_loop() connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop) channel = await connection.channel() queue = await channel.declare_queue("hello") def callback(message: aio_pika.IncomingMessage): body = message.body.decode("utf-8") print(body) await queue.consume(callback) app.startup_handler(startup)app.start(port=8000, url="0.0.0.0")
In theapp.py
file, we import theaio-pika
library which is an async library for RabbitMQ. Then, we create a Robyn server, with one endpoint that sends a "Hello World" as a response. We define thestartup()
function that receives messages from the RabbitMQ server.
In thestartup()
function, we create a connection to the RabbitMQ server, create a channel, and declare the queue it will consume messages. Then, we define thecallback()
function, it receives the messages and prints it.
We usestartup_handler()
to register thestartup()
function, this way the server will start consuming messages from the RabbitMQ queue when the API starts.
We start the server and we should receive the message from the queue printed in the terminal.
Publisher
Here, we are going to build a server that publishes messages to a queue and a program that consumes the messages.
Thereceiver.py
file is where we write the program that will consume messages from the queue. Andapp.py
file is where we write the server that publishes them.
receiver.py
import pika, sys, osdef main(): connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(f" [x] Received {body}") channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()if __name__ == ' __main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)
Here we create a connection to the RabbitMQ server running locally and subscribe to the 'hello' queue. When messages are received from that queue, the callback function is called which prints the message body.
app.py
Here, we are going to create the server to publish messages to the "hello" queue.
from robyn import Robyn, status_codesfrom robyn.robyn import Responseimport asyncioimport pikaapp = Robyn( __file__ )@app.get("/")async def hello(): body = "Hello World!" await sender(body) return Response(status_code=status_codes.HTTP_200_OK, headers={}, body="Hello, World!")async def sender(body: str): loop = asyncio.get_event_loop() connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop) channel = await connection.channel() await channel.default_exchange.publish(aio_pika.Message(body=body.encode()), routing_key='hello') print(" [x] Sent 'Hello'") await connection.close()app.start(port=8000, url="0.0.0.0")
We create thesender()
function that creates a connection to the RabbitMQ server, then we create a channel and declare the queue where we want to publish the messages.
Then, we send to the queue the body that thesender()
function receives.
We start the server and navigate or make a request tolocalhost:8000/
. We should see the "[x] Sent 'Hello' " message in the terminal.
In the terminal of thereceiver.py
file we run the program, we should see the following message:
Conclusion
In conclusion, using RabbitMQ as a message broker, you can decouple your different components and scale them independently. This way we can build Robyn microservices that communicate with each other.
If you have any recommendations about other packages, architectures, how to improve my code, my English, or anything; please leave a comment or contact me throughTwitter, orLinkedIn.
The source code ishere.
Resources
Top comments(0)
For further actions, you may consider blocking this person and/orreporting abuse