- Notifications
You must be signed in to change notification settings - Fork586
AddConfirmationChannel for async publisher confirmations#1824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
base:main
Are you sure you want to change the base?
AddConfirmationChannel for async publisher confirmations#1824
Conversation
lukebakken commentedDec 8, 2025
Hey@acogoluegnes! Here's a proposal for implementing "automatic" publisher confirmation tracking in a similar manner to the .NET client. As you're the expert here, please suggest better names, a better implementation, etc, as I'm just barely familiar enough with Java and this project to implement this feature with the help of a genie. I did, of course, review the code. If you like the way this is going, I thought I'd also add rate-throttling in a similar manner to the .NET client, i.e. as the outstanding confirmation window closes, increase delay between publishes to (hopefully) allow the broker to catch up. |
6a31a46 to714370eCompareUh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
acogoluegnes commentedDec 9, 2025
Thanks for this contributon@lukebakken. I think it is on the right track. I added some comments in the code. Just a few more remarks:
|
Uh oh!
There was an error while loading.Please reload this page.
714370e to44c3acaComparec64f7ba to5520ef4Comparemichaelklishin commentedDec 10, 2025
@lukebakken I have sent you an external contributor invite for this repo. |
5520ef4 todd2fb5eCompareConfirmationChannel for async publisher confirmationslukebakken commentedDec 11, 2025
@michaelklishin@acogoluegnes I tookthis comment to heart and re-implemented this feature as a |
041b932 tob0761bfCompareThe new `ConfirmationChannel` API introduced inrabbitmq/rabbitmq-java-client#1824 provides asynchronous publisherconfirmation tracking with a `CompletableFuture`-based API, ratelimiting, and message correlation support.This change adds `PublisherConfirmsAsync.java` to demonstrate the`ConfirmationChannel` API. The tutorial shows how to create a`ConfirmationChannel` wrapper with rate limiting, publish messagesasynchronously with correlation context, and wait for all confirmationsusing `CompletableFuture.allOf()`.Depends onrabbitmq/rabbitmq-java-client#1824
b0761bf to49a0ff5Comparec002375 to8112046Comparelukebakken commentedDec 11, 2025
Alrighty gang, all set I think. Small tutorial here -rabbitmq/rabbitmq-tutorials#707 |
Traditional publisher confirms in the Java client require manualtracking of sequence numbers and correlation of Basic.Return messages.This makes per-message error handling complex and provides no built-inasync pattern, backpressure mechanism, or message correlation support.This change introduces `ConfirmationChannel`, a wrapper that providesautomatic publisher confirmation tracking with a`CompletableFuture`-based API, optional throttling, and generic contextparameter for message correlation. The implementation useslistener-based integration with existing `Channel` instances, requiringno modifications to `ChannelN`.New API components:- `ConfirmationChannel` interface - Extends `Channel` and adds `basicPublishAsync()` methods that return `CompletableFuture<T>`- `ConfirmationChannelN` implementation - Wraps any `Channel` instance and tracks confirmations via return/confirm/shutdown listeners- `PublishException` - Exception thrown when message is nack'd or returned, with sequence number, routing details, and user contextThe wrapper maintains independent sequence numbers using `AtomicLong`and stores confirmation state in a `ConcurrentHashMap`. Each entry holdsthe future, rate limiter permit, and user-provided context. Messagesinclude an `x-seq-no` header for correlating Basic.Return responses.Rate limiting is optional via `RateLimiter` parameter. The`ThrottlingRateLimiter` implementation uses progressive delays(0-1000ms) based on capacity usage, applying backpressure when availablepermits fall below a threshold (default 50%).The `basicPublish()` and `waitForConfirms()` methods throw`UnsupportedOperationException` on `ConfirmationChannel` to preventmixing synchronous and asynchronous patterns. All other `Channel`methods delegate to the wrapped instance.Tests include 9 unit tests for `ThrottlingRateLimiter` and 24integration tests for publisher confirmation tracking with contextparameter verification, rate limiting scenarios, and error handling.
8112046 to9a31accComparemichaelklishin commentedDec 11, 2025
@lukebakken since this is a feature by most definitions, we now would have to go through a special approval process on our end :( |
The new `ConfirmationChannel` API introduced inrabbitmq/rabbitmq-java-client#1824 provides asynchronous publisherconfirmation tracking with a `CompletableFuture`-based API, ratelimiting, and message correlation support.This change adds `PublisherConfirmsAsync.java` to demonstrate the`ConfirmationChannel` API. The tutorial shows how to create a`ConfirmationChannel` wrapper with rate limiting, publish messagesasynchronously with correlation context, and wait for all confirmationsusing `CompletableFuture.allOf()`.Depends onrabbitmq/rabbitmq-java-client#1824
Uh oh!
There was an error while loading.Please reload this page.
Traditional publisher confirms in the Java client require manual
tracking of sequence numbers and correlation of Basic.Return messages.
This makes per-message error handling complex and provides no built-in
async pattern, backpressure mechanism, or message correlation support.
This change introduces
ConfirmationChannel, a wrapper that providesautomatic publisher confirmation tracking with a
CompletableFuture-based API, optional throttling, and generic contextparameter for message correlation. The implementation uses
listener-based integration with existing
Channelinstances, requiringno modifications to
ChannelN.New API components:
ConfirmationChannelinterface - ExtendsChanneland addsbasicPublishAsync()methods that returnCompletableFuture<T>ConfirmationChannelNimplementation - Wraps anyChannelinstanceand tracks confirmations via return/confirm/shutdown listeners
PublishException- Exception thrown when message is nack'd orreturned, with sequence number, routing details, and user context
The wrapper maintains independent sequence numbers using
AtomicLongand stores confirmation state in a
ConcurrentHashMap. Each entry holdsthe future, rate limiter permit, and user-provided context. Messages
include an
x-seq-noheader for correlating Basic.Return responses.Rate limiting is optional via
RateLimiterparameter. TheThrottlingRateLimiterimplementation uses progressive delays(0-1000ms) based on capacity usage, applying backpressure when available
permits fall below a threshold (default 50%).
The
basicPublish()andwaitForConfirms()methods throwUnsupportedOperationExceptiononConfirmationChannelto preventmixing synchronous and asynchronous patterns. All other
Channelmethods delegate to the wrapped instance.