Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork1.7k
(2.14) Reset consumer to new starting sequence#7489
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
base:main
Are you sure you want to change the base?
Uh oh!
There was an error while loading.Please reload this page.
Conversation
derekcollison commentedOct 29, 2025
I have not review the code, but if we put the internal ordered consumer with ack_all, and pick a window such that when the ack_all is triggered we can release all messages below it that have been acked through the normal process should suffice. Unclear to me why we need to introduce this new concept. |
MauriceVanVeen commentedOct 29, 2025
There's no concept of ordered consumption outside of using an "ordered consumer" that checks for the consumer's delivered sequence, and detects gaps. Enabling reliable ordered sourcing, while using a durable without recreating, requires the ability to reset the starting sequence such that the consumer's delivery sequence resets to 1 and gaps can be detected based on that. |
neilalexander commentedOct 29, 2025 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
On the flip side, having the ability to reset consumers is very nice for other reasons, i.e. for a mediator pattern where you have an application that would run a worker+ephemeral consumer for each newly discovered subject and a supervising headers-only consumer to determine which workers to start. With this, you can restart your application, reset the supervising consumer back to 0 and whip through the stream to determine which workers to rehydrate without having to maintain any external state. For the sourcing/mirroring use-case, this enables us to keep durable consumers on the streams that are being sourced/mirrored from but to be able to put the consumer back to where we need it to be, such that we don't blow away messages accidentally by deleting & recreating ephemeral/direct sourcing consumers on an interest-based stream (which is the massive footgun with sourcing today that we'd like to solve). |
ripienaar commentedOct 29, 2025
How will this really help with WQ though? You cant have overlapping consumers, sourcing all the subjects would essentially prevent anyone else from using the WQ |
derekcollison commentedOct 29, 2025
I like the idea of solving the sourced streams reverse traversal for sure, but several ways to do that. Resetting the sequence may have profound implications on clients though that we may not be able to forsee. |
MauriceVanVeen commentedOct 29, 2025
With WorkQueue you would indeed only have a single consumer. In general I think it may make sense to only allow this durable source pattern on Interest and Limits, because WorkQueue has additional constraints:
Those may be solvable. But generally switching to an Interest based stream would be more flexible, also allowing other consumers not just the one used for sourcing. |
MauriceVanVeen commentedOct 29, 2025
I don't think this solves the reverse traversal, as the sourced stream will not know beforehand what it contains locally and the origin stream's consumer might be in any state still requiring acks, etc. etc. I believe scanning will still be required, and the only thing this API solves is performing what is essentially a "consumer create" without actually needing to delete and create. Resetting is otherwise equivalent.
Currently I'm only proposing to use this for the durable consumer sourcing/mirroring. I need such an API to be able to make durable instead of ephemeral sourcing work. That's why this PR is in draft while working out those details. On the other hand, the reset API could be allowed to be used directly by clients also if desired. But that could be a different discussion. |
MauriceVanVeen commentedOct 29, 2025
Anyway, have opened this up as draft so this doesn't need to be reviewed or even merged soon. Just walking down potential paths to get such durable consumer sourcing/mirroring to work, and this currently is the simplest. I don't know yet if this reset API is still needed or not, until after I've finished up getting that sourcing/mirroring to work using a pull consumer, and ensuring it doesn't lose data. |
derekcollison commentedOct 29, 2025
I think this one deserves a meeting to discuss before any progress on this draft. |
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
4b47f79 to5d59f80CompareMauriceVanVeen commentedNov 19, 2025
We've since discussed this andnats-io/nats-architecture-and-design#389 was opened to reflect that discussion. This PR is now updated to take into account the provided feedback, specifically:
|
In 2.14 we'll want an option to allow reliable stream sourcing/mirroring from both a WorkQueue and Interest stream. Currently this is problematic since ephemeral consumers with
AckNoneare used. These immediately ack and remove the message on the source, even if it wouldn't fully make its way to the mirror/source (also mentioned in#7292).For 2.14 a user could provide a durable (pull) consumer to be used for mirroring/sourcing of a stream instead of an ephemeral that's (re)created on demand. However, to ensure ordered consumption we'll need the monotonic increasing nature of the consumer's delivered sequence like the ordered consumer uses.
This PR introduces a new API to reset the starting sequence of a consumer, such that it can be used for durable consumer sourcing. Sending a request to
$JS.API.CONSUMER.RESET.<stream>.<consumer>with{"seq":10}will reset all state of the consumer and move it (either forwards or backwards) to the specified sequence. When fetching messages, the first message to be delivered will be the one with the specified sequence, the consumer's delivered sequence will (re)start at 1.The durable sourcing can use this API as a replacement when it would otherwise recreate a consumer at a specific starting sequence. Ensuring the delivered sequence starts at 1 and increases monotonically such that gaps can be detected.
Resolves#4121
Signed-off-by: Maurice van Veengithub@mauricevanveen.com