- Notifications
You must be signed in to change notification settings - Fork19
A Python asyncio-based client for RabbitMQ Streams
License
rabbitmq-community/rstream
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
A Python asyncio-based client forRabbitMQ Streams
The RabbitMQ stream plug-in is required. See thedocumentation for enabling it.
- Installation
- Examples
- Client Codecs
- Publishing messages
- Sub-Entry Batching and Compression
- Deduplication
- Consuming messages
- Super Streams
- Single Active Consumer
- Connecting with SSL
- Sasl Mechanisms
- Managing disconnections
- Load Balancer
- Client Performances
- Build and Test
- Project Notes
The RabbitMQ stream plug-in is required. See thedocumentation for enabling it.
The client is distributed viaPIP:
pip install rstream
Here you can find different examples.
Before start using the client is important to read this section.The client supports two codecs to store the messages to the server:
AMQP 1.0Binary
By default you should useAMQP 1.0 codec:
amqp_message=AMQPMessage(body=bytes("hello: {}".format(i),"utf-8"), )
You need to use theAMQP 1.0 codec to exchange messages with other stream clients likeJava,.NET,Rust,Go or if you want to use theAMQP 0.9.1 clients.
You can use theBinary version if you need to exchange messages from Python to Python.
Note: The messages stored inBinary are not compatible with the other clients and with AMQP 0.9.1 clients.
Once the messages are stored to the server, you can't change them.
Read also theClient Performances section
You can publish messages with four different methods:
send: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires.send_batch: synchronous, the user buffers the messages and sends them. This is the fastest publishing method.send_wait: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method.send_sub_entry: asynchronous. SeeSub-entry batching and compression.
On theexamples directory you can find diffent way to send the messages:
The Send method takes as parameter an handle function that will be called asynchronously when the message sent will be notified from the server to have been published.
Example:
Withsend_wait instead will wait until the confirmation from the server is received.
RabbitMQ Stream provides a special mode to publish, store, and dispatch messages: sub-entry batching. This mode increases throughput at the cost of increased latency and potential duplicated messages even when deduplication is enabled. It also allows using compression to reduce bandwidth and storage if messages are reasonably similar, at the cost of increasing CPU usage on the client side.
Sub-entry batching consists in squeezing several messages – a batch – in the slot that is usually used for one message. This means outbound messages are not only batched in publishing frames, but in sub-entries as well.
# sending with compressionawaitproducer.send_sub_entry(STREAM,compression_type=CompressionType.Gzip,sub_entry_messages=messages )
Full example producer using sub-entry batch
Consumer side is automatic, so no need configurations.
The client is shipped with No Compression (CompressionType.No) and Gzip Compression (CompressionType.Gzip) the other compressions (Snappy,Lz4,Zstd) can be used implementing theICompressionCodec class.
RabbitMQ Stream can detect and filter out duplicated messages, based on 2 client-side elements: the producer name and the message publishing ID.All the producer methods to send messages (send, send_batch, send_wait) takes a publisher_name parameter while the message publishing id can be set in the AMQP message.
Example:
Seeconsumer examples for basic consumer and consumers with different offsets.
RabbitMQ Streams provides server-side offset tracking for consumers. This features allows a consuming application to restart consuming where it left off in a previous run.You can use the store_offset (to store an offset in the server) and query_offset (to query it) methods of the consumer class like in this example:
A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.
See theblog post for more info.
You can usesuperstream_producer andsuperstream_consumer classes which internally uses producers and consumers to operate on the componsing streams.
See theSuper Stream example
Single active consumer provides exclusive consumption and consumption continuity on a stream.
See theblog post for more info.See examples in:
See thesingle active consumer example
Filtering is a new streaming feature enabled from RabbitMQ 3.13 based on Bloom filter.RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filteringonly on the client side. This helps to save network bandwidth when a consuming application needs only a subset ofmessages.
https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#filtering
See thefiltering examples
You can enable ssl/tls.See example here:tls example
You can use the following sasl mechanisms:
- PLAIN
- EXTERNAL
The client usesPLAIN mechanism by default.
TheEXTERNAL mechanism is used to authenticate a user based on a certificate presented by the client.Example:
ssl_context=ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)# put the root certificate of the cassl_context.load_verify_locations("certs/ca_certificate.pem")ssl_context.load_cert_chain("certs/client_HOSTNAME_certificate.pem","certs/client_HOSTNAME_key.pem", )asyncwithProducer("HOSTNAME",username="not_important",password="not_important",port=5551,ssl_context=ssl_context,sasl_configuration_mechanism=SlasMechanism.MechanismExternal## <--- here EXTERNAL configuration
The pluginrabbitmq_auth_mechanism_ssl needs to be enabled on the server side, andssl_options.fail_if_no_peer_cert needs to set totrueconfig example:
auth_mechanisms.3 = PLAINauth_mechanisms.2 = AMQPLAINauth_mechanisms.1 = EXTERNALssl_options.cacertfile = certs/ca_certificate.pemssl_options.certfile = certs/server_certificate.pemssl_options.keyfile = certs/server_key.pemlisteners.ssl.default = 5671stream.listeners.ssl.default = 5551ssl_options.verify = verify_peerssl_options.fail_if_no_peer_cert = trueThe client supports auto-reconnect for all the entities: Producer, Consumer, SuperstreamProducer, SuperstreamConsumer.
When the TCP connection is disconnected unexpectedly the entities try to reconnect automatically.
asyncdefon_connection_closed(disconnection_info:OnClosedErrorInfo)->None:print("connection has been closed from stream: "+str(disconnection_info.streams)+" for reason: "+str(disconnection_info.reason) )consumer=Consumer(..on_close_handler=on_connection_closed,)
It is possible to configure the reconnect behaviour usingrecovery_strategy parameterBy default, the clients usesBackOffRecoveryStrategy
Example:
consumer=Consumer( ...recovery_strategy=BackOffRecoveryStrategy(), )
Please take a look at the complete, reliable client examplehere
In order to handle load balancers, you can use theload_balancer_mode parameter for producers and consumers. This will always attempt to create a connection via the load balancer, discarding connections that are inappropriate for the client type.
Producers must connect to the leader node, while consumers can connect to any, prioritising replicas if available.
The RabbitMQ Stream queues can handle high throughput. Currently, the client cannot reach the server's maximum throughput.
We found some bottlenecks; one of them is the current AMQP 1.0 marshal/unmarshal message format.
This one:
foriinrange(1_000_000):amqp_message=AMQPMessage(body=bytes("hello: {}".format(i),"utf-8"), )# send is asynchronousawaitproducer.send(stream=STREAM,message=amqp_message)
is more or less ~55% slower than:
foriinrange(1_000_000):# send is asynchronousawaitproducer.send(stream=STREAM,message=b"hello")
You can use thebatch_send to test the performance.
We are evaluating rewriting theAMQP 1.0 codec to optimise it for the stream use case.
Linux Ubuntu 4 cores and 8 GB of Ram
RabbitMQ installed to the server
Send batch with AMQP 1.0 codec:
$python3docs/examples/basic_producers/producer_send_batch.pySent1.000.000messagesin9.3218seconds.107.275,5970messagespersecond
- Send batch with binary codec:
$python3docs/examples/basic_producers/producer_send_batch_binary.pySent1.000.000messagesin2.9930seconds.334.116,5639messagespersecond
To run the tests, you need to have a running RabbitMQ Stream server.You can use the Docker official image.
Run the server with the following command:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \ rabbitmq:3.13.1-managementenable the plugin:
dockerexec rabbitmq rabbitmq-pluginsenable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0
and run the tests:
poetry run pytest
About
A Python asyncio-based client for RabbitMQ Streams
Topics
Resources
License
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Uh oh!
There was an error while loading.Please reload this page.
Contributors9
Uh oh!
There was an error while loading.Please reload this page.