Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

(2.14) Reset consumer to new starting sequence#7489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Draft
MauriceVanVeen wants to merge1 commit intomain
base:main
Choose a base branch
Loading
frommaurice/consumer-reset

Conversation

@MauriceVanVeen
Copy link
Member

In 2.14 we'll want an option to allow reliable stream sourcing/mirroring from both a WorkQueue and Interest stream. Currently this is problematic since ephemeral consumers withAckNone are used. These immediately ack and remove the message on the source, even if it wouldn't fully make its way to the mirror/source (also mentioned in#7292).

For 2.14 a user could provide a durable (pull) consumer to be used for mirroring/sourcing of a stream instead of an ephemeral that's (re)created on demand. However, to ensure ordered consumption we'll need the monotonic increasing nature of the consumer's delivered sequence like the ordered consumer uses.

This PR introduces a new API to reset the starting sequence of a consumer, such that it can be used for durable consumer sourcing. Sending a request to$JS.API.CONSUMER.RESET.<stream>.<consumer> with{"seq":10} will reset all state of the consumer and move it (either forwards or backwards) to the specified sequence. When fetching messages, the first message to be delivered will be the one with the specified sequence, the consumer's delivered sequence will (re)start at 1.

The durable sourcing can use this API as a replacement when it would otherwise recreate a consumer at a specific starting sequence. Ensuring the delivered sequence starts at 1 and increases monotonically such that gaps can be detected.

Resolves#4121

Signed-off-by: Maurice van Veengithub@mauricevanveen.com

@derekcollison
Copy link
Member

I have not review the code, but if we put the internal ordered consumer with ack_all, and pick a window such that when the ack_all is triggered we can release all messages below it that have been acked through the normal process should suffice.

Unclear to me why we need to introduce this new concept.

@MauriceVanVeen
Copy link
MemberAuthor

Unclear to me why we need to introduce this new concept.

There's no concept of ordered consumption outside of using an "ordered consumer" that checks for the consumer's delivered sequence, and detects gaps.
Simply using a durable push/pull consumer with AckAll and fetching messages doesn't suffice. Messages can be dropped, creating gaps, and this can otherwise not be recognized resulting in out-of-order sourcing/mirroring.

Enabling reliable ordered sourcing, while using a durable without recreating, requires the ability to reset the starting sequence such that the consumer's delivery sequence resets to 1 and gaps can be detected based on that.

@neilalexander
Copy link
Member

neilalexander commentedOct 29, 2025
edited
Loading

On the flip side, having the ability to reset consumers is very nice for other reasons, i.e. for a mediator pattern where you have an application that would run a worker+ephemeral consumer for each newly discovered subject and a supervising headers-only consumer to determine which workers to start. With this, you can restart your application, reset the supervising consumer back to 0 and whip through the stream to determine which workers to rehydrate without having to maintain any external state.

For the sourcing/mirroring use-case, this enables us to keep durable consumers on the streams that are being sourced/mirrored from but to be able to put the consumer back to where we need it to be, such that we don't blow away messages accidentally by deleting & recreating ephemeral/direct sourcing consumers on an interest-based stream (which is the massive footgun with sourcing today that we'd like to solve).

@ripienaar
Copy link
Contributor

How will this really help with WQ though? You cant have overlapping consumers, sourcing all the subjects would essentially prevent anyone else from using the WQ

@derekcollison
Copy link
Member

I like the idea of solving the sourced streams reverse traversal for sure, but several ways to do that.

Resetting the sequence may have profound implications on clients though that we may not be able to forsee.

@MauriceVanVeen
Copy link
MemberAuthor

@ripienaar

How will this really help with WQ though? You cant have overlapping consumers, sourcing all the subjects would essentially prevent anyone else from using the WQ

With WorkQueue you would indeed only have a single consumer.

In general I think it may make sense to only allow this durable source pattern on Interest and Limits, because WorkQueue has additional constraints:

  • Can't useAckAll at the moment, the consumer requiresAckExplicit.
  • If messages are stored on the one that mirrors but the message was not acked, the mirror will want to reset and move the sequence up. Since WorkQueue doesn't remove messages that don't have interest, these messages will just remain dangling.

Those may be solvable. But generally switching to an Interest based stream would be more flexible, also allowing other consumers not just the one used for sourcing.

@MauriceVanVeen
Copy link
MemberAuthor

@derekcollison

I like the idea of solving the sourced streams reverse traversal for sure, but several ways to do that.

I don't think this solves the reverse traversal, as the sourced stream will not know beforehand what it contains locally and the origin stream's consumer might be in any state still requiring acks, etc. etc.

I believe scanning will still be required, and the only thing this API solves is performing what is essentially a "consumer create" without actually needing to delete and create. Resetting is otherwise equivalent.

Resetting the sequence may have profound implications on clients though that we may not be able to forsee.

Currently I'm only proposing to use this for the durable consumer sourcing/mirroring. I need such an API to be able to make durable instead of ephemeral sourcing work. That's why this PR is in draft while working out those details.

On the other hand, the reset API could be allowed to be used directly by clients also if desired. But that could be a different discussion.

@MauriceVanVeen
Copy link
MemberAuthor

Anyway, have opened this up as draft so this doesn't need to be reviewed or even merged soon.

Just walking down potential paths to get such durable consumer sourcing/mirroring to work, and this currently is the simplest. I don't know yet if this reset API is still needed or not, until after I've finished up getting that sourcing/mirroring to work using a pull consumer, and ensuring it doesn't lose data.

@derekcollison
Copy link
Member

I think this one deserves a meeting to discuss before any progress on this draft.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
@MauriceVanVeen
Copy link
MemberAuthor

We've since discussed this andnats-io/nats-architecture-and-design#389 was opened to reflect that discussion. This PR is now updated to take into account the provided feedback, specifically:

  • A "consumer reset" response.
  • Allowing to reset a consumer back to the ack floor, which will be used by the durable sourcing.
  • And only allowing a reset to a specific sequence, if allowed by the deliver policy.

Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

No reviews

Assignees

No one assigned

Labels

None yet

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

Ability to reset consumer start sequence

5 participants

@MauriceVanVeen@derekcollison@neilalexander@ripienaar

[8]ページ先頭

©2009-2025 Movatter.jp