RabbitMQ Stream Java Client

version 1.3.0
(037e3fe)
The RabbitMQ Stream Java Client is a Java library for communicating with the RabbitMQ Stream Plugin. Use it to create and delete streams, publish messages, and consume from streams. Learn more in the client overview.

This library requires at least Java 11, but Java 21 or more is recommended.

Stream PerfTest is a performance testing tool based on this client library.

What is a RabbitMQ Stream?

A RabbitMQ stream is a persistent and replicated data structure that models an append-only log. It differs from traditional RabbitMQ queues in how message consumption works:

  • Traditional queue: Consuming removes messages from the queue

  • Stream: Consuming leaves messages intact for future reads

This allows stream content to be read and re-read multiple times without any destructive effects.

Neither streams nor traditional queues are inherently better — they serve different use cases.
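The difference in consumption semantics can be sketched with plain Java collections. This is a toy in-memory model, not how the broker stores data: an ArrayDeque stands in for a traditional queue and an ArrayList indexed by offset stands in for a stream. The class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class QueueVsStream {

    // Queue semantics: consuming removes each message from the queue.
    static List<String> drainQueue(Queue<String> queue) {
        List<String> consumed = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            consumed.add(message);
        }
        return consumed;
    }

    // Stream semantics: reading from an offset leaves the log untouched.
    static List<String> readFrom(List<String> log, int offset) {
        return new ArrayList<>(log.subList(offset, log.size()));
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("a", "b", "c"));
        List<String> log = new ArrayList<>(List.of("a", "b", "c"));

        System.out.println(drainQueue(queue)); // prints [a, b, c]
        System.out.println(drainQueue(queue)); // prints [] (already consumed)

        System.out.println(readFrom(log, 0));  // prints [a, b, c]
        System.out.println(readFrom(log, 0));  // prints [a, b, c] again
        System.out.println(readFrom(log, 1));  // prints [b, c] (start later in the log)
    }
}
```

The second drain of the queue returns nothing, while every read of the log returns the same messages, which is what makes large fan-outs and replay possible.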

When to Use RabbitMQ Stream?

RabbitMQ Stream was developed to cover the following messaging use cases:

  • Large fan-outs: Multiple consumer applications need to read the same messages

  • Replay/Time-traveling: Applications need to read message history or resume from a specific point

  • High throughput: Higher performance is required compared to other protocols (AMQP, STOMP, MQTT)

  • Large logs: Large amounts of data must be stored with minimal memory overhead

Other Way to Use Streams in RabbitMQ

You can also use streams in RabbitMQ with any protocol RabbitMQ supports (AMQP, MQTT, STOMP). Instead of using the stream protocol directly, you consume from "stream-powered" queues using e.g. AMQP. These special queues are backed by stream infrastructure and provide stream semantics (primarily non-destructive reading).

Stream-powered queues offer stream features (append-only structure, non-destructive reading) while still using your protocol of choice.

However, a protocol other than the stream protocol may not deliver the same performance, because the stream protocol was designed with high throughput in mind.

Guarantees

RabbitMQ stream provides at-least-once guarantees thanks to the publisher confirm mechanism.

Message deduplication is also supported on the publisher side.

Stream Client Overview

The RabbitMQ Stream Java Client implements the RabbitMQ Stream protocol and avoids dealing with low-level concerns by providing high-level functionalities to build fast, efficient, and robust client applications.

  • Stream management: Create and delete streams directly from applications

  • Configurable throughput: Adjust publishing performance with batch size and flow control

  • Duplicate prevention: Avoid duplicate messages with built-in deduplication

  • Offset tracking: Resume consumption from where you left off with automatic or manual tracking

  • Optimized connections: Connect publishers to stream leaders and consumers to replicas

  • Resource efficiency: Automatically scale connections based on publisher/consumer count

  • Automatic recovery: Handle network failures with connection recovery and consumer re-subscription

  • Observability: Built-in support for Prometheus metrics and distributed tracing (OpenZipkin, Wavefront) via Micrometer

Versioning

This library uses semantic versioning.

The next section provides more details about the evolution of programming interfaces.

Stability of Programming Interfaces

The client contains two sets of programming interfaces whose stability is of interest to application developers:

  • Application Programming Interfaces (API): Used for writing application logic. Includes interfaces and classes in the com.rabbitmq.stream package (e.g., Producer, Consumer, Message). These APIs form the main programming model and remain as stable as possible. New features may add methods to existing interfaces.

  • Service Provider Interfaces (SPI): Used for implementing technical client behavior, not application logic. Developers may reference these during configuration or when customizing internal client behavior. SPIs include interfaces in com.rabbitmq.stream.codec, com.rabbitmq.stream.compression, com.rabbitmq.stream.metrics, and other packages. These interfaces may change, but this typically won’t affect most applications since changes are limited to client internals.

Pre-requisites

This library requires at least Java 11, but Java 21 or more is recommended.

Setting up RabbitMQ

A RabbitMQ 3.9+ node with the stream plugin enabled is required. The easiest way to get up and running is to use Docker.

With Docker

There are different ways to make the broker visible to the client application when running in Docker. The next sections show a couple of options suitable for local development.

Docker on macOS

Docker runs on a virtual machine when using macOS, so do not expect high performance when using RabbitMQ Stream inside Docker on a Mac.

With Docker Bridge Network Driver

This section shows how to start a broker instance for local development (the broker Docker container and the client application are assumed to run on the same host).

The following command creates a one-time Docker container to run RabbitMQ:

Running the stream plugin with Docker
    docker run -it --rm --name rabbitmq -p 5552:5552 \
        -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
        rabbitmq:4.1

The previous command exposes only the stream port (5552). You can expose ports for other protocols as well:

Exposing the AMQP 0.9.1 and management ports:
    docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
        -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
        rabbitmq:4.1-management

Refer to the official RabbitMQ Docker image web page to find out more about its usage.

Once the container is started, the stream plugin must be enabled:

Enabling the stream plugin:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

With Docker Host Network Driver

This is the simplest way to run the broker locally. The container uses the host network, which is perfect for experimenting locally.

Running RabbitMQ Stream with the host network driver
docker run -it --rm --name rabbitmq --network host rabbitmq:4.1

Once the container is started, the stream plugin must be enabled:

Enabling the stream plugin:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

The container will use the following ports: 5552 (for stream) and 5672 (for AMQP).

Docker Host Network Driver Support

The host networking driver only works on Linux hosts.

With a RabbitMQ Package Running on the Host

Using a package implies installing Erlang.

Refer to the stream plugin documentation for more information on configuration.

Dependencies

Use your favorite build management tool to add the client dependencies to your project.

Maven

pom.xml
    <dependencies>
      <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>stream-client</artifactId>
        <version>1.3.0</version>
      </dependency>
    </dependencies>

Snapshots require declaring the appropriate repository.

Gradle

build.gradle
    dependencies {
      implementation "com.rabbitmq:stream-client:1.3.0"
    }

Snapshots require declaring the appropriate repository.

Snapshots

Releases are available from Maven Central, which does not require specific declaration. Snapshots are available from a repository which must be declared in the dependency management configuration.

With Maven:

Snapshot repository declaration for Maven
    <repositories>
      <repository>
        <id>central-portal-snapshots</id>
        <url>https://central.sonatype.com/repository/maven-snapshots/</url>
        <snapshots><enabled>true</enabled></snapshots>
        <releases><enabled>false</enabled></releases>
      </repository>
    </repositories>

With Gradle:

Snapshot repository declaration for Gradle:
    repositories {
      maven {
        name = 'Central Portal Snapshots'
        url = 'https://central.sonatype.com/repository/maven-snapshots/'
        // Only search this repository for the specific dependency
        content {
          includeModule("com.rabbitmq", "stream-client")
        }
      }
      mavenCentral()
    }

Sample Application

This section covers the basics of the RabbitMQ Stream Java API by building a small publish/consume application. This is a good way to get an overview of the API. If you want a more comprehensive introduction, you can go to the reference documentation section.

The sample application publishes some messages and then registers a consumer to make some computations out of them. The source code is available on GitHub.

The sample class starts with a few imports:

Imports for the sample application
    import com.rabbitmq.stream.Consumer;
    import com.rabbitmq.stream.Environment;
    import com.rabbitmq.stream.OffsetSpecification;
    import com.rabbitmq.stream.Producer;
    import java.util.UUID;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.stream.IntStream;

    public class SampleApplication {

        public static void main(String[] args) throws Exception {
            // The body of main is walked through step by step in the snippets that follow.
        }
    }

The next step is to create the Environment. It is a management object used to manage streams and create producers as well as consumers. The next snippet shows how to create an Environment instance and create the stream used in the application:

Creating the environment
    System.out.println("Connecting...");
    Environment environment = Environment.builder().build(); // (1)
    String stream = UUID.randomUUID().toString();
    environment.streamCreator().stream(stream).create(); // (2)
(1) Use Environment#builder to create the environment
(2) Create the stream

Then comes the publishing part. The next snippet shows how to create a Producer, send messages, and handle publishing confirmations, to make sure the broker has taken outbound messages into account. The application uses a count down latch to move on once the messages have been confirmed.

Publishing messages
    System.out.println("Starting publishing...");
    int messageCount = 10000;
    CountDownLatch publishConfirmLatch = new CountDownLatch(messageCount);
    Producer producer = environment.producerBuilder() // (1)
            .stream(stream)
            .build();
    IntStream.range(0, messageCount)
            .forEach(i -> producer.send( // (2)
                    producer.messageBuilder() // (3)
                        .addData(String.valueOf(i).getBytes()) // (3)
                        .build(), // (3)
                    confirmationStatus -> publishConfirmLatch.countDown() // (4)
            ));
    publishConfirmLatch.await(10, TimeUnit.SECONDS); // (5)
    producer.close(); // (6)
    System.out.printf("Published %,d messages%n", messageCount);
(1) Create the Producer with Environment#producerBuilder
(2) Send messages with Producer#send(Message, ConfirmationHandler)
(3) Create a message with Producer#messageBuilder
(4) Count down on message publishing confirmation
(5) Wait for all publishing confirmations to have arrived
(6) Close the producer

It is now time to consume the messages. The Environment lets us create a Consumer and provide some logic on each incoming message by implementing a MessageHandler. The next snippet does this to calculate a sum and output it once all the messages have been received:

Consuming messages
    System.out.println("Starting consuming...");
    AtomicLong sum = new AtomicLong(0);
    CountDownLatch consumeLatch = new CountDownLatch(messageCount);
    Consumer consumer = environment.consumerBuilder() // (1)
            .stream(stream)
            .offset(OffsetSpecification.first()) // (2)
            .messageHandler((offset, message) -> { // (3)
                sum.addAndGet(Long.parseLong(new String(message.getBodyAsBinary()))); // (4)
                consumeLatch.countDown(); // (5)
            })
            .build();
    consumeLatch.await(10, TimeUnit.SECONDS); // (6)
    System.out.println("Sum: " + sum.get()); // (7)
    consumer.close(); // (8)
(1) Create the Consumer with Environment#consumerBuilder
(2) Start consuming from the beginning of the stream
(3) Set up the logic to handle messages
(4) Add the value in the message body to the sum
(5) Count down on each message
(6) Wait for all messages to have arrived
(7) Output the sum
(8) Close the consumer

The application has some cleaning to do before terminating, that is deleting the stream and closing the environment:

Cleaning before terminating
    environment.deleteStream(stream); // (1)
    environment.close(); // (2)
(1) Delete the stream
(2) Close the environment

You can run the sample application from the root of the project (you need a running local RabbitMQ node with the stream plugin enabled):

    $ ./mvnw -q test-compile exec:java -Dexec.classpathScope="test" \
        -Dexec.mainClass="com.rabbitmq.stream.docs.SampleApplication"
    Starting publishing...
    Published 10000 messages
    Starting consuming...
    Sum: 49995000

You can remove the -q flag if you want more insight on the execution of the build.
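The sum reported by the sample can be checked independently of the broker: the application publishes the integers 0 to 9,999 as payloads, so the consumer should total 9999 × 10000 / 2 = 49,995,000. A minimal check in plain Java (the class name is only for illustration):

```java
import java.util.stream.LongStream;

class ExpectedSum {

    // Sum of the payloads 0, 1, ..., messageCount - 1 published by the sample.
    static long expectedSum(int messageCount) {
        return LongStream.range(0, messageCount).sum();
    }

    public static void main(String[] args) {
        System.out.println("Expected sum: " + expectedSum(10_000)); // prints "Expected sum: 49995000"
    }
}
```

A smaller sum when running the sample would suggest that the consumer latch timed out before all messages arrived.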

RabbitMQ Stream Java API

Overview

This section covers the API for connecting to the RabbitMQ Stream Plugin and working with messages. The API provides three main interfaces:

  • com.rabbitmq.stream.Environment for connecting to a node and optionally managing streams.

  • com.rabbitmq.stream.Producer to publish messages.

  • com.rabbitmq.stream.Consumer to consume messages.

Environment

Creating the Environment

The environment serves as the main entry point to a node or a cluster of nodes. Producer and Consumer instances are created from an Environment instance. Here is the simplest way to create an Environment instance:

Creating an environment with all the defaults
    Environment environment = Environment.builder().build(); // (1)
    // ...
    environment.close(); // (2)
(1) Create an environment that will connect to localhost:5552
(2) Close the environment after usage

Always close the environment to release resources when finished.

Treat the environment as a long-lived object. An application will usually create one Environment instance when it starts up and close it when it exits.

It is possible to use a URI to specify all the necessary information to connect to a node:

Creating an environment with a URI
    Environment environment = Environment.builder()
            .uri("rabbitmq-stream://guest:guest@localhost:5552/%2f") // (1)
            .build();
(1) Use the uri method to specify the URI to connect to

The previous snippet uses a URI that specifies the following information: host, port, username, password, and virtual host (/, which is encoded as %2f). The URI follows the same rules as the AMQP 0.9.1 URI, except the protocol must be rabbitmq-stream. TLS is enabled by using the rabbitmq-stream+tls scheme in the URI.
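To make the URI layout concrete, the following sketch decomposes such a URI with the JDK's java.net.URI class. It only illustrates how the parts map to connection settings; it is not the client's own parsing code, and the StreamUriParts class and its methods are hypothetical helpers. Falling back to "/" when the URI has no path is an assumption based on the default virtual host listed in the settings table.

```java
import java.net.URI;

class StreamUriParts {

    // The virtual host is the percent-decoded path without its leading slash: "/%2f" -> "/".
    static String virtualHost(URI uri) {
        String path = uri.getPath(); // getPath() returns the decoded path, e.g. "//"
        // Assumption: no path at all means the default virtual host "/".
        return path.isEmpty() ? "/" : path.substring(1);
    }

    // TLS is requested through the URI scheme.
    static boolean usesTls(URI uri) {
        return "rabbitmq-stream+tls".equals(uri.getScheme());
    }

    public static void main(String[] args) {
        URI uri = URI.create("rabbitmq-stream://guest:guest@localhost:5552/%2f");
        String[] credentials = uri.getUserInfo().split(":", 2);

        System.out.println("host     = " + uri.getHost());     // prints "host     = localhost"
        System.out.println("port     = " + uri.getPort());     // prints "port     = 5552"
        System.out.println("username = " + credentials[0]);    // prints "username = guest"
        System.out.println("vhost    = " + virtualHost(uri));  // prints "vhost    = /"
        System.out.println("tls      = " + usesTls(uri));      // prints "tls      = false"
    }
}
```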

When using one URI, the corresponding node will be the main entry point to connect to. The Environment will then use the stream protocol to find out more about streams topology (leaders and replicas) when asked to create Producer and Consumer instances.

If this node fails, the Environment will lose connectivity. To improve resilience, specify multiple URIs as fallback options:

Creating an environment with several URIs
    Environment environment = Environment.builder()
            .uris(Arrays.asList( // (1)
                    "rabbitmq-stream://host1:5552",
                    "rabbitmq-stream://host2:5552",
                    "rabbitmq-stream://host3:5552"))
            .build();
(1) Use the uris method to specify several URIs

By specifying several URIs, the environment will try to connect to the first one, and will pick a new URI randomly in case of disconnection.

Understanding Connection Logic

Creating the environment to connect to a cluster node usually works seamlessly. Creating publishers and consumers may encounter connection issues because the client relies on cluster hints to locate stream leaders and replicas.

These connection hints can be accurate or less appropriate depending on the infrastructure. If you encounter connection problems (such as unresolvable hostnames), this blog post explains the root causes and solutions. Setting the advertised_host and advertised_port configuration entries should solve the most common connection problems.

To make the local development experience simple, the client library can choose to always use localhost for producers and consumers. This happens if the following conditions are met: the initial host to connect to is localhost, the user is guest, and no custom address resolver has been provided. Provide a pass-through AddressResolver to EnvironmentBuilder#addressResolver(AddressResolver) to avoid this behavior. It is unlikely this behavior applies to any real-world deployment, where localhost and/or the default guest user should not be used.

Enabling TLS

TLS can be enabled by using the rabbitmq-stream+tls scheme in the URI. The default TLS port is 5551.

Use the EnvironmentBuilder#tls method to configure TLS. The most important setting is an io.netty.handler.ssl.SslContext instance, which is created and configured with the io.netty.handler.ssl.SslContextBuilder#forClient method. Note that hostname verification is enabled by default.

The following snippet shows a common configuration, whereby the client is instructed to trust servers with certificates signed by the configured certificate authority (CA).

Creating an environment that uses TLS
    X509Certificate certificate;
    try (FileInputStream inputStream = new FileInputStream("/path/to/ca_certificate.pem")) {
        CertificateFactory fact = CertificateFactory.getInstance("X.509");
        certificate = (X509Certificate) fact.generateCertificate(inputStream); // (1)
    }
    SslContext sslContext = SslContextBuilder
        .forClient()
        .trustManager(certificate) // (2)
        .build();
    Environment environment = Environment.builder()
        .uri("rabbitmq-stream+tls://guest:guest@localhost:5551/%2f") // (3)
        .tls().sslContext(sslContext) // (4)
        .environmentBuilder()
        .build();
(1) Load certificate authority (CA) certificate from PEM file
(2) Configure Netty SslContext to trust CA certificate
(3) Use TLS scheme in environment URI
(4) Set SslContext in environment configuration

Checking the identity of the server the client connects to is an important part of the TLS handshake. To make this work with the stream client library, it is critical the configured trusted certificates match the hosts returned by cluster nodes in the connection hints. Make sure to read the section on connection logic. You may have to configure the advertised_host broker setting in case of a mismatch between trusted certificates and the default connection hints cluster nodes return.

It is sometimes handy to trust any server certificates in development environments. EnvironmentBuilder#tls provides the trustEverything method to do so. This should not be used in a production environment.

Creating a TLS environment that trusts all server certificates for development
    Environment environment = Environment.builder()
        .uri("rabbitmq-stream+tls://guest:guest@localhost:5551/%2f")
        .tls().trustEverything() // (1)
        .environmentBuilder()
        .build();
(1) Trust all server certificates

Configuring the Environment

The following table sums up the main settings to create an Environment:

| Parameter Name | Description | Default |
|---|---|---|
| uri | The URI of the node to connect to (single node). | rabbitmq-stream://guest:guest@localhost:5552/%2f |
| uris | The URIs of the nodes to try to connect to (cluster). | Singleton list containing rabbitmq-stream://guest:guest@localhost:5552/%2f |
| host | Host to connect to. | localhost |
| port | Port to use. | 5552 |
| username | Username to use to connect. | guest |
| password | Password to use to connect. | guest |
| virtualHost | Virtual host to connect to. | / |
| rpcTimeout | Timeout for RPC calls. | Duration.ofSeconds(10) |
| recoveryBackOffDelayPolicy | Delay policy to use for backoff on connection recovery. | Fixed delay of 5 seconds |
| topologyUpdateBackOffDelayPolicy | Delay policy to use for backoff on topology update, e.g. when a stream replica moves and a consumer needs to connect to another node. | Initial delay of 5 seconds then delay of 1 second. |
| scheduledExecutorService | Executor used to schedule infrastructure tasks like background publishing, producers and consumers migration after disconnection or topology update. If a custom executor is provided, it is the developer’s responsibility to close it once it is no longer necessary. | Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()) |
| maxProducersByConnection | The maximum number of Producer instances a single connection can maintain before a new connection is opened. The value must be between 1 and 256. The limit may not be strictly enforced in case of too many concurrent creations. | 256 |
| maxTrackingConsumersByConnection | The maximum number of Consumer instances that store their offset a single connection can maintain before a new connection is opened. The value must be between 1 and 256. The limit may not be strictly enforced in case of too many concurrent creations. | 50 |
| maxConsumersByConnection | The maximum number of Consumer instances a single connection can maintain before a new connection is opened. The value must be between 1 and 256. The limit may not be strictly enforced in case of too many concurrent creations. | 256 |
| lazyInitialization | To delay the connection opening until necessary. | false |
| requestedHeartbeat | Heartbeat requested by the client. | 60 seconds |
| forceReplicaForConsumers | Retry connecting until a replica is available for consumers. The client retries 5 times before falling back to the stream leader node. Set to true only for clustered environments, not for 1-node environments, where only the stream leader is available. | false |
| forceLeaderForProducers | Force connecting to a stream leader for producers. Set to false if it is acceptable to stay connected to a stream replica when a load balancer is in use. | true |
| id | Informational ID for the environment instance. Used as a prefix for connection names. | rabbitmq-stream |
| addressResolver | Contract to change resolved node address to connect to. | Pass-through (no-op) |
| locatorConnectionCount | Number of locator connections to maintain (for metadata search). | The smaller of the number of URIs and 3. |
| tls | Configuration helper for TLS. | TLS is enabled if a rabbitmq-stream+tls URI is provided. |
| tls#sslContext | Set the io.netty.handler.ssl.SslContext used for the TLS connection. Use io.netty.handler.ssl.SslContextBuilder#forClient to configure it. The server certificate chain, the client private key, and hostname verification are the usual elements that need to be configured. | The JDK trust manager and no client private key. |
| tls#trustEverything | Helper to configure a SslContext that trusts all server certificates and does not use a client private key. Only for development. | Disabled by default. |
| netty | Configuration helper for Netty. | |
| netty#eventLoopGroup | Netty’s event dispatcher. It is the developer’s responsibility to close the EventLoopGroup they provide. | NioEventLoopGroup instance closed automatically with the Environment instance. |
| netty#ByteBufAllocator | ByteBuf allocator. | PooledByteBufAllocator.DEFAULT |
| netty#channelCustomizer | Extension point to customize Netty’s Channel instances used for connections. | None |
| netty#bootstrapCustomizer | Extension point to customize Netty’s Bootstrap instances used to configure connections. | None |

When a Load Balancer is in Use

A load balancer can misguide the client when it tries to connect to nodes that host stream leaders and replicas. The "Connecting to Streams" blog post covers why client applications must connect to the appropriate nodes in a cluster and how a load balancer can make things complicated for them.

The EnvironmentBuilder#addressResolver(AddressResolver) method allows intercepting the node resolution after metadata hints and before connection. Applications can use this hook to ignore metadata hints and always use the load balancer, as illustrated in the following snippet:

Using a custom address resolver to always use a load balancer
    Address entryPoint = new Address("my-load-balancer", 5552); // (1)
    Environment environment = Environment.builder()
        .host(entryPoint.host()) // (2)
        .port(entryPoint.port()) // (2)
        .addressResolver(address -> entryPoint) // (3)
        .locatorConnectionCount(3) // (4)
        .build();
(1) Set the load balancer address
(2) Use load balancer address for initial connection
(3) Ignore metadata hints, always use load balancer
(4) Set the number of locator connections to maintain

Note the example above sets the number of locator connections the environment maintains. Locator connections are used to perform infrastructure-related operations (e.g. looking up the topology of a stream to find an appropriate node to connect to). The environment uses the number of passed-in URIs to choose an appropriate default number and will pick 1 in this case, which may be too low for a cluster deployment. This is why it is recommended to set the value explicitly, 3 being a good default.

Managing Streams

Streams are usually long-lived, centrally-managed entities, that is, applications are not supposed to create and delete them. It is nevertheless possible to create and delete streams with the Environment. This comes in handy for development and testing purposes.

Streams are created with the Environment#streamCreator() method:

Creating a stream
    environment.streamCreator().stream("my-stream").create(); // (1)
(1) Create the my-stream stream

StreamCreator#create is idempotent: trying to re-create a stream with the same name and same properties (e.g. maximum size, see below) will not throw an exception. In other words, you can be sure the stream has been created once StreamCreator#create returns. Note it is not possible to create a stream with the same name as an existing stream but with different properties. Such a request will result in an exception.
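The idempotency contract can be illustrated with a small in-memory model. This is only a sketch of the rule described above, not broker or client code: StreamRegistryModel, the "maxLengthBytes" property key, and the use of IllegalStateException are all assumptions made for the example (the client reports the failure with its own exception type).

```java
import java.util.HashMap;
import java.util.Map;

class StreamRegistryModel {

    private final Map<String, Map<String, String>> streams = new HashMap<>();

    // Same name and same properties: no-op. Same name, different properties: rejected.
    void create(String name, Map<String, String> properties) {
        Map<String, String> existing = streams.putIfAbsent(name, Map.copyOf(properties));
        if (existing != null && !existing.equals(properties)) {
            throw new IllegalStateException(
                "Stream '" + name + "' already exists with different properties");
        }
    }

    int count() {
        return streams.size();
    }

    public static void main(String[] args) {
        StreamRegistryModel registry = new StreamRegistryModel();
        registry.create("my-stream", Map.of("maxLengthBytes", "10GB"));
        registry.create("my-stream", Map.of("maxLengthBytes", "10GB")); // accepted, nothing changes
        System.out.println("streams: " + registry.count()); // prints "streams: 1"
        try {
            registry.create("my-stream", Map.of("maxLengthBytes", "20GB"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```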

Streams can be deleted with the Environment#deleteStream(String) method:

Deleting a stream
    environment.deleteStream("my-stream"); // (1)
(1) Delete the my-stream stream

Note you should avoid stream churn (creating and deleting streams repetitively), as their creation and deletion imply significant housekeeping on the server side (interactions with the file system, communication between nodes of the cluster).

It is also possible to limit the size of a stream when creating it. A stream is an append-only data structure and reading from it does not remove data. This means a stream can grow indefinitely. RabbitMQ Stream supports size-based and time-based retention policies: once the stream reaches a given size or a given age, it is truncated (starting from the beginning).

Limit the size of streams if appropriate!

Make sure to set up a retention policy on potentially large streams if you don’t want to saturate the storage devices of your servers. Keep in mind that this means some data will be erased!

It is possible to set up the retention policy when creating the stream:

Setting the retention policy when creating a stream
    environment.streamCreator()
            .stream("my-stream")
            .maxLengthBytes(ByteCapacity.GB(10)) // (1)
            .maxSegmentSizeBytes(ByteCapacity.MB(500)) // (2)
            .create();
(1) Set the maximum size to 10 GB
(2) Set the segment size to 500 MB

The previous snippet mentions a segment size. RabbitMQ Stream does not store a stream in a single big file; it uses segment files for technical reasons. A stream is truncated by deleting whole segment files (and not part of them), so the maximum size of a stream is usually significantly higher than the size of segment files. 500 MB is a reasonable segment file size to begin with.

When does the broker enforce the retention policy?

The broker enforces the retention policy when the segments of a stream roll over, that is when the current segment has reached its maximum size and is closed in favor of a new one. This means the maximum segment size is a critical setting in the retention mechanism.
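The interaction between segment size and size-based retention can be explored with a simplified model. The sketch below is an assumption-laden illustration, not the broker's algorithm: it tracks only segment sizes, evaluates retention exclusively at roll-over, and deletes whole segments from the oldest end. SegmentRetentionModel and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class SegmentRetentionModel {

    private final long maxLengthBytes;
    private final long maxSegmentSizeBytes;
    private final Deque<Long> closedSegments = new ArrayDeque<>(); // sizes, oldest first
    private long currentSegmentBytes = 0;

    SegmentRetentionModel(long maxLengthBytes, long maxSegmentSizeBytes) {
        this.maxLengthBytes = maxLengthBytes;
        this.maxSegmentSizeBytes = maxSegmentSizeBytes;
    }

    void append(long messageBytes) {
        currentSegmentBytes += messageBytes;
        if (currentSegmentBytes >= maxSegmentSizeBytes) {
            rollOver();
        }
    }

    // Retention is only evaluated here, when a segment is closed.
    private void rollOver() {
        closedSegments.addLast(currentSegmentBytes);
        currentSegmentBytes = 0;
        while (totalBytes() > maxLengthBytes && !closedSegments.isEmpty()) {
            closedSegments.removeFirst(); // drop the oldest segment as a whole
        }
    }

    long totalBytes() {
        long total = currentSegmentBytes;
        for (long size : closedSegments) {
            total += size;
        }
        return total;
    }

    public static void main(String[] args) {
        // Limit of 1000 bytes, segments of 400 bytes, messages of 100 bytes.
        SegmentRetentionModel stream = new SegmentRetentionModel(1000, 400);
        for (int i = 0; i < 11; i++) {
            stream.append(100);
        }
        System.out.println(stream.totalBytes()); // prints 1100: above the limit until the next roll-over
        stream.append(100);                      // 12th message closes a segment and triggers retention
        System.out.println(stream.totalBytes()); // prints 800
    }
}
```

In this model the stream temporarily exceeds its configured maximum between roll-overs, and truncation always removes a full segment, which is why the segment size should be chosen well below the maximum stream size.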

RabbitMQ Stream also supports a time-based retention policy: segments get truncated when they reach a certain age. The following snippet illustrates how to set the time-based retention policy:

Setting a time-based retention policy when creating a stream
    environment.streamCreator()
            .stream("my-stream")
            .maxAge(Duration.ofHours(6)) // (1)
            .maxSegmentSizeBytes(ByteCapacity.MB(500)) // (2)
            .create();
(1) Set the maximum age to 6 hours
(2) Set the segment size to 500 MB

Producer

Creating a Producer

A Producer instance is created from the Environment. The only mandatory setting to specify is the stream to publish to:

Creating a producer from the environment
    Producer producer = environment.producerBuilder() // (1)
            .stream("my-stream") // (2)
            .build(); // (3)
    // ...
    producer.close(); // (4)
(1) Use Environment#producerBuilder() to define the producer
(2) Specify the stream to publish to
(3) Create the producer instance with build()
(4) Close the producer after usage

Treat Producer instances as long-lived objects. Avoid creating a producer for single-message operations.

Producer thread safety

Producer instances are thread-safe. Deduplication imposes restrictions on the usage of threads though.

Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.

The following table sums up the main settings to create a Producer:

| Parameter Name | Description | Default |
|---|---|---|
| stream | The stream to publish to. | No default, mandatory setting. |
| name | The logical name of the producer. Specify a name to enable message deduplication. | null (no deduplication) |
| batchSize | The maximum number of messages to accumulate before sending them to the broker. | 100 |
| subEntrySize | The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency and potential duplicated messages even when deduplication is enabled. See the dedicated section for more information. | 1 (meaning no use of sub-entry batching) |
| compression | Compression algorithm to use when sub-entry batching is in use. See the dedicated section for more information. | Compression.NONE |
| maxUnconfirmedMessages | The maximum number of unconfirmed outbound messages. Producer#send will start blocking when the limit is reached. | 10,000 |
| batchPublishingDelay | Period to send a batch of messages. | 100 ms |
| dynamicBatch | Adapt batch size depending on ingress rate. | true |
| confirmTimeout | Time before the client calls the confirm callback to signal outstanding unconfirmed messages timed out. | 30 seconds |
| enqueueTimeout | Time before enqueueing of a message fails when the maximum number of unconfirmed messages is reached. The callback of the message will be called with a negative status. Set the value to Duration.ZERO if there should be no timeout. | 10 seconds |
| retryOnRecovery | Whether to republish unconfirmed messages after recovery. Set to false to not republish unconfirmed messages and get a negative ConfirmationStatus for unconfirmed messages. | true |

Sending Messages

Once a Producer has been created, it is possible to send a message with the Producer#send(Message, ConfirmationHandler) method. The following snippet shows how to publish a message with a byte array payload:

Sending a message
    byte[] messagePayload = "hello".getBytes(StandardCharsets.UTF_8);  // (1)
    producer.send(
        producer.messageBuilder().addData(messagePayload).build(),  // (2)
        confirmationStatus -> {  // (3)
          if (confirmationStatus.isConfirmed()) {
            // the message made it to the broker
          } else {
            // the message did not make it to the broker
          }
        });

(1) The payload of a message is an array of bytes
(2) Create the message with Producer#messageBuilder()
(3) Define the behavior on publish confirmation

Messages are not only made of a byte[] payload: as shown in the next section, they can also carry pre-defined and application properties.

Use a MessageBuilder instance only once

A MessageBuilder instance is meant to create only one message. You need to create a new instance of MessageBuilder for every message you want to create.

The ConfirmationHandler defines an asynchronous callback invoked when the broker confirms message receipt. The ConfirmationHandler is the place for any logic on publishing confirmation, including re-publishing the message if it is negatively acknowledged.

Keep the confirmation callback as short as possible

The confirmation callback should be kept as short as possible to avoid blocking the connection thread. Not doing so can make the Environment, Producer, and Consumer instances sluggish or even block them. Any long processing should be done in a separate thread (e.g. with an asynchronous ExecutorService).
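The advice above can be sketched with JDK classes only. In this minimal sketch, `Consumer<Boolean>` stands in for the client's ConfirmationHandler, and `ConfirmationOffload`, `offloadingCallback`, and `countHandledFailures` are hypothetical names introduced for illustration. The callback does nothing but enqueue a task, so the thread that invokes it is released immediately.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class ConfirmationOffload {

    // Builds a callback that only enqueues work and returns immediately.
    // Consumer<Boolean> is a stand-in for the client's ConfirmationHandler.
    static Consumer<Boolean> offloadingCallback(ExecutorService workers, Runnable onFailure) {
        return confirmed -> {
            if (!confirmed) {
                workers.submit(onFailure);
            }
        };
    }

    // Feeds confirmation results to the callback and returns how many
    // failures were handled by the worker pool.
    static int countHandledFailures(boolean... confirmations) {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        AtomicInteger handled = new AtomicInteger();
        Consumer<Boolean> callback = offloadingCallback(workers, handled::incrementAndGet);
        try {
            for (boolean confirmed : confirmations) {
                callback.accept(confirmed);
            }
        } finally {
            workers.shutdown(); // already queued tasks still run
        }
        try {
            if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers still busy");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(countHandledFailures(true, false, false)); // prints 2
    }
}
```

In a real application the `onFailure` task would hold the slow part (logging, alerting, scheduling a retry), while the lambda passed to Producer#send stays as short as the one returned by `offloadingCallback`.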

Working with Complex Messages

The publishing example above showed that messages are made of a byte array payload, but it did not go much further. Messages in RabbitMQ Stream can actually be more sophisticated, as they comply with the AMQP 1.0 message format.

In a nutshell, a message in RabbitMQ Stream has the following structure:

  • properties: a defined set of standard properties of the message (e.g. message ID, correlation ID, content type, etc).

  • application properties: a set of arbitrary key/value pairs.

  • body: typically an array of bytes.

  • message annotations: a set of key/value pairs (aimed at the infrastructure).

The RabbitMQ Stream Java client uses the Message interface to abstract a message and the recommended way to create Message instances is to use the Producer#messageBuilder() method. To publish a Message, use the Producer#send(Message, ConfirmationHandler) method:

Creating a message with properties
    Message message = producer.messageBuilder()  // (1)
        .properties()  // (2)
            .messageId(UUID.randomUUID())
            .correlationId(UUID.randomUUID())
            .contentType("text/plain")
        .messageBuilder()  // (3)
            .addData("hello".getBytes(StandardCharsets.UTF_8))  // (4)
        .build();  // (5)
    producer.send(message, confirmationStatus -> { });  // (6)

(1) Get the message builder from the producer
(2) Get the properties builder and set some properties
(3) Go back to message builder
(4) Set byte array payload
(5) Build the message instance
(6) Publish the message

Is RabbitMQ Stream based on AMQP 1.0?

AMQP 1.0 is a standard that defines an efficient binary peer-to-peer protocol for transporting messages between two processes over a network. It also defines an abstract message format, with concrete standard encoding. RabbitMQ Stream uses only the latter: the AMQP 1.0 protocol is not used, only AMQP 1.0 encoded messages are wrapped into the RabbitMQ Stream binary protocol.

The actual AMQP 1.0 message encoding and decoding happen on the client side. The RabbitMQ Stream plugin stores only bytes; it has no idea that the AMQP 1.0 message format is used.

The AMQP 1.0 message format was chosen because of its flexibility and its advanced type system. It provides good interoperability, which allows streams to be accessed as AMQP 0-9-1 queues, without data loss.

Message Deduplication

RabbitMQ Stream provides publisher confirms to avoid losing messages: once the broker has persisted a message it sends a confirmation for this message. But this can lead to duplicate messages: imagine the connection closes because of a network glitch after the message has been persisted but before the confirmation reaches the producer. Once reconnected, the producer will retry to send the same message, as it never received the confirmation. So the message will be persisted twice.

Luckily RabbitMQ Stream can detect and filter out duplicated messages, based on 2 client-side elements: the producer name and the message publishing ID.

Deduplication Requirements: Single Publisher Instance and Single Thread

We’ll see below that deduplication works using a strictly increasing sequence for messages. This means messages must be published in order, so there must be only one publisher instance with a given name and this instance must publish messages within a single thread.

With several publisher instances with the same name, one instance can be "ahead" of the others for the sequence ID: if it publishes a message with sequence ID 100, any message from any instance with a lower sequence ID will be filtered out.

If there is only one publisher instance with a given name, it should publish messages in a single thread. Even if messages are created in order, with the proper sequence ID, they can get out of order if they are published in several threads, e.g. message 5 can be published before message 2. The deduplication mechanism will then filter out message 2 in this case.

You have to be very careful about the way your applications publish messages when deduplication is in use: make sure publisher instances do not share the same name and that each instance publishes from a single thread. If you worry about performance, note it is possible to publish hundreds of thousands of messages in a single thread with RabbitMQ Stream.
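The filtering rule described above can be modelled in a few lines. This is a simplified sketch of the idea (keep only IDs strictly greater than the highest one seen), not the broker's actual implementation; `DedupFilter` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class DedupFilter {

    // Highest publishing ID accepted so far; -1 means nothing accepted yet.
    private long lastAccepted = -1;

    // Accepts a message only if its publishing ID is strictly greater
    // than every ID accepted before it.
    boolean accept(long publishingId) {
        if (publishingId <= lastAccepted) {
            return false;
        }
        lastAccepted = publishingId;
        return true;
    }

    // Returns the IDs that survive the filter, in arrival order.
    static List<Long> surviving(long... arrivalOrder) {
        DedupFilter filter = new DedupFilter();
        List<Long> kept = new ArrayList<>();
        for (long id : arrivalOrder) {
            if (filter.accept(id)) {
                kept.add(id);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // In-order publishing: everything is kept.
        System.out.println(surviving(0, 1, 2, 3, 4, 5)); // [0, 1, 2, 3, 4, 5]
        // Message 5 overtakes 2, 3 and 4: they are filtered out as "duplicates".
        System.out.println(surviving(0, 1, 5, 2, 3, 4)); // [0, 1, 5]
    }
}
```

The second call shows why ordering matters: messages 2, 3 and 4 are new, yet they are dropped because a higher ID arrived first.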

Deduplication is not guaranteed when using sub-entry batching

It is not possible to guarantee deduplication when sub-entry batching is in use. Sub-entry batching is disabled by default and it does not prevent batching messages in a single publish frame, which can already provide very high throughput.

Setting the Name of a Producer

The producer name is set when creating the producer instance, which automatically enables deduplication:

Naming a producer to enable message deduplication
    Producer producer = environment.producerBuilder()
        .name("my-app-producer")  // (1)
        .confirmTimeout(Duration.ZERO)  // (2)
        .stream("my-stream")
        .build();

(1) Set a name for the producer
(2) Disable confirm timeout check

Thanks to the name, the broker will be able to track the messages it has persisted on a given stream for this producer. If the producer connection unexpectedly closes, it will automatically recover and retry outstanding messages. The broker will then filter out messages it has already received and persisted. No more duplicates!

Why set confirmTimeout to 0 when using deduplication?

The point of deduplication is to avoid duplicates when retrying unconfirmed messages. But why retry in the first place? To avoid losing messages, that is, to enforce at-least-once semantics. If the client does not stubbornly retry messages and gives up at some point, messages can be lost, which maps to at-most-once semantics. This is why the deduplication examples set the confirmTimeout setting to Duration.ZERO: to disable the background task that calls the confirmation callback for outstanding messages that time out. This way the client will do its best to retry messages until they are confirmed.

A producer name must be stable and clear to a human reader. It must not be a random sequence that changes when the producer application is restarted. Names like online-shop-order or online-shop-invoice are better names than 3d235e79-047a-46a6-8c80-9d159d3e1b05. There should be only one living instance of a producer with a given name on a given stream at the same time.

Understanding Publishing ID

The producer name is only one part of the deduplication mechanism, the other part is the message publishing ID. If the producer has a name, the client automatically assigns a publishing ID to each outbound message for the producer. The publishing ID is a strictly increasing sequence, starting at 0 and incremented for each message. The default publishing sequence is good enough for deduplication, but it is possible to assign a publishing ID to each message:

Using an explicit publishing ID
    Message message = producer.messageBuilder()
        .publishingId(1)  // (1)
        .addData("hello".getBytes(StandardCharsets.UTF_8))
        .build();
    producer.send(message, confirmationStatus -> { });

(1) Set a publishing ID on a message

There are a few rules to follow when using a custom publishing ID sequence:

  • the sequence must be strictly increasing

  • there can be gaps in the sequence (e.g. 0, 1, 2, 3, 6, 7, 9, 10, etc)

  • the sequence does not have to start at 0, as long as it is increasing

A custom publishing ID sequence usually has a meaning: it can be the line number of a file or the primary key in a database.

Note the publishing ID is not part of the message: it is not stored with the message and so is not available when consuming the message. It is still possible to store the value in the AMQP 1.0 message application properties or in an appropriate property (e.g. messageId).

Do not mix client-assigned and custom publishing IDs

As soon as a producer name is set, message deduplication is enabled. It is then possible to let the producer assign a publishing ID to each message or assign custom publishing IDs. Do one or the other, not both!

Restarting a Producer Where It Left Off

Using a custom publishing sequence is even more useful to restart a producer where it left off. Imagine a scenario whereby the producer is sending a message for each line in a file and the application uses the line number as the publishing ID. If the application restarts because of some necessary maintenance or even a crash, the producer can restart from the beginning of the file: there would be no duplicate messages because the producer has a name and the application sets publishing IDs appropriately. Nevertheless, this is far from ideal: it would be much better to restart just after the last line the broker successfully confirmed. Fortunately this is possible thanks to the Producer#getLastPublishingId() method, which returns the last publishing ID for a given producer. As the publishing ID in this case is the line number, the application can easily scroll to the next line and restart publishing from there.

The next snippet illustrates the use of Producer#getLastPublishingId():

Restarting a producer where it left off
    Producer producer = environment.producerBuilder()
        .name("my-app-producer")  // (1)
        .confirmTimeout(Duration.ZERO)  // (2)
        .stream("my-stream")
        .build();
    long nextPublishingId = producer.getLastPublishingId() + 1;  // (3)
    while (moreContent(nextPublishingId)) {
        byte[] content = getContent(nextPublishingId);  // (4)
        Message message = producer.messageBuilder()
            .publishingId(nextPublishingId)  // (5)
            .addData(content)
            .build();
        producer.send(message, confirmationStatus -> {});
        nextPublishingId++;
    }

(1) Set a name for the producer
(2) Disable confirm timeout check
(3) Query last publishing ID for this producer and increment it
(4) Scroll to the content for the next publishing ID
(5) Set the message publishing ID
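The snippet above leaves `moreContent` and `getContent` undefined. One possible shape for them is sketched below, assuming the file has already been read into memory and that the publishing ID is the 1-based line number. `LineSource` and `remaining` are hypothetical names used only for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LineSource {

    private final List<String> lines;

    LineSource(List<String> lines) {
        this.lines = List.copyOf(lines);
    }

    // True if a line exists for this publishing ID (1-based line number).
    boolean moreContent(long publishingId) {
        return publishingId >= 1 && publishingId <= lines.size();
    }

    // Payload of the line whose number equals the publishing ID.
    byte[] getContent(long publishingId) {
        return lines.get((int) (publishingId - 1)).getBytes(StandardCharsets.UTF_8);
    }

    // Publishing IDs still to send, given the last ID the broker knows about.
    List<Long> remaining(long lastPublishingId) {
        List<Long> ids = new ArrayList<>();
        for (long id = lastPublishingId + 1; moreContent(id); id++) {
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        LineSource source = new LineSource(List.of("a", "b", "c", "d"));
        // Lines 1 and 2 were confirmed before the restart.
        System.out.println(source.remaining(2)); // [3, 4]
    }
}
```

For a large file, the same two methods could be backed by a streaming reader instead of an in-memory list; the contract seen by the publishing loop stays the same.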

Sub-Entry Batching and Compression

RabbitMQ Stream provides a special mode to publish, store, and dispatch messages: sub-entry batching. This mode increases throughput at the cost of increased latency and potential duplicated messages even when deduplication is enabled. It also allows using compression to reduce bandwidth and storage if messages are reasonably similar, at the cost of increasing CPU usage on the client side.

Sub-entry batching consists of squeezing several messages (a batch) into the slot that is usually used for one message. This means outbound messages are not only batched in publishing frames, but in sub-entries as well.

You can enable sub-entry batching by setting the ProducerBuilder#subEntrySize parameter to a value greater than 1, like in the following snippet:

Enabling sub-entry batching
    Producer producer = environment.producerBuilder()
        .stream("my-stream")
        .batchSize(100)  // (1)
        .subEntrySize(10)  // (2)
        .build();

(1) Set batch size to 100 (the default)
(2) Set sub-entry size to 10

Reasonable values for the sub-entry size usually go from 10 to a few dozen.

A sub-entry batch goes directly to disk once it reaches the broker, so the publishing client has complete control over it. This is the occasion to take advantage of the similarity of messages and compress them. There is no compression by default but you can choose among several algorithms with the ProducerBuilder#compression(Compression) method:

Enabling compression of sub-entry messages
    Producer producer = environment.producerBuilder()
        .stream("my-stream")
        .batchSize(100)  // (1)
        .subEntrySize(10)  // (2)
        .compression(Compression.ZSTD)  // (3)
        .build();

(1) Set batch size to 100 (the default)
(2) Set sub-entry size to 10
(3) Use the Zstandard compression algorithm

Note the messages in a sub-entry are compressed altogether to benefit from their potential similarity, not one by one.
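To see why compressing a batch as a whole pays off, the following sketch compares the two approaches with the JDK's gzip implementation. It is only an illustration: messages are simply concatenated here, which is not the client's actual sub-entry layout, and `BatchCompressionDemo` and the sample JSON payloads are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;

final class BatchCompressionDemo {

    // Size in bytes of the gzip-compressed form of the data.
    static int gzipSize(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    // Total size when every message is compressed on its own.
    static int sizeOneByOne(List<byte[]> messages) {
        int total = 0;
        for (byte[] message : messages) {
            total += gzipSize(message);
        }
        return total;
    }

    // Size when all messages are concatenated and compressed once.
    static int sizeTogether(List<byte[]> messages) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] message : messages) {
            all.write(message, 0, message.length);
        }
        return gzipSize(all.toByteArray());
    }

    // Builds similar-looking payloads, as a sub-entry would typically hold.
    static List<byte[]> sampleMessages(int count) {
        List<byte[]> messages = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String json = "{\"type\":\"order\",\"status\":\"created\",\"id\":" + i + "}";
            messages.add(json.getBytes(StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) {
        List<byte[]> messages = sampleMessages(10);
        System.out.println("one by one: " + sizeOneByOne(messages) + " bytes");
        System.out.println("together:   " + sizeTogether(messages) + " bytes");
    }
}
```

Compressing each small message separately pays the format overhead every time and cannot exploit repetition across messages, so the combined form comes out markedly smaller for similar payloads.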

The following table lists the supported algorithms, general information about them, and the respective implementations used by default.

| Algorithm | Overview | Implementation used |
|---|---|---|
| gzip | Has a high compression ratio but is slow compared to other algorithms. | JDK implementation |
| Snappy | Aims for reasonable compression ratio and very high speeds. | Xerial Snappy (framed) |
| LZ4 | Aims for good trade-off between speed and compression ratio. | LZ4 Java (framed) |
| zstd (Zstandard) | Aims for high compression ratio and high speed, especially for decompression. | zstd-jni |

You are encouraged to test and evaluate the compression algorithms depending on your needs.

The compression libraries are pluggable thanks to the EnvironmentBuilder#compressionCodecFactory(CompressionCodecFactory) method.

Consumers, sub-entry batching, and compression

There is no configuration required for consumers with regard to sub-entry batching and compression. The broker dispatches messages to client libraries: they are supposed to figure out the format of messages, extract them from their sub-entry, and decompress them if necessary. So when you set up sub-entry batching and compression in your publishers, the consuming applications must use client libraries that support this mode, which is the case for the stream Java client.

Consumer

Consumer is the API to consume messages from a stream.

Creating a Consumer

A Consumer instance is created with Environment#consumerBuilder(). The main settings are the stream to consume from, the place in the stream to start consuming from (the offset), and a callback when a message is received (the MessageHandler). The next snippet shows how to create a Consumer:

Creating a consumer
    Consumer consumer = environment.consumerBuilder()  // (1)
        .stream("my-stream")  // (2)
        .offset(OffsetSpecification.first())  // (3)
        .messageHandler((context, message) -> {
            message.getBodyAsBinary();  // (4)
        })
        .build();  // (5)
    // ...
    consumer.close();  // (6)

(1) Use Environment#consumerBuilder() to define the consumer
(2) Specify the stream to consume from
(3) Specify where to start consuming from
(4) Define behavior on message consumption
(5) Build the consumer
(6) Close consumer after usage

The broker starts sending messages as soon as the Consumer instance is created.

The message processing callback can take its time, but not too much

The message processing callback should not take too long or it could impact other consumers sharing the same connection. The EnvironmentBuilder#maxConsumersByConnection(int) method allows isolating consumers from each other, at the cost of creating and maintaining more connections. Consider using a separate thread for long processing (e.g. with an asynchronous ExecutorService). Note that message processing callbacks run in a dedicated thread: they do not impact other network frames, which run in their own thread.

The following table sums up the main settings to create a Consumer:

| Parameter Name | Description | Default |
|---|---|---|
| stream | The stream to consume from. | No default, mandatory setting. |
| offset | The offset to start consuming from. | OffsetSpecification#next() |
| messageHandler | The callback for inbound messages. | No default, mandatory setting. |
| name | The consumer name (for offset tracking). | null (no offset tracking) |
| AutoTrackingStrategy | Enable and configure the auto-tracking strategy. | This is the default tracking strategy if a consumer name is provided. |
| AutoTrackingStrategy#messageCountBeforeStorage | Number of messages before storing. | 10,000 |
| AutoTrackingStrategy#flushInterval | Interval to check and store the last received offset in case of inactivity. | Duration.ofSeconds(5) |
| ManualTrackingStrategy | Enable and configure the manual tracking strategy. | Disabled by default. |
| ManualTrackingStrategy#checkInterval | Interval to check if the last requested stored offset has been actually stored. | Duration.ofSeconds(5) |
| noTrackingStrategy | Disable server-side offset tracking even if a name is provided. Useful when single active consumer is enabled and an external store is used for offset tracking. | false |
| subscriptionListener | A callback before the subscription is created. Useful when using an external store for offset tracking. | null |
| flow | Configuration helper for flow control. | |
| flow#initialCredits | Number of credits when the subscription is created. Increase for higher throughput at the expense of memory usage. | 10 |
| flow#strategy | The ConsumerFlowStrategy to use. | ConsumerFlowStrategy#creditOnChunkArrival(10) |

Why is my consumer not consuming?

A consumer starts consuming at the very end of a stream by default (next offset). This means the consumer will receive messages as soon as a producer publishes to the stream. This also means that if no producers are currently publishing to the stream, the consumer will stay idle, waiting for new messages to come in. Use the ConsumerBuilder#offset(OffsetSpecification) method to change the default behavior and see the offset section to find out more about the different types of offset specification.

Specifying an Offset

The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:

  • OffsetSpecification.first(): starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).

  • OffsetSpecification.last(): starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty).

  • OffsetSpecification.next(): starting from the next offset to be written. Contrary to OffsetSpecification.last(), consuming with OffsetSpecification.next() will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream.

  • OffsetSpecification.offset(offset): starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application.

  • OffsetSpecification.timestamp(timestamp): starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary.

What is a chunk of messages?

A chunk is simply a batch of messages. This is the storage and transportation unit used in RabbitMQ Stream, that is, messages are stored contiguously in a chunk and they are delivered as part of a chunk. A chunk can be made of one to several thousands of messages, depending on the ingress.

The following figure shows the different offset specifications in a stream made of 2 chunks:

Diagram
Figure 1. Offset specifications in a stream made of 2 chunks

Each chunk contains a timestamp of its creation time. The broker uses this timestamp to find the appropriate chunk to start from when using a timestamp specification. The broker chooses the closest chunk before the specified timestamp, that is why consumers may see messages published a bit before what they specified.
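The chunk selection and the optional application-side filtering can be modelled as follows. This is a simplified sketch, not broker code: it assumes each message carries its own publish timestamp (for example in a property set by the publisher), and `TimestampStart`, `delivered`, and `notBefore` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TimestampStart {

    // Chunk creation timestamp -> publish timestamps of the messages it holds.
    private final TreeMap<Long, long[]> chunks = new TreeMap<>();

    void addChunk(long chunkTimestamp, long... messageTimestamps) {
        chunks.put(chunkTimestamp, messageTimestamps);
    }

    // Model of the broker side: start at the closest chunk at or before the
    // requested timestamp and deliver everything from there.
    List<Long> delivered(long requested) {
        Long start = chunks.floorKey(requested);
        Map<Long, long[]> fromStart = (start == null) ? chunks : chunks.tailMap(start, true);
        List<Long> result = new ArrayList<>();
        for (long[] chunk : fromStart.values()) {
            for (long timestamp : chunk) {
                result.add(timestamp);
            }
        }
        return result;
    }

    // Application side: drop messages published before the requested timestamp.
    static List<Long> notBefore(List<Long> timestamps, long requested) {
        List<Long> kept = new ArrayList<>();
        for (long timestamp : timestamps) {
            if (timestamp >= requested) {
                kept.add(timestamp);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        TimestampStart stream = new TimestampStart();
        stream.addChunk(100, 100, 105, 110);
        stream.addChunk(200, 200, 210);
        List<Long> received = stream.delivered(108);
        System.out.println(received);                 // [100, 105, 110, 200, 210]
        System.out.println(notBefore(received, 108)); // [110, 200, 210]
    }
}
```

Asking for timestamp 108 selects the chunk created at 100, so the messages stamped 100 and 105 are delivered too; the filter removes them when the application cares.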

Tracking the Offset for a Consumer

RabbitMQ Stream provides server-side offset tracking. This means a consumer can track the offset it has reached in a stream. It allows a new incarnation of the consumer to restart consuming where it left off. All of this without an extra datastore, as the broker stores the offset tracking information.

Offset tracking works in 2 steps:

  • the consumer must have a name. The name is set with ConsumerBuilder#name(String). The name can be any value (under 256 characters) and is expected to be unique (from the application point of view). Note neither the client library, nor the broker enforces uniqueness of the name: if 2 Consumer Java instances share the same name, their offset tracking will likely be interleaved, which applications usually do not expect.

  • the consumer must periodically store the offset it has reached so far. The way offsets are stored depends on the tracking strategy: automatic or manual.

Whatever tracking strategy you use, a consumer must have a name to be able to store offsets.

Automatic Offset Tracking

The following snippet shows how to enable automatic tracking with the defaults:

Using automatic tracking strategy with the defaults
    Consumer consumer =
        environment.consumerBuilder()
            .stream("my-stream")
            .name("application-1")  // (1)
            .autoTrackingStrategy()  // (2)
            .builder()
            .messageHandler((context, message) -> {
              // message handling code...
            })
            .build();

(1) Set the consumer name (mandatory for offset tracking)
(2) Use automatic tracking strategy with defaults

The automatic tracking strategy has the following available settings:

  • message count before storage: the client will store the offset after the specified number of messages, right after the execution of the message handler. The default is every 10,000 messages.

  • flush interval: the client will make sure to store the last received offset at the specified interval. This avoids having pending, not stored offsets in case of inactivity. The default is 5 seconds.

Those settings are configurable, as shown in the following snippet:

Configuring the automatic tracking strategy
    Consumer consumer =
        environment.consumerBuilder()
            .stream("my-stream")
            .name("application-1")  // (1)
            .autoTrackingStrategy()  // (2)
                .messageCountBeforeStorage(50_000)  // (3)
                .flushInterval(Duration.ofSeconds(10))  // (4)
            .builder()
            .messageHandler((context, message) -> {
              // message handling code...
            })
            .build();

(1) Set the consumer name (mandatory for offset tracking)
(2) Use automatic tracking strategy
(3) Store every 50,000 messages
(4) Make sure to store offset at least every 10 seconds

Note the automatic tracking is the default tracking strategy, so if you are fine with its defaults, it is enabled as soon as you specify a name for the consumer:

Setting only the consumer name to enable automatic tracking
    Consumer consumer =
        environment.consumerBuilder()
            .stream("my-stream")
            .name("application-1")  // (1)
            .messageHandler((context, message) -> {
              // message handling code...
            })
            .build();

(1) Set only the consumer name to enable automatic tracking with defaults

Automatic tracking is simple and provides good guarantees. It is nevertheless possible to have more fine-grained control over offset tracking by using manual tracking.

Manual Offset Tracking

The manual tracking strategy gives the developer control of storing offsets whenever they want, not only after a given number of messages has been received and supposedly processed, like automatic tracking does.

The following snippet shows how to enable manual tracking and how to store the offset at some point:

Using manual tracking with defaults
    Consumer consumer =
        environment.consumerBuilder()
            .stream("my-stream")
            .name("application-1")  // (1)
            .manualTrackingStrategy()  // (2)
            .builder()
            .messageHandler((context, message) -> {
              // message handling code...
              if (conditionToStore()) {
                context.storeOffset();  // (3)
              }
            })
            .build();

(1) Set the consumer name (mandatory for offset tracking)
(2) Use manual tracking with defaults
(3) Store the current offset on some condition

Manual tracking has only one setting: the check interval. The client checks that the last requested stored offset has been actually stored at the specified interval. The default check interval is 5 seconds.

The following snippet shows the configuration of manual tracking:

Configuring manual tracking strategy
    Consumer consumer =
        environment.consumerBuilder()
            .stream("my-stream")
            .name("application-1")  // (1)
            .manualTrackingStrategy()  // (2)
                .checkInterval(Duration.ofSeconds(10))  // (3)
            .builder()
            .messageHandler((context, message) -> {
              // message handling code...
              if (conditionToStore()) {
                context.storeOffset();  // (4)
              }
            })
            .build();

(1) Set the consumer name (mandatory for offset tracking)
(2) Use manual tracking
(3) Check last requested offset every 10 seconds
(4) Store the current offset on some condition

The snippet above uses MessageHandler.Context#storeOffset() to store the offset of the current message, but it is possible to store anywhere in the stream with MessageHandler.Context#consumer()#store(long) or simply Consumer#store(long).

Considerations On Offset Tracking

When to store offsets? Avoid storing offsets too often or, worse, for each message. Even though offset tracking is a small and fast operation, it will make the stream grow unnecessarily, as the broker persists offset tracking entries in the stream itself.

A good rule of thumb is to store the offset every few thousands of messages. Of course, when the consumer restarts consuming in a new incarnation, the last tracked offset may be a little behind the very last message the previous incarnation actually processed, so the consumer may see some messages that have been already processed.

A solution to this problem is to make sure processing is idempotent or filter out the last duplicated messages.
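Filtering out replayed messages can be as simple as comparing offsets. The sketch below assumes the application records, in its own store and together with its processing results, the offset of the last message it fully processed; `ReplayFilter` is a hypothetical helper, not part of the client library.

```java
final class ReplayFilter {

    // Offset of the last message known to be fully processed, -1 if none.
    private long lastProcessed;

    ReplayFilter(long lastProcessed) {
        this.lastProcessed = lastProcessed;
    }

    // True if the message was already handled by a previous incarnation.
    boolean isReplay(long offset) {
        return offset <= lastProcessed;
    }

    // Records that the message at this offset has been processed.
    void markProcessed(long offset) {
        lastProcessed = Math.max(lastProcessed, offset);
    }

    public static void main(String[] args) {
        // The previous incarnation processed up to offset 1042, but the
        // tracked offset was behind, so consumption restarts at 1040.
        ReplayFilter filter = new ReplayFilter(1042);
        for (long offset = 1040; offset <= 1044; offset++) {
            if (filter.isReplay(offset)) {
                System.out.println("skip " + offset);
            } else {
                System.out.println("process " + offset);
                filter.markProcessed(offset);
            }
        }
    }
}
```

In a message handler, the offset passed to `isReplay` would come from `context.offset()`.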


Is the offset a reliable absolute value? Message offsets may not be contiguous. This means the message at offset 500 in a stream may not be the 501st message in the stream (offsets start at 0). There can be different types of entries in a stream storage, a message is just one of them. For example, storing an offset creates an offset tracking entry, which has its own offset.

This means one must be careful when basing some decision on offset values, like a modulo to perform an operation every X messages. As the message offsets have no guarantee to be contiguous, the operation may not happen exactly every X messages.
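A small sketch makes the pitfall concrete. It compares a modulo on the offset with a plain counter of received messages, using a made-up sequence of message offsets in which offsets 4 and 8 are assumed to belong to non-message entries. `EveryNMessages` is a hypothetical name.

```java
final class EveryNMessages {

    // Fires whenever the message offset is a multiple of n.
    static int firesUsingOffset(long[] messageOffsets, int n) {
        int fires = 0;
        for (long offset : messageOffsets) {
            if (offset % n == 0) {
                fires++;
            }
        }
        return fires;
    }

    // Fires on every n-th message actually received.
    static int firesUsingCounter(long[] messageOffsets, int n) {
        int fires = 0;
        long received = 0;
        for (long ignored : messageOffsets) {
            received++;
            if (received % n == 0) {
                fires++;
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        // Offsets 4 and 8 are missing: they are assumed to be tracking entries.
        long[] offsets = {0, 1, 2, 3, 5, 6, 7, 9, 10, 11};
        System.out.println(firesUsingOffset(offsets, 4));  // 1
        System.out.println(firesUsingCounter(offsets, 4)); // 2
    }
}
```

With 10 messages and X = 4, the counter fires twice as expected, while the offset-based check fires only once because the multiples of 4 never reach the handler.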

Subscription Listener

The client provides a SubscriptionListener interface callback to add behavior before a subscription is created. This callback can be used to customize the offset the client library computed for the subscription. The callback is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection or a topology change).

This API is experimental, it is subject to change.

It is possible to use the callback to get the last processed offset from an external store, that is, not using the server-side offset tracking feature RabbitMQ Stream provides. The following code snippet shows how this can be done (note the interaction with the external store is not detailed):

Using an external store for offset tracking with a subscription listener
    Consumer consumer = environment.consumerBuilder()
        .stream("my-stream")
        .subscriptionListener(subscriptionContext -> {  // (1)
            long offset = getOffsetFromExternalStore();  // (2)
            subscriptionContext.offsetSpecification(OffsetSpecification.offset(offset + 1));  // (3)
        })
        .messageHandler((context, message) -> {
            // message handling code...
            storeOffsetInExternalStore(context.offset());  // (4)
        })
        .build();

(1) Set subscription listener
(2) Get offset from external store
(3) Set offset to use for the subscription
(4) Store the offset in the external store after processing

When using an external store for offset tracking, it is no longer necessary to set a name and an offset strategy, as these only apply when server-side offset tracking is in use.

Using a subscription listener can also be useful to have more accurate offset tracking on re-subscription, at the cost of making the application code slightly more complex. This requires a good understanding of how and when subscription occurs in the client, and so when the subscription listener is called:

  • for a consumer with no name (server-side offset tracking disabled)

    • on the first subscription (when the consumer is created): the offset specification is the one specified with ConsumerBuilder#offset(OffsetSpecification), the default being OffsetSpecification#next()

    • on re-subscription (after a disconnection or topology change): the offset specification is the offset of the last dispatched message

  • for a consumer with a name (server-side offset tracking enabled)

    • on the first subscription (when the consumer is created): the server-side stored offset (if any) overrides the value specified with ConsumerBuilder#offset(OffsetSpecification)

    • on re-subscription (after a disconnection or topology change): the server-side stored offset is used

The subscription listener comes in handy on re-subscription. The application can track the last processed offset in-memory, with an AtomicLong for example. The application knows exactly when a message is processed and updates its in-memory tracking accordingly, whereas the value computed by the client may not be perfectly appropriate on re-subscription.

Let’s take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval. When a glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed. Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate. A custom SubscriptionListener lets the application developer use what’s best for the application if the computed value is not optimal.
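The in-memory tracking mentioned above could look like the following sketch, built on an AtomicLong. `InMemoryOffsetTracker`, `markProcessed`, and `resumeOffset` are hypothetical names; how the fallback offset is obtained (for instance from the value the client computed) is left to the application.

```java
import java.util.concurrent.atomic.AtomicLong;

final class InMemoryOffsetTracker {

    private static final long NONE = -1L;

    // Highest offset the application has finished processing so far.
    private final AtomicLong lastProcessed = new AtomicLong(NONE);

    // Called by the message handler once a message is fully processed.
    // Keeps the maximum, so late or out-of-order calls never move it back.
    void markProcessed(long offset) {
        lastProcessed.accumulateAndGet(offset, Math::max);
    }

    // Offset to subscribe from: just after the last processed message,
    // or the fallback when nothing has been processed yet.
    long resumeOffset(long fallback) {
        long last = lastProcessed.get();
        return last == NONE ? fallback : last + 1;
    }

    public static void main(String[] args) {
        InMemoryOffsetTracker tracker = new InMemoryOffsetTracker();
        System.out.println(tracker.resumeOffset(40)); // 40 (nothing processed yet)
        tracker.markProcessed(41);
        tracker.markProcessed(45);
        tracker.markProcessed(43);
        System.out.println(tracker.resumeOffset(40)); // 46
    }
}
```

A subscription listener would then pass `OffsetSpecification.offset(tracker.resumeOffset(...))` to `subscriptionContext.offsetSpecification(...)`, and the message handler would call `tracker.markProcessed(context.offset())` after processing.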

Flow Control

This section covers how a consumer can tell the broker when to send more messages.

By default, the broker keeps sending messages as long as messages are processed and the MessageHandler#handle(Context, Message) method returns. This strategy works fine if message processing is fast enough. If message processing takes longer, one can be tempted to process messages in parallel with an ExecutorService. This will make the handle method return immediately and the broker will keep sending messages, potentially overflowing the consumer.

What we miss in the parallel processing case is a way to tell the library we are done processing a message and that we are ready at some point to handle more messages. This is the goal of the MessageHandler.Context#processed() method.

This method is by default a no-op because the default flow control strategy keeps asking for more messages as soon as message processing is done. This method gets some real behavior to control the flow of messages when an appropriate ConsumerFlowStrategy is set with ConsumerBuilder#flow(). The following code snippet shows how to set a handy consumer flow strategy:

Setting a consumer flow control strategy
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .flow()
        .strategy(ConsumerFlowStrategy.creditWhenHalfMessagesProcessed())  // (1)
    .builder()
    .messageHandler((context, message) -> {
      // message handling code (possibly asynchronous)...
      context.processed();  // (2)
    })
    .build();
(1) Set the flow control strategy
(2) Make sure to call Context#processed()

In the example we set up the creditWhenHalfMessagesProcessed strategy, which asks for more messages once half of the current messages have been marked as processed. The broker does not send messages one by one, it sends chunks of messages. A chunk can contain from 1 to several thousand messages. So with the strategy set above, once processed() has been called for half of the messages of the current chunk, the library asks the broker for another one (it provides a credit for the subscription). The next chunk should then arrive by the time we are done with the other half of the current chunk. This way the consumer is neither overwhelmed nor idle.

The ConsumerFlowStrategy interface provides some static helpers to configure the appropriate strategy.

Additional notes on consumer flow control:

  • Make sure to call the processed() method once you set up a ConsumerFlowStrategy. The method is a no-op by default, but it is essential to call it with count-based strategies like creditWhenHalfMessagesProcessed or creditOnProcessedMessageCount. Not calling it will stop the dispatching of messages.

  • Make sure to call processed() only once per message. Whether the method is idempotent depends on the flow strategy implementation. Apart from the default one, the implementations the library provides do not make processed() idempotent.
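The count-based behavior described in this section can be modeled in plain Java, without the client library. The following is a minimal sketch under stated assumptions, not the library's implementation: `HalfChunkCreditTracker`, `onChunk`, and `creditsRequested` are hypothetical names made up for the illustration, and the "credit" is just a counter.

```java
/** Hypothetical model of a "credit when half of the chunk is processed" strategy. */
class HalfChunkCreditTracker {
    private int threshold;        // processed() calls that trigger a credit request
    private int processedCount;   // processed() calls seen for the current chunk
    private int creditsRequested; // how many times another chunk was asked for

    /** Called when a chunk of messageCount messages is delivered. */
    synchronized void onChunk(int messageCount) {
        threshold = Math.max(1, messageCount / 2);
        processedCount = 0;
    }

    /** Called by the application once it is done with a message. */
    synchronized void processed() {
        processedCount++;
        if (processedCount == threshold) {
            creditsRequested++; // stands in for "provide a credit to the broker"
        }
    }

    synchronized int creditsRequested() {
        return creditsRequested;
    }

    public static void main(String[] args) {
        HalfChunkCreditTracker tracker = new HalfChunkCreditTracker();
        tracker.onChunk(10);
        for (int i = 0; i < 4; i++) {
            tracker.processed();
        }
        System.out.println(tracker.creditsRequested()); // 0: threshold not reached yet
        tracker.processed();
        System.out.println(tracker.creditsRequested()); // 1: half of the chunk is processed
    }
}
```

In this model, never calling `processed()` means the threshold is never reached and no further chunk is requested, and calling it twice for the same message advances the count too early. Both match the two notes above.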

Single Active Consumer

Single Active Consumer requires RabbitMQ 3.11 or more.

When the single active consumer feature is enabled for several consumer instances sharing the same stream and name, only one of these instances will be active at a time and so will receive messages. The other instances will be idle.

The single active consumer feature provides 2 benefits:

  • Messages are processed in order: there is only one consumer at a time.

  • Consumption continuity is maintained: a consumer from the group will take over if the active one stops or crashes.

A typical sequence of events would be the following:

  • Several instances of the same consuming application start up.

  • Each application instance registers a single active consumer. The consumer instances share the same name.

  • The broker makes the first registered consumer the active one.

  • The active consumer receives and processes messages, the other consumer instances remain idle.

  • The active consumer stops or crashes.

  • The broker chooses the consumer next in line to become the new active one.

  • The new active consumer starts receiving messages.

The next figures illustrate this mechanism. There can be only one active consumer:

Diagram
Figure 2. The first registered consumer is active, the next ones are inactive

The broker rolls over to another consumer when the active one stops or crashes:

Diagram
Figure 3. When the active consumer stops, the next in line becomes active

Note there can be several groups of single active consumers on the same stream. What makes them different from each other is the name used by the consumers. The broker deals with them independently. Let’s use an example. Imagine 2 different applications, app-1 and app-2, consuming from the same stream, with 3 identical instances each. Each instance registers 1 single active consumer with the name of the application. We end up with 3 app-1 consumers and 3 app-2 consumers, 1 active consumer in each group, so overall 6 consumers and 2 active ones, all of this on the same stream.
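The group mechanics described above (first registered consumer is active, next in line takes over, groups are independent) can be pictured with a small plain-Java model. This is a simplified sketch and not broker code: the class `SingleActiveConsumerGroups` and its methods are hypothetical names, and the instance identifiers are made up.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of single active consumer groups on one stream. */
class SingleActiveConsumerGroups {
    // group name -> consumer instances in registration order
    private final Map<String, Deque<String>> groups = new HashMap<>();

    void register(String groupName, String instanceId) {
        groups.computeIfAbsent(groupName, k -> new ArrayDeque<>()).addLast(instanceId);
    }

    void unregister(String groupName, String instanceId) {
        Deque<String> members = groups.get(groupName);
        if (members != null) {
            members.remove(instanceId);
        }
    }

    /** The active instance is the oldest one still registered, or null if the group is empty. */
    String active(String groupName) {
        Deque<String> members = groups.get(groupName);
        return members == null ? null : members.peekFirst();
    }

    int totalConsumers() {
        return groups.values().stream().mapToInt(Deque::size).sum();
    }

    public static void main(String[] args) {
        SingleActiveConsumerGroups broker = new SingleActiveConsumerGroups();
        for (String app : new String[] {"app-1", "app-2"}) {
            for (int i = 1; i <= 3; i++) {
                broker.register(app, app + "-instance-" + i);
            }
        }
        System.out.println(broker.totalConsumers()); // 6
        System.out.println(broker.active("app-1"));  // app-1-instance-1
        broker.unregister("app-1", "app-1-instance-1");
        System.out.println(broker.active("app-1"));  // app-1-instance-2
        System.out.println(broker.active("app-2"));  // app-2-instance-1 (unaffected)
    }
}
```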

Let’s see now the API for single active consumer.

Enabling Single Active Consumer

Use the ConsumerBuilder#singleActiveConsumer() method to enable the feature:

Enabling single active consumer
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .name("application-1")  // (1)
    .singleActiveConsumer()  // (2)
    .messageHandler((context, message) -> {
      // message handling code...
    })
    .build();
(1) Set the consumer name (mandatory to enable single active consumer)
(2) Enable single active consumer

With the configuration above, the consumer will take part in the application-1 group on the my-stream stream. If the consumer instance is the first in a group, it will get messages as soon as there are some available. If it is not the first in the group, it will remain idle until it is its turn to be active (likely when all the instances registered before it are gone).

Offset Tracking

Single active consumer and offset tracking work together: when the active consumer goes away, another consumer takes over and resumes where the former active consumer left off. Well, this is how things should work, and luckily this is what happens when using server-side offset tracking. So as long as you use automatic offset tracking or manual offset tracking, the handoff between a former active consumer and the new one will go well.

The story is different if you are using an external store for offset tracking. In this case you need to tell the client library where to resume from, and you can do this by implementing the ConsumerUpdateListener API.

Reacting to Consumer State Change

The broker notifies a consumer that becomes active before dispatching messages to it. The broker expects a response from the consumer, and this response contains the offset the dispatching should start from. So it is the consumer’s responsibility to compute the appropriate offset, not the broker’s. The default behavior is to look up the last stored offset for the consumer on the stream. This works when server-side offset tracking is in use, but it does not when the application chose to use an external store for offset tracking. In this case, it is possible to use the ConsumerBuilder#consumerUpdateListener(ConsumerUpdateListener) method, as demonstrated in the following snippet:

Fetching the last stored offset from an external store in the consumer update listener callback
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .name("application-1")  // (1)
    .singleActiveConsumer()  // (2)
    .noTrackingStrategy()  // (3)
    .consumerUpdateListener(context -> {  // (4)
      long offset = getOffsetFromExternalStore();  // (5)
      return OffsetSpecification.offset(offset + 1);  // (6)
    })
    .messageHandler((context, message) -> {
      // message handling code...
      storeOffsetInExternalStore(context.offset());
    })
    .build();
(1) Set the consumer name (mandatory to enable single active consumer)
(2) Enable single active consumer
(3) Disable server-side offset tracking
(4) Set the consumer update listener
(5) Fetch last offset from external store
(6) Return the offset to resume consuming from to the broker

Super Streams (Partitioned Streams)

Super Streams require RabbitMQ 3.11 or more.

A super stream is a logical stream composed of multiple individual streams. It provides scalability through partitioning, distributing data across several streams instead of using a single stream.

The stream Java client maintains the same programming model for super streams as individual streams. The Producer, Consumer, Message, and other APIs remain unchanged when using super streams, so your application code requires minimal modifications.

Consuming applications can use super streams and single active consumer at the same time. The 2 features combined make sure only one consumer instance consumes from an individual stream at a time. In this configuration, super streams provide scalability and single active consumer provides the guarantee that messages of an individual stream are processed in order.

Super streams do not deprecate streams

Super streams are a partitioning solution. They are not meant to replace individual streams; they sit on top of them to handle some use cases more effectively. If the stream data is likely to be large (hundreds of gigabytes or even terabytes, size remains relative) and presents an obvious partition key (e.g. country), a super stream can be appropriate. It can help to cope with the data size and to take advantage of data locality for some processing use cases. Remember that partitioning always comes with complexity though, even if the implementation of super streams strives to make it as transparent as possible for the application developer.

Topology

The topology of a super stream follows the AMQP 0.9.1 model: exchanges, queues, and bindings. AMQP resources are not used to transport or store stream messages. Instead, they describe the super stream topology and define which streams compose the super stream.

Let’s take the example of an invoices super stream made of 3 streams (i.e. partitions):

  • an invoices exchange represents the super stream

  • the invoices-0, invoices-1, invoices-2 streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ)

  • 3 bindings between the exchange and the streams link the super stream to its partitions and represent routing rules

Diagram
Figure 4. The topology of a super stream is defined with bindings between an exchange and queues

When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules. From the application code point of view, using a super stream is mostly configuration-based. Some logic must also be provided to extract routing information from messages.

Super Stream Creation and Deletion

It is possible to manage super streams with

  • the stream Java client, by using Environment#streamCreator() and Environment#deleteSuperStream(String)

  • the add_super_stream and delete_super_stream commands in rabbitmq-streams (CLI)

  • any AMQP 0.9.1 client library

  • the management plugin

The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings).

With the Client Library

Here is how to create an invoices super stream with 5 partitions:

Creating a super stream by specifying the number of partitions
environment.streamCreator()
    .name("invoices")
    .superStream()
    .partitions(5)
    .creator()
    .create();

The super stream partitions will be invoices-0, invoices-1, …, invoices-4. This topology works by hashing routing keys to determine the target partition for each message. For example, if the routing key is a customer ID, all invoices for the same customer will be routed to the same partition, ensuring they are processed in publishing order.

It is also possible to specify binding keys when creating a super stream:

Creating a super stream by specifying the binding keys
environment.streamCreator()
    .name("invoices")
    .superStream()
    .bindingKeys("amer", "emea", "apac")
    .creator()
    .create();

The super stream partitions will be invoices-amer, invoices-emea and invoices-apac in this case.

Using one type of topology or the other depends on the use cases, especially how messages are processed. See the next sections on publishing and consuming to find out more.

With the CLI

Here is how to create an invoices super stream with 5 partitions:

Creating a super stream from the CLI
rabbitmq-streams add_super_stream invoices --partitions 5

Use rabbitmq-streams add_super_stream --help to learn more about the command.

Publishing to a Super Stream

When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:

Creating a Producer for a Super Stream
Producer producer = environment.producerBuilder()
    .superStream("invoices")  // (1)
    .routing(message -> message.getProperties().getMessageIdAsString())  // (2)
    .producerBuilder()
    .build();  // (3)
// ...
producer.close();  // (4)
(1) Set the super stream name
(2) Provide the logic to get the routing key from a message
(3) Create the producer instance
(4) Close the producer when it’s no longer necessary

Although the invoices super stream is not a physical stream, you must use its name when declaring the producer. The client automatically discovers the individual streams that compose the super stream. Your application code must provide logic to extract a routing key from each message using a Function<Message, String>. The client hashes this routing key to determine the target stream using the partition list and a modulo operation.

The client uses 32-bit MurmurHash3 by default to hash the routing key. This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:

Specifying a custom hash function
Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .routing(message -> message.getProperties().getMessageIdAsString())
    .hash(rk -> rk.hashCode())  // (1)
    .producerBuilder()
    .build();
(1) Use String#hashCode() to hash the routing key

Note that using Java’s hashCode() method is a debatable choice: producers written in other languages are unlikely to implement the same function, so routing would differ between producers in different languages.
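The hash-then-modulo routing described above can be sketched in plain Java. This is an illustration under assumptions, not the client's code: `HashRoutingSketch` is a hypothetical name, `String#hashCode()` stands in for the default MurmurHash3 hash, and `Math.floorMod` is one possible way to turn a possibly negative hash into a valid index (the client may use a different modulo, so the partition chosen for a given key can differ from what the library computes).

```java
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical sketch of "hash the routing key, then modulo the partition count". */
class HashRoutingSketch {

    static String route(String routingKey, List<String> partitions, ToIntFunction<String> hash) {
        // floorMod keeps the index in [0, partitions.size()) even for negative hash values
        int index = Math.floorMod(hash.applyAsInt(routingKey), partitions.size());
        return partitions.get(index);
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("invoices-0", "invoices-1", "invoices-2");
        ToIntFunction<String> hash = String::hashCode; // stand-in for the default hash
        // The same routing key always lands on the same partition
        System.out.println(route("customer-42", partitions, hash));
        System.out.println(route("customer-42", partitions, hash));
    }
}
```

Because the partition is a pure function of the routing key, all messages sharing a key end up in the same stream, which is what preserves their publishing order.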

Resolving Routes with Bindings

Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams. The stream Java client provides another way to resolve streams, based on the routing key and the bindings between the super stream exchange and the streams.

This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below:

Diagram
Figure 5. A super stream with a partition for a region in a world

In such a case, the routing key will be a property of the message that represents the region:

Enabling the "key" routing strategy
Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .routing(msg -> msg.getApplicationProperties().get("region").toString())  // (1)
    .key()  // (2)
    .producerBuilder()
    .build();
(1) Extract the routing key
(2) Enable the "key" routing strategy

Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams. Note that the client caches results; it does not query the broker for every message.
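The "resolve once, then cache" behavior mentioned above can be pictured with a small plain-Java model. This is a sketch under assumptions rather than the client's implementation: `CachingRouteResolver` is a hypothetical class, and the broker query is replaced by a function passed to the constructor.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical model of key-based route resolution with caching. */
class CachingRouteResolver {
    private final Function<String, List<String>> brokerLookup; // stands in for the broker query
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    CachingRouteResolver(Function<String, List<String>> brokerLookup) {
        this.brokerLookup = brokerLookup;
    }

    /** Returns the streams bound to the routing key, querying the "broker" only on a cache miss. */
    List<String> route(String routingKey) {
        return cache.computeIfAbsent(routingKey, brokerLookup);
    }

    public static void main(String[] args) {
        Map<String, List<String>> bindings = Map.of(
            "amer", List.of("invoices-amer"),
            "emea", List.of("invoices-emea"),
            "apac", List.of("invoices-apac"));
        AtomicInteger queries = new AtomicInteger();
        CachingRouteResolver resolver = new CachingRouteResolver(key -> {
            queries.incrementAndGet();
            return bindings.getOrDefault(key, List.of()); // no binding: route nowhere
        });
        System.out.println(resolver.route("emea")); // [invoices-emea]
        System.out.println(resolver.route("emea")); // [invoices-emea]
        System.out.println(queries.get());          // 1: the second call hit the cache
    }
}
```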

Using a Custom Routing Strategy

The solution that provides the most control over routing is using a custom routing strategy. This should be needed only for specific cases.

Here is an excerpt of the RoutingStrategy interface:

The routing strategy interface
public interface RoutingStrategy {

  /** Where to route a message. */
  List<String> route(Message message, Metadata metadata);

  /** Metadata on the super stream. */
  interface Metadata {

    List<String> partitions();

    List<String> route(String routingKey);
  }
}

Note it is possible to route a message to several streams or even nowhere. The "hash" routing strategy always routes to 1 stream and the "key" routing strategy can route to several streams.

The following code sample shows how to implement a simplistic round-robin RoutingStrategy and use it in the producer. Note this implementation should not be used in production as the modulo operation is not sign-safe for simplicity’s sake.

Setting a round-robin routing strategy
AtomicLong messageCount = new AtomicLong(0);
RoutingStrategy routingStrategy = (message, metadata) -> {
  List<String> partitions = metadata.partitions();
  String stream = partitions.get(
      (int) messageCount.getAndIncrement() % partitions.size()
  );
  return Collections.singletonList(stream);
};
Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .routing(null)  // (1)
    .strategy(routingStrategy)  // (2)
    .producerBuilder()
    .build();
(1) No need to set the routing key extraction logic
(2) Set the custom routing strategy
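The sign issue mentioned for the round-robin snippet comes from casting the counter to int before taking the remainder: once the counter exceeds Integer.MAX_VALUE the cast yields a negative number, and so does the remainder. One possible sign-safe variant, sketched here in plain Java without the client library, takes the modulo on the long value with Math.floorMod. `SignSafeRoundRobin` and `naiveIndex` are hypothetical names used for the illustration only.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sign-safe round-robin partition picker. */
class SignSafeRoundRobin {
    private final AtomicLong messageCount;

    SignSafeRoundRobin(long initialCount) {
        this.messageCount = new AtomicLong(initialCount);
    }

    String next(List<String> partitions) {
        // floorMod on the long value never returns a negative index
        long index = Math.floorMod(messageCount.getAndIncrement(), (long) partitions.size());
        return partitions.get((int) index);
    }

    /** The index expression of the simplistic snippet, kept for comparison. */
    static int naiveIndex(long count, int size) {
        return (int) count % size; // the cast applies first, so the result can be negative
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("invoices-0", "invoices-1", "invoices-2");
        long justPastIntRange = Integer.MAX_VALUE + 1L;
        System.out.println(naiveIndex(justPastIntRange, partitions.size())); // -2
        System.out.println(new SignSafeRoundRobin(justPastIntRange).next(partitions)); // invoices-2
    }
}
```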
Deduplication

Deduplication for a super stream producer works the same way as with a single stream producer. The publishing ID values are spread across the streams, but this does not affect the mechanism.

Consuming From a Super Stream

A super stream consumer is a composite consumer: it looks up the super stream partitions and creates a consumer for each of them. The programming model is the same as with regular consumers for the application developer: their main job is to provide the application code to process messages, that is a MessageHandler instance. The configuration is different though, and this section covers its subtleties. But let’s focus on the behavior of a super stream consumer first.

Super Stream Consumer in Practice

Imagine you have a super stream made of 3 partitions (individual streams). You start an instance of your application, which itself creates a super stream consumer for this super stream. The super stream consumer will create 3 consumers internally, one for each partition, and messages will flow into your MessageHandler.

Imagine now that you start another instance of your application. It will do the exact same thing as previously and the 2 instances will process the exact same messages in parallel. This may not be what you want: the messages will be processed twice!

Having one instance of your application may be enough: the data are spread across several streams automatically and the messages from the different partitions are processed in parallel from a single OS process.

But if you want to scale the processing across several OS processes (or bare-metal machines, or virtual machines) and you don’t want your messages to be processed several times as illustrated above, you’ll have to enable the single active consumer feature on your super stream consumer.

The next subsections cover the basic settings of a super stream consumer and a dedicated section covers how super stream consumers and single active consumer play together.

Declaring a Super Stream Consumer

Declaring a super stream consumer is not much different from declaring a single stream consumer. The ConsumerBuilder#superStream(String) method must be used to set the super stream to consume from:

Declaring a super stream consumer
Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")  // (1)
    .messageHandler((context, message) -> {
      // message processing
    })
    .build();
// ...
consumer.close();  // (2)
(1) Set the super stream name
(2) Close the consumer when it is no longer necessary

That’s all. The super stream consumer will take care of the details (partition lookup, coordination of individual consumers, etc.).

Offset Tracking

The semantics of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer. There are still some subtle differences, so a good understanding of offset tracking in general and of the automatic and manual offset tracking strategies is recommended.

Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:

  • automatic offset tracking: internally, the client divides the messageCountBeforeStorage setting by the number of partitions for each individual consumer. To see why, consider a 3-partition super stream with messageCountBeforeStorage set to 10,000 and 10,000 messages arriving evenly distributed (approximately 3,333 per partition). If each individual consumer used the full setting, automatic offset tracking would not trigger, because no individual partition reaches the threshold. Dividing messageCountBeforeStorage by the partition count therefore gives more accurate tracking when messages are evenly distributed across partitions. To avoid storing offsets too often, a good rule of thumb is to multiply the expected per-stream messageCountBeforeStorage by the number of partitions. So the default being 10,000, it can be set to 30,000 for a 3-partition super stream.

  • manual offset tracking: the MessageHandler.Context#storeOffset() method must be used; Consumer#store(long) will fail, because an offset value has a meaning only in one stream, not in other streams. A call to MessageHandler.Context#storeOffset() will store the current message offset in its stream, but also the offset of the last dispatched message for the other streams of the super stream.
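The arithmetic behind the automatic tracking rule of thumb can be checked with a few lines of plain Java. This is only a worked example: `SuperStreamTrackingMath` and `perPartitionThreshold` are hypothetical names, and the use of integer division is an assumption about how the split is rounded.

```java
/** Worked example of the messageCountBeforeStorage split across partitions. */
class SuperStreamTrackingMath {

    /** Threshold applied by each individual consumer (integer division is an assumption). */
    static int perPartitionThreshold(int messageCountBeforeStorage, int partitionCount) {
        return messageCountBeforeStorage / partitionCount;
    }

    public static void main(String[] args) {
        // Default setting on a 3-partition super stream: each partition stores every ~3,333 messages
        System.out.println(perPartitionThreshold(10_000, 3)); // 3333
        // Rule of thumb: multiply by the partition count to keep 10,000 per partition
        System.out.println(perPartitionThreshold(30_000, 3)); // 10000
    }
}
```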

Single Active Consumer Support
Single Active Consumer requires RabbitMQ 3.11 or more.

As stated previously, super stream consumers and single active consumer provide scalability and the guarantee that messages of an individual stream are processed in order.

Let’s take an example with a 3-partition super stream:

  • You have an application that creates a super stream consumer instance with single active consumer enabled.

  • You start 3 instances of this application. Each instance is a JVM process running in a Docker container, virtual machine, or on bare-metal hardware.

  • Since the super stream has 3 partitions, each application instance creates a super stream consumer that maintains 3 internal consumer instances. This results in 9 consumer instances total. Such a super stream consumer is a composite consumer.

  • The broker and the different application instances coordinate so that only 1 consumer instance for a given partition receives messages at a time. So among these 9 consumer instances, only 3 are actually active, the other ones are idle or inactive.

  • If one of the application instances stops, the broker will rebalance its active consumer to one of the other instances.

The following figure illustrates how the client library supports the combination of the super stream and single active consumer features. It uses a composite consumer that creates an individual consumer for each partition of the super stream. If there is only one single active consumer instance with a given name for a super stream, each individual consumer is active.

Diagram
Figure 6. A single active consumer on a super stream is a composite consumer that creates an individual consumer for each partition

Imagine now we start 3 instances of the consuming application to scale out the processing. The individual consumer instances spread out across the super stream partitions and only one is active for each partition, as illustrated in the following figure:

Diagram
Figure 7. Consumer instances spread across the super stream partitions and are activated accordingly
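The counting in the example above (3 instances, 3 partitions, 9 consumers, 3 active) can be reproduced with a small plain-Java model. This is a sketch under a loud assumption: the rule used here to pick the active instance for a partition (partition index modulo the number of instances) is chosen for illustration only, and this section does not describe the broker's actual algorithm. `SuperStreamSacModel` and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of single active consumers spread over super stream partitions. */
class SuperStreamSacModel {

    /** Returns partition -> active instance; the modulo rule is an assumption for illustration. */
    static Map<String, String> activeConsumers(List<String> partitions, List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("at least one application instance is required");
        }
        Map<String, String> active = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            active.put(partitions.get(i), instances.get(i % instances.size()));
        }
        return active;
    }

    /** Each instance holds one individual consumer per partition. */
    static int totalConsumers(List<String> partitions, List<String> instances) {
        return partitions.size() * instances.size();
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("invoices-0", "invoices-1", "invoices-2");
        List<String> instances = List.of("instance-1", "instance-2", "instance-3");
        System.out.println(totalConsumers(partitions, instances));         // 9
        System.out.println(activeConsumers(partitions, instances).size()); // 3
        // After instance-3 stops, its partition is handed over to a remaining instance
        System.out.println(activeConsumers(partitions, List.of("instance-1", "instance-2")));
    }
}
```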

After this overview, let’s see the API and the configuration details.

The following snippet shows how to declare a single active consumer on a super stream with the ConsumerBuilder#superStream(String) and ConsumerBuilder#singleActiveConsumer() methods:

Enabling single active consumer on a super stream
Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")  // (1)
    .name("application-1")  // (2)
    .singleActiveConsumer()  // (3)
    .messageHandler((context, message) -> {
      // message processing
    })
    .build();
// ...
(1) Set the super stream name
(2) Set the consumer name (mandatory to enable single active consumer)
(3) Enable single active consumer

Note it is mandatory to specify a name for the consumer. This name will be used to identify the group of consumer instances and make sure only one is active for each partition. The name is also the reference for offset tracking.

The example above uses automatic offset tracking by default. With this strategy, the client library takes care of offset tracking when consumers become active or inactive. It looks up the latest stored offset when a consumer becomes active to start consuming at the appropriate offset, and it stores the last dispatched offset when a consumer becomes inactive.

The story is not the same with manual offset tracking, as the client library does not know which offset it should store when a consumer becomes inactive. The application developer can use the ConsumerUpdateListener callback to react appropriately when a consumer changes state. The following snippet illustrates the use of the ConsumerUpdateListener callback:

Using manual offset tracking for a super stream single active consumer
Consumer consumer =
    environment.consumerBuilder()
        .superStream("invoices")  // (1)
        .name("application-1")  // (2)
        .singleActiveConsumer()  // (3)
        .manualTrackingStrategy()  // (4)
        .builder()
        .consumerUpdateListener(context -> {  // (5)
          if (context.isActive()) {  // (6)
            try {
              return OffsetSpecification.offset(
                  context.consumer().storedOffset() + 1
              );
            } catch (NoOffsetException e) {
              return OffsetSpecification.next();
            }
          } else {
            context.consumer().store(lastProcessedOffsetForThisStream);  // (7)
            return null;
          }
        })
        .messageHandler((context, message) -> {
          // message handling code...
          if (conditionToStore()) {
            context.storeOffset();  // (8)
          }
        })
        .build();
// ...
(1) Set the super stream name
(2) Set the consumer name (mandatory to enable single active consumer)
(3) Enable single active consumer
(4) Enable manual offset tracking strategy
(5) Set ConsumerUpdateListener
(6) Return stored offset + 1 or default when consumer becomes active
(7) Store last processed offset for the stream when consumer becomes inactive
(8) Store the current offset on some condition

The ConsumerUpdateListener callback must return the offset to start consuming from when a consumer becomes active. This is what the code above does: it checks if the consumer is active with ConsumerUpdateListener.Context#isActive() and looks up the last stored offset. If there is no stored offset yet, it returns a default value, OffsetSpecification#next() here.

When a consumer becomes inactive, it should store the last processed offset, as another consumer instance will take over elsewhere. It is expected this other consumer runs the exact same code, so it will execute the same sequence when it becomes active (looking up the stored offset, returning the value + 1).

Note the ConsumerUpdateListener is called for a partition, that is an individual stream. The application code should take care of maintaining a reference of the last processed offset for each partition of the super stream, e.g. with a Map<String, Long> (partition-to-offset map). To do so, the context parameter of the MessageHandler and ConsumerUpdateListener callbacks provide a stream() method.
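A partition-to-offset map like the one suggested above could look like the following plain-Java sketch. `PartitionOffsets` and its methods are hypothetical names, not part of the client library. A concurrent map is used on the assumption that callbacks for different partitions may run on different threads.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder for the last processed offset of each partition. */
class PartitionOffsets {
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();

    /** To be called from the message handler with the stream and offset of the message. */
    void processed(String stream, long offset) {
        lastProcessed.put(stream, offset);
    }

    /** To be called from the update listener; empty if nothing was processed for the stream. */
    OptionalLong lastProcessed(String stream) {
        Long offset = lastProcessed.get(stream);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) {
        PartitionOffsets offsets = new PartitionOffsets();
        offsets.processed("invoices-0", 41);
        offsets.processed("invoices-1", 7);
        offsets.processed("invoices-0", 42);
        System.out.println(offsets.lastProcessed("invoices-0")); // OptionalLong[42]
        System.out.println(offsets.lastProcessed("invoices-2")); // OptionalLong.empty
    }
}
```

In the snippet above, the message handler would record `context.stream()` and `context.offset()` for each message, and the listener would read the value for `context.stream()` when the consumer becomes inactive.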

RabbitMQ Stream provides server-side offset tracking, but it is possible to use an external store to track offsets for streams. The ConsumerUpdateListener callback is still your friend in this case. The following snippet shows how to leverage it when an external store is in use:

Using external offset tracking for a super stream single active consumer
Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")  // (1)
    .name("application-1")  // (2)
    .singleActiveConsumer()  // (3)
    .noTrackingStrategy()  // (4)
    .consumerUpdateListener(context -> {  // (5)
      if (context.isActive()) {  // (6)
        long offset = getOffsetFromExternalStore();
        return OffsetSpecification.offset(offset + 1);
      }
      return null;  // (7)
    })
    .messageHandler((context, message) -> {
      // message handling code...
      storeOffsetInExternalStore(context.stream(), context.offset());  // (8)
    })
    .build();
(1) Set the super stream name
(2) Set the consumer name (mandatory to enable single active consumer)
(3) Enable single active consumer
(4) Disable server-side offset tracking
(5) Set ConsumerUpdateListener
(6) Use external store for stored offset when consumer becomes active
(7) Assume offset already stored when consumer becomes inactive
(8) Use external store for offset tracking

Here are the takeaway points of this code:

  • Even though there is no server-side offset tracking to use it, the consumer must still have a name to identify the group it belongs to. The external offset tracking mechanism is free to use the same name or not.

  • Calling ConsumerBuilder#noTrackingStrategy() is necessary to disable server-side offset tracking, or the automatic tracking strategy will kick in.

  • The snippet does not provide the details, but the offset tracking mechanism seems to store the offset for each message. The external store must be able to cope with the message rate in a real-world scenario.

  • The ConsumerUpdateListener callback returns the last stored offset + 1 when the consumer becomes active. This way the broker will resume the dispatching at this location in the stream.

  • A well-behaved ConsumerUpdateListener must make sure the last processed offset is stored when the consumer becomes inactive, so that the consumer that will take over can look up the offset and resume consuming at the right location. Our ConsumerUpdateListener does not do anything when the consumer becomes inactive (it returns null): it can afford this because the offset is stored for each message. Make sure to store the last processed offset when the consumer becomes inactive to avoid duplicates when the consumption resumes elsewhere.

Advanced Topics

Filtering

Filtering requires RabbitMQ 3.13 or more.

RabbitMQ Stream’s server-side filtering saves network bandwidth by filtering messages on the server, so clients receive only a subset of the messages in a stream.

The filtering feature works as follows:

  • each message is published with an associated filter value

  • a consumer that wants to enable filtering must:

    • define one or several filter values

    • define some client-side filtering logic

Why is client-side filtering logic still needed? Server-side filtering is probabilistic: it may still send messages that don’t match your filter values. The server uses a Bloom filter (a space-efficient probabilistic data structure) where false positives are possible. Despite this limitation, filtering significantly reduces network bandwidth.
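To make the false positive property concrete, here is a deliberately tiny Bloom filter in plain Java. It is a teaching sketch, not the server's implementation: `TinyBloomFilter`, its size, and its two hash functions derived from `String#hashCode()` are all assumptions made for the example.

```java
import java.util.BitSet;

/** Hypothetical, deliberately small Bloom filter to illustrate false positives. */
class TinyBloomFilter {
    private final BitSet bits;
    private final int size;

    TinyBloomFilter(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    void add(String value) {
        bits.set(index(value, 0));
        bits.set(index(value, 1));
    }

    /** false means "definitely not added"; true means "possibly added". */
    boolean mightContain(String value) {
        return bits.get(index(value, 0)) && bits.get(index(value, 1));
    }

    // Two bit positions derived from one hash code (double hashing)
    private int index(String value, int i) {
        int h1 = value.hashCode();
        int h2 = (h1 >>> 16) | 1;
        return Math.floorMod(h1 + i * h2, size);
    }

    public static void main(String[] args) {
        TinyBloomFilter filter = new TinyBloomFilter(16);
        filter.add("california");
        filter.add("texas");
        String[] others = {"oregon", "nevada", "utah", "ohio", "maine", "iowa", "idaho", "alaska"};
        int falsePositives = 0;
        for (String other : others) {
            if (filter.mightContain(other)) {
                falsePositives++; // never added, yet reported as possibly present
            }
        }
        System.out.println("added values always match: " + filter.mightContain("california"));
        System.out.println("false positives among " + others.length + " other values: " + falsePositives);
    }
}
```

A value that was added is always reported as present, while a value that was never added may still be reported as present. This is why a consumer has to re-check each message with its own filtering logic.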

Filtering on the Publishing Side

Publishers must define logic to extract filter values from messages. The following snippet shows how to extract the filter value from an application property:

Declaring a producer with logic to extract a filter value from each message
Producer producer = environment.producerBuilder()
  .stream("invoices")
  .filterValue(msg ->
    msg.getApplicationProperties().get("state").toString())  // (1)
  .build();
(1) Extract filter value from state application property

Filter values can be null, resulting in unfiltered messages that are published normally.

Filtering on the Consuming Side

A consumer needs to set up one or several filter values and some filtering logic to enable filtering. The filtering logic must be consistent with the filter values. In the next snippet, the consumer wants to process only messages from the state of California. It sets a filter value to california and a predicate that accepts a message only if the state application property is california:

Declaring a consumer with a filter value and filtering logic
String filterValue = "california";
Consumer consumer = environment.consumerBuilder()
  .stream("invoices")
  .filter()
    .values(filterValue)  // (1)
    .postFilter(msg ->
      filterValue.equals(msg.getApplicationProperties().get("state")))  // (2)
  .builder()
  .messageHandler((ctx, msg) -> { })
  .build();
(1) Set filter value
(2) Set filtering logic

The filter logic is a Predicate<Message>. It must return true if a message is accepted, following the same semantics as java.util.stream.Stream#filter(Predicate).

As stated above, not all messages must have an associated filter value. Many applications may not need filtering, so they can publish messages the regular way. So a stream can contain messages with and without an associated filter value.

By default, messages without a filter value (a.k.a. unfiltered messages) are not sent to a consumer that enabled filtering.

But what if a consumer wants to process messages with a filter value and messages without any filter value as well? It must use the matchUnfiltered() method in its declaration and also make sure to keep the filtering logic consistent:

Getting unfiltered messages as well when enabling filtering
String filterValue = "california";
Consumer consumer = environment.consumerBuilder()
  .stream("invoices")
  .filter()
    .values(filterValue)  // (1)
    .matchUnfiltered()  // (2)
    .postFilter(msg ->
        filterValue.equals(msg.getApplicationProperties().get("state"))
        || !msg.getApplicationProperties().containsKey("state")  // (3)
    )
  .builder()
  .messageHandler((ctx, msg) -> { })
  .build();
(1) Request messages from California
(2) Request messages without a filter value as well
(3) Let both types of messages pass

In the example above, the filtering logic allows both california messages and messages without a state set.
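The predicate logic of this example can be exercised on its own, without a broker. The sketch below models a message's application properties as a plain `Map<String, Object>`, which is an assumption for the illustration (the real predicate receives a `Message`). `PostFilterSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-alone version of the post-filter logic, on plain maps. */
class PostFilterSketch {

    /** Accepts "california" messages and messages without any state property. */
    static Predicate<Map<String, Object>> californiaOrUnfiltered() {
        String filterValue = "california";
        return props -> filterValue.equals(props.get("state")) || !props.containsKey("state");
    }

    public static void main(String[] args) {
        Predicate<Map<String, Object>> postFilter = californiaOrUnfiltered();
        System.out.println(postFilter.test(Map.of("state", "california"))); // true
        System.out.println(postFilter.test(Map.of("state", "texas")));      // false
        System.out.println(postFilter.test(Map.of()));                      // true
    }
}
```

The second case is the one server-side filtering alone cannot rule out: a texas message delivered as a false positive is dropped by the predicate.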

Considerations on Filtering

Since the server may send non-matching messages due to the probabilistic nature of Bloom filters, the client-side filtering logic must be robust to avoid processing unwanted messages.
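To see why non-matching messages can slip through, here is a toy Bloom filter written with JDK types only. It is a sketch for illustration: the bit array size and the two hash functions are arbitrary choices, not those used by RabbitMQ, and the class name ToyBloomFilter is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Toy Bloom filter, only meant to illustrate false positives.
// Size and hash functions are arbitrary, not the ones RabbitMQ uses.
class ToyBloomFilter {

  private final BitSet bits;
  private final int size;

  ToyBloomFilter(int size) {
    this.size = size;
    this.bits = new BitSet(size);
  }

  // Sets the two bits derived from the value.
  void add(String value) {
    bits.set(index(value, 31));
    bits.set(index(value, 131));
  }

  // true means "possibly present", false means "definitely absent".
  boolean mightContain(String value) {
    return bits.get(index(value, 31)) && bits.get(index(value, 131));
  }

  // Simple polynomial hash reduced to a bit position.
  private int index(String value, int multiplier) {
    int hash = 7;
    for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
      hash = hash * multiplier + (b & 0xff);
    }
    return Math.floorMod(hash, size);
  }

  public static void main(String[] args) {
    ToyBloomFilter filter = new ToyBloomFilter(16); // deliberately tiny
    filter.add("california");
    filter.add("texas");

    int falsePositives = 0;
    for (int i = 0; i < 1000; i++) {
      String other = "state-" + i; // never added to the filter
      if (filter.mightContain(other)) {
        falsePositives++;
      }
    }
    // A value that was added is always reported as possibly present.
    System.out.println("california possibly present: " + filter.mightContain("california"));
    System.out.println("false positives out of 1000: " + falsePositives);
  }
}
```

A Bloom filter never misses a value that was added, but it can report a value that was never added as possibly present. This is why the post-filter predicate on the client side remains necessary.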

Good filter value candidates:

  • Shared categorical values: geographical locations (countries, states), document types (payslip, invoice, order), product categories (book, luggage, toy)

  • Values with reasonable cardinality (few to few thousand distinct values)

Poor filter value candidates:

  • Unique identifiers (message IDs, timestamps, UUIDs)

  • Values with extreme cardinality (tens of thousands of distinct values)

OAuth 2 Support

The client supports OAuth 2 authentication using the OAuth 2 Client Credentials flow. Both the client and RabbitMQ server must be configured to use the same OAuth 2 server.

Prerequisites:

Token retrieval is configured at the environment level:

Configuring OAuth 2 token retrieval
Environment env = Environment.builder()
    .oauth2() // (1)
    .tokenEndpointUri("https://localhost:8443/uaa/oauth/token/") // (2)
    .clientId("rabbitmq").clientSecret("rabbitmq") // (3)
    .grantType("password") // (4)
    .parameter("username", "rabbit_super") // (5)
    .parameter("password", "rabbit_super") // (5)
    .sslContext(sslContext) // (6)
    .environmentBuilder()
    .build();
(1) Access the OAuth 2 configuration
(2) Set the token endpoint URI
(3) Authenticate the client application
(4) Set the grant type (password in this example)
(5) Set optional parameters (depends on the OAuth 2 server)
(6) Set the SSL context (e.g. to verify and trust the identity of the OAuth 2 server)

The environment handles token management automatically:

  • Retrieves tokens for stream connections

  • Refreshes tokens before expiration

  • Re-authenticates existing connections to prevent broker disconnections

  • Uses the same token for all maintained connections
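Refreshing "before expiration" implies some rule for picking the refresh time. The sketch below shows one plausible rule, refreshing after a fixed fraction of the remaining token lifetime. The 0.8 ratio and the names TokenRefreshSketch and refreshDelay are assumptions made for illustration; they do not describe how the library decides.

```java
import java.time.Duration;
import java.time.Instant;

class TokenRefreshSketch {

  // Delay to wait before refreshing, as a fraction of the remaining token lifetime.
  // Returns zero when the token is already expired.
  static Duration refreshDelay(Instant now, Instant expiration, double ratio) {
    if (ratio <= 0 || ratio >= 1) {
      throw new IllegalArgumentException("ratio must be between 0 and 1 (exclusive)");
    }
    Duration remaining = Duration.between(now, expiration);
    if (remaining.isNegative() || remaining.isZero()) {
      return Duration.ZERO;
    }
    return Duration.ofMillis(Math.round(remaining.toMillis() * ratio));
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2025-01-01T00:00:00Z");
    Instant expiration = now.plus(Duration.ofMinutes(60));
    // 80% of a 60-minute lifetime is 48 minutes.
    System.out.println(refreshDelay(now, expiration, 0.8)); // prints PT48M
  }
}
```

Refreshing well before the expiration leaves time to retry if the OAuth 2 server is briefly unavailable.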

Using Native epoll

The stream Java client uses Netty's Java NIO transport by default, which works well for most applications.

For specialized performance requirements, Netty supports JNI-based transports. These are less portable but may offer better performance for specific workloads. Note: The RabbitMQ team has not observed significant improvements in their testing.

This example shows how to configure the popular Linux epoll transport. Other JNI transports follow the same configuration pattern.

Add the native transport dependency matching your OS and architecture. This example uses Linux x86-64 with the linux-x86_64 classifier. Here is the declaration for Maven:

Declaring the Linux x86-64 native epoll transport dependency with Maven
<dependencies>
  <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-transport-native-epoll</artifactId>
    <version>4.2.7.Final</version>
    <classifier>linux-x86_64</classifier>
  </dependency>
</dependencies>

And for Gradle:

Declaring the Linux x86-64 native epoll transport dependency with Gradle
dependencies {
  implementation "io.netty:netty-transport-native-epoll:4.2.7.Final:linux-x86_64"
}

The native epoll transport is set up when the environment is configured:

Configuring the native epoll transport in the environment
EventLoopGroup epollEventLoopGroup = new MultiThreadIoEventLoopGroup( // (1)
    EpollIoHandler.newFactory()
);
Environment environment = Environment.builder()
    .netty() // (2)
        .eventLoopGroup(epollEventLoopGroup) // (3)
        .bootstrapCustomizer(b -> b.channel(EpollSocketChannel.class)) // (4)
        .environmentBuilder()
    .build();
(1) Create the epoll event loop group (don't forget to close it!)
(2) Use the Netty configuration helper
(3) Set the event loop group
(4) Set the channel class to use

Note the event loop group must be closed explicitly: the environment will not close it itself as it is provided externally.
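The ownership rule can be illustrated with JDK types only. In this sketch a hypothetical Component borrows an ExecutorService the way the environment borrows the event loop group: closing the component leaves the executor running, and the code that created the executor shuts it down. With Netty, the equivalent closing call on the event loop group would be shutdownGracefully().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExternalResourceOwnership {

  // Hypothetical component that borrows an executor it did not create.
  static final class Component implements AutoCloseable {
    private final ExecutorService borrowed;

    Component(ExecutorService borrowed) {
      this.borrowed = borrowed;
    }

    @Override
    public void close() {
      // Intentionally does not shut down the borrowed executor:
      // it belongs to the caller.
    }
  }

  // Returns whether the executor is still running after the component is closed.
  static boolean executorSurvivesClose(ExecutorService executor) {
    try (Component component = new Component(executor)) {
      // use the component here
    }
    return !executor.isShutdown();
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      System.out.println(executorSurvivesClose(executor)); // prints true
    } finally {
      executor.shutdown(); // the caller created it, so the caller closes it
    }
  }
}
```

Closing in reverse order of creation (environment first, then the event loop group) avoids shutting down threads that the environment may still be using.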

Building the Client

You need JDK 11 or more installed.

To build the JAR file:

./mvnw clean package -DskipITs -DskipTests

To launch the test suite (requires a local RabbitMQ node with stream plugin enabled):

./mvnw verify -Drabbitmqctl.bin=/path/to/rabbitmqctl

Appendix A: Micrometer Observation

It is possible to use Micrometer Observation to instrument publishing and consuming in the stream Java client. Micrometer Observation provides metrics, tracing, and log correlation with one single API.

The stream Java client provides an ObservationCollector abstraction and an implementation for Micrometer Observation. The following snippet shows how to create and set up the Micrometer ObservationCollector implementation with an existing ObservationRegistry:

Configuring Micrometer Observation
Environment environment = Environment.builder()
    .observationCollector(new MicrometerObservationCollectorBuilder() // (1)
        .registry(observationRegistry).build()) // (2)
    .build();
(1) Configure Micrometer ObservationCollector with builder
(2) Set Micrometer ObservationRegistry

The next sections document the conventions, spans, and metrics made available by the instrumentation. They are automatically generated from the source code with the Micrometer documentation generator.

Observability - Conventions

Below you can find a list of all GlobalObservationConvention and ObservationConvention declared by this project.

Table 1. ObservationConvention implementations

| ObservationConvention Class Name | Applicable ObservationContext Class Name |
|---|---|
| com.rabbitmq.stream.observation.micrometer.DefaultProcessObservationConvention | ProcessContext |
| com.rabbitmq.stream.observation.micrometer.ProcessObservationConvention | ProcessContext |
| com.rabbitmq.stream.observation.micrometer.DefaultPublishObservationConvention | PublishContext |
| com.rabbitmq.stream.observation.micrometer.PublishObservationConvention | PublishContext |

Observability - Spans

Below you can find a list of all spans declared by this project.

Process Observation Span

Observation for processing a message.

Span name rabbitmq.stream.process (defined by convention class com.rabbitmq.stream.observation.micrometer.DefaultProcessObservationConvention).

Fully qualified name of the enclosing class com.rabbitmq.stream.observation.micrometer.StreamObservationDocumentation.

Table 2. Tag Keys

| Name | Description |
|---|---|
| messaging.operation (required) | A string identifying the kind of messaging operation. |
| messaging.system (required) | A string identifying the messaging system. |
| net.protocol.name (required) | A string identifying the protocol (RabbitMQ Stream). |
| net.protocol.version (required) | A string identifying the protocol version (1.0). |

Publish Observation Span

Observation for publishing a message.

Span name rabbitmq.stream.publish (defined by convention class com.rabbitmq.stream.observation.micrometer.DefaultPublishObservationConvention).

Fully qualified name of the enclosing class com.rabbitmq.stream.observation.micrometer.StreamObservationDocumentation.

Table 3. Tag Keys

| Name | Description |
|---|---|
| messaging.operation (required) | A string identifying the kind of messaging operation. |
| messaging.system (required) | A string identifying the messaging system. |
| net.protocol.name (required) | A string identifying the protocol (RabbitMQ Stream). |
| net.protocol.version (required) | A string identifying the protocol version (1.0). |

Observability - Metrics

Below you can find a list of all metrics declared by this project.

Process Observation

Observation for processing a message.

Metric name rabbitmq.stream.process (defined by convention class com.rabbitmq.stream.observation.micrometer.DefaultProcessObservationConvention). Type timer.

Metric name rabbitmq.stream.process.active (defined by convention class com.rabbitmq.stream.observation.micrometer.DefaultProcessObservationConvention). Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).
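When comparing a raw timer value with what a backend displays, the base unit difference is a plain conversion. A minimal JDK-only sketch follows; the class and method names are hypothetical and no Micrometer API is involved.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

class BaseUnitConversion {

  // Converts a duration recorded in nanoseconds to fractional seconds.
  static double nanosToSeconds(long nanos) {
    return nanos / (double) TimeUnit.SECONDS.toNanos(1);
  }

  public static void main(String[] args) {
    long recorded = Duration.ofMillis(250).toNanos(); // 250,000,000 ns
    System.out.println(nanosToSeconds(recorded)); // prints 0.25
  }
}
```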

Fully qualified name of the enclosing class com.rabbitmq.stream.observation.micrometer.StreamObservationDocumentation.

Table 4. Low cardinality Keys

| Name | Description |
|---|---|
| messaging.operation (required) | A string identifying the kind of messaging operation. |
| messaging.system (required) | A string identifying the messaging system. |
| net.protocol.name (required) | A string identifying the protocol (RabbitMQ Stream). |
| net.protocol.version (required) | A string identifying the protocol version (1.0). |

Publish Observation

Observation for publishing a message.

Metric name rabbitmq.stream.publish (defined by convention class com.rabbitmq.stream.observation.micrometer.DefaultPublishObservationConvention). Type timer.

Metric name rabbitmq.stream.publish.active (defined by convention class com.rabbitmq.stream.observation.micrometer.DefaultPublishObservationConvention). Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

Fully qualified name of the enclosing class com.rabbitmq.stream.observation.micrometer.StreamObservationDocumentation.

Table 5. Low cardinality Keys

| Name | Description |
|---|---|
| messaging.operation (required) | A string identifying the kind of messaging operation. |
| messaging.system (required) | A string identifying the messaging system. |
| net.protocol.name (required) | A string identifying the protocol (RabbitMQ Stream). |
| net.protocol.version (required) | A string identifying the protocol version (1.0). |

Version 1.3.0
Last updated 2025-10-16 07:16:04 UTC
