import pika import time class RabbitMQPublisher: def __init__(self, queue_name): self.queue_name = queue_name self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() self.channel.queue_declare(queue=self.queue_name) def publish_messages(self, num_messages): for i in range(num_messages): message = f'Message {i+1}' self.channel.basic_publish(exchange='', routing_key=self.queue_name, body=message) print(f'Sent: {message}') def close_connection(self): self.connection.close() class RabbitMQConsumer: def __init__(self, queue_name): self.queue_name = queue_name self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() self.channel.queue_declare(queue=self.queue_name) def callback(self, ch, method, properties, body): print(f'Received: {body.decode()}') ch.basic_ack(delivery_tag=method.delivery_tag) self.pull_remaining_messages() def pull_remaining_messages(self): method_frame, header_frame, body = self.channel.basic_get(queue=self.queue_name, auto_ack=False) print(f"method_frame = {method_frame}") while method_frame: print(f'---> [pull_remaining_messages] Received: {body.decode()}') self.channel.basic_ack(delivery_tag=method_frame.delivery_tag) method_frame, header_frame, body = self.channel.basic_get(queue=self.queue_name, auto_ack=False) def start_consuming(self): self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback, auto_ack=False) print('Waiting for messages. To exit press CTRL+C') self.channel.start_consuming() if __name__ == "__main__": publisher = RabbitMQPublisher('test_queue') publisher.publish_messages(100) publisher.close_connection() time.sleep(2) consumer = RabbitMQConsumer('test_queue') consumer.start_consuming()