Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

AddConfirmationChannel for async publisher confirmations#1824

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
lukebakken wants to merge1 commit intorabbitmq:main
base:main
Choose a base branch
Loading
fromlukebakken:lukebakken/publisher-confirm-tracking

Conversation

@lukebakken
Copy link
Collaborator

@lukebakkenlukebakken commentedDec 8, 2025
edited
Loading

Traditional publisher confirms in the Java client require manual
tracking of sequence numbers and correlation of Basic.Return messages.
This makes per-message error handling complex and provides no built-in
async pattern, backpressure mechanism, or message correlation support.

This change introducesConfirmationChannel, a wrapper that provides
automatic publisher confirmation tracking with a
CompletableFuture-based API, optional throttling, and generic context
parameter for message correlation. The implementation uses
listener-based integration with existingChannel instances, requiring
no modifications toChannelN.

New API components:

  • ConfirmationChannel interface - ExtendsChannel and adds
    basicPublishAsync() methods that returnCompletableFuture<T>
  • ConfirmationChannelN implementation - Wraps anyChannel instance
    and tracks confirmations via return/confirm/shutdown listeners
  • PublishException - Exception thrown when message is nack'd or
    returned, with sequence number, routing details, and user context

The wrapper maintains independent sequence numbers usingAtomicLong
and stores confirmation state in aConcurrentHashMap. Each entry holds
the future, rate limiter permit, and user-provided context. Messages
include anx-seq-no header for correlating Basic.Return responses.

Rate limiting is optional viaRateLimiter parameter. The
ThrottlingRateLimiter implementation uses progressive delays
(0-1000ms) based on capacity usage, applying backpressure when available
permits fall below a threshold (default 50%).

ThebasicPublish() andwaitForConfirms() methods throw
UnsupportedOperationException onConfirmationChannel to prevent
mixing synchronous and asynchronous patterns. All otherChannel
methods delegate to the wrapped instance.

@lukebakken
Copy link
CollaboratorAuthor

Hey@acogoluegnes!

Here's a proposal for implementing "automatic" publisher confirmation tracking in a similar manner to the .NET client. As you're the expert here, please suggest better names, a better implementation, etc, as I'm just barely familiar enough with Java and this project to implement this feature with the help of a genie. I did, of course, review the code.

If you like the way this is going, I thought I'd also add rate-throttling in a similar manner to the .NET client, i.e. as the outstanding confirmation window closes, increase delay between publishes to (hopefully) allow the broker to catch up.

@lukebakkenlukebakkenforce-pushed thelukebakken/publisher-confirm-tracking branch 2 times, most recently from6a31a46 to714370eCompareDecember 8, 2025 23:53
@acogoluegnes
Copy link
Contributor

Thanks for this contributon@lukebakken. I think it is on the right track. I added some comments in the code. Just a few more remarks:

  • the new methods in the public interfaces should have a default implementation if we back port the PR to 5.x. This is for backward compatibility. Throwing an exception is good enough.
  • synchronized blocks and Object-based synchronization is considered old-style Java concurrency. We could see if there are more modern utilities in the Java concurrency toolkit to handle what we need inChannelN. I'm not super opinionated on this topic though and this is an implementation details we can polish later.
  • the bulk of the PR is inChannelN, it is not rocket science but add some non-trivial logic to a somewhat already complex class. I was wondering if it could be possible to externalize all the logic in aPublishConfirmState class used fromChannelN. This state class would register regular return and confirm listeners on the channel. This could even be an interface with a no-op implementation when the feature is not activated, minimalizing the impact onChannelN. We can discuss this design when all the features are added.
lukebakken reacted with thumbs up emoji

@lukebakkenlukebakkenforce-pushed thelukebakken/publisher-confirm-tracking branch from714370e to44c3acaCompareDecember 9, 2025 16:36
@lukebakkenlukebakken changed the titleAdd automatic publisher confirmation tracking with async APIAdd automatic publisher confirmation tracking with throttlingDec 9, 2025
@lukebakkenlukebakkenforce-pushed thelukebakken/publisher-confirm-tracking branch 6 times, most recently fromc64f7ba to5520ef4CompareDecember 10, 2025 21:15
@michaelklishin
Copy link
Contributor

@lukebakken I have sent you an external contributor invite for this repo.

lukebakken reacted with hooray emojilukebakken reacted with rocket emoji

@lukebakkenlukebakkenforce-pushed thelukebakken/publisher-confirm-tracking branch from5520ef4 todd2fb5eCompareDecember 11, 2025 04:02
@lukebakkenlukebakken changed the titleAdd automatic publisher confirmation tracking with throttlingAddConfirmationChannel for async publisher confirmationsDec 11, 2025
@lukebakken
Copy link
CollaboratorAuthor

@michaelklishin@acogoluegnes I tookthis comment to heart and re-implemented this feature as aConfirmationChannel class that wrapsChannelN and does its own confirmation tracking. That way, no modifications to the existingChannel interface orChannelN class are necessary.

acogoluegnes reacted with thumbs up emoji

@lukebakkenlukebakkenforce-pushed thelukebakken/publisher-confirm-tracking branch 2 times, most recently from041b932 tob0761bfCompareDecember 11, 2025 04:15
lukebakken added a commit to rabbitmq/rabbitmq-tutorials that referenced this pull requestDec 11, 2025
The new `ConfirmationChannel` API introduced inrabbitmq/rabbitmq-java-client#1824 provides asynchronous publisherconfirmation tracking with a `CompletableFuture`-based API, ratelimiting, and message correlation support.This change adds `PublisherConfirmsAsync.java` to demonstrate the`ConfirmationChannel` API. The tutorial shows how to create a`ConfirmationChannel` wrapper with rate limiting, publish messagesasynchronously with correlation context, and wait for all confirmationsusing `CompletableFuture.allOf()`.Depends onrabbitmq/rabbitmq-java-client#1824
@lukebakkenlukebakken marked this pull request as ready for reviewDecember 11, 2025 17:45
@lukebakkenlukebakkenforce-pushed thelukebakken/publisher-confirm-tracking branch fromb0761bf to49a0ff5CompareDecember 11, 2025 17:52
@lukebakkenlukebakkenforce-pushed thelukebakken/publisher-confirm-tracking branch 2 times, most recently fromc002375 to8112046CompareDecember 11, 2025 17:59
@lukebakken
Copy link
CollaboratorAuthor

Alrighty gang, all set I think. Small tutorial here -rabbitmq/rabbitmq-tutorials#707

Traditional publisher confirms in the Java client require manualtracking of sequence numbers and correlation of Basic.Return messages.This makes per-message error handling complex and provides no built-inasync pattern, backpressure mechanism, or message correlation support.This change introduces `ConfirmationChannel`, a wrapper that providesautomatic publisher confirmation tracking with a`CompletableFuture`-based API, optional throttling, and generic contextparameter for message correlation. The implementation useslistener-based integration with existing `Channel` instances, requiringno modifications to `ChannelN`.New API components:- `ConfirmationChannel` interface - Extends `Channel` and adds  `basicPublishAsync()` methods that return `CompletableFuture<T>`- `ConfirmationChannelN` implementation - Wraps any `Channel` instance  and tracks confirmations via return/confirm/shutdown listeners- `PublishException` - Exception thrown when message is nack'd or  returned, with sequence number, routing details, and user contextThe wrapper maintains independent sequence numbers using `AtomicLong`and stores confirmation state in a `ConcurrentHashMap`. Each entry holdsthe future, rate limiter permit, and user-provided context. Messagesinclude an `x-seq-no` header for correlating Basic.Return responses.Rate limiting is optional via `RateLimiter` parameter. The`ThrottlingRateLimiter` implementation uses progressive delays(0-1000ms) based on capacity usage, applying backpressure when availablepermits fall below a threshold (default 50%).The `basicPublish()` and `waitForConfirms()` methods throw`UnsupportedOperationException` on `ConfirmationChannel` to preventmixing synchronous and asynchronous patterns. All other `Channel`methods delegate to the wrapped instance.Tests include 9 unit tests for `ThrottlingRateLimiter` and 24integration tests for publisher confirmation tracking with contextparameter verification, rate limiting scenarios, and error handling.
@lukebakkenlukebakkenforce-pushed thelukebakken/publisher-confirm-tracking branch from8112046 to9a31accCompareDecember 11, 2025 20:35
@michaelklishin
Copy link
Contributor

@lukebakken since this is a feature by most definitions, we now would have to go through a special approval process on our end :(

lukebakken reacted with thumbs up emojilukebakken reacted with eyes emoji

lukebakken added a commit to rabbitmq/rabbitmq-tutorials that referenced this pull requestDec 18, 2025
The new `ConfirmationChannel` API introduced inrabbitmq/rabbitmq-java-client#1824 provides asynchronous publisherconfirmation tracking with a `CompletableFuture`-based API, ratelimiting, and message correlation support.This change adds `PublisherConfirmsAsync.java` to demonstrate the`ConfirmationChannel` API. The tutorial shows how to create a`ConfirmationChannel` wrapper with rate limiting, publish messagesasynchronously with correlation context, and wait for all confirmationsusing `CompletableFuture.allOf()`.Depends onrabbitmq/rabbitmq-java-client#1824
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@michaelklishinmichaelklishinAwaiting requested review from michaelklishin

@acogoluegnesacogoluegnesAwaiting requested review from acogoluegnes

Assignees

No one assigned

Labels

None yet

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

3 participants

@lukebakken@acogoluegnes@michaelklishin

[8]ページ先頭

©2009-2025 Movatter.jp