Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5d59f80

Browse files
(2.14) Reset consumer to new starting sequence
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent7cdd5cf commit5d59f80

File tree

9 files changed

+581
-1
lines changed

9 files changed

+581
-1
lines changed

‎server/consumer.go‎

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,11 +437,13 @@ type consumer struct {
437437
lss*lastSeqSkipList
438438
rlimit*rate.Limiter
439439
reqSub*subscription
440+
resetSub*subscription
440441
ackSub*subscription
441442
ackReplyTstring
442443
ackSubjstring
443444
nextMsgSubjstring
444445
nextMsgReqs*ipQueue[*nextMsgReq]
446+
resetSubjstring
445447
maxpint
446448
pblimitint
447449
maxpbint
@@ -1263,6 +1265,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12631265
o.ackReplyT=fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d",pre)
12641266
o.ackSubj=fmt.Sprintf("%s.*.*.*.*.*",pre)
12651267
o.nextMsgSubj=fmt.Sprintf(JSApiRequestNextT,mn,o.name)
1268+
o.resetSubj=fmt.Sprintf(JSApiConsumerResetT,mn,o.name)
12661269

12671270
// Check/update the inactive threshold
12681271
o.updateInactiveThreshold(&o.cfg)
@@ -1547,6 +1550,11 @@ func (o *consumer) setLeader(isLeader bool) {
15471550
o.deleteWithoutAdvisory()
15481551
return
15491552
}
1553+
ifo.resetSub,err=o.subscribeInternal(o.resetSubj,o.processResetReq);err!=nil {
1554+
o.mu.Unlock()
1555+
o.deleteWithoutAdvisory()
1556+
return
1557+
}
15501558

15511559
// Check on flow control settings.
15521560
ifo.cfg.FlowControl {
@@ -1667,8 +1675,9 @@ func (o *consumer) setLeader(isLeader bool) {
16671675
// ok if they are nil, we protect inside unsubscribe()
16681676
o.unsubscribe(o.ackSub)
16691677
o.unsubscribe(o.reqSub)
1678+
o.unsubscribe(o.resetSub)
16701679
o.unsubscribe(o.fcSub)
1671-
o.ackSub,o.reqSub,o.fcSub=nil,nil,nil
1680+
o.ackSub,o.reqSub,o.resetSub,o.fcSub=nil,nil,nil,nil
16721681
ifo.infoSub!=nil {
16731682
o.srv.sysUnsubscribe(o.infoSub)
16741683
o.infoSub=nil
@@ -2596,6 +2605,78 @@ func (o *consumer) updateSkipped(seq uint64) {
25962605
o.propose(b[:])
25972606
}
25982607

2608+
func (o*consumer)resetStartingSeq(sequint64,replystring) (uint64,bool,error) {
2609+
o.mu.Lock()
2610+
defero.mu.Unlock()
2611+
2612+
// Reset to a specific sequence, or back to the ack floor.
2613+
ifseq==0 {
2614+
seq=o.asflr+1
2615+
}elseifo.cfg.DeliverPolicy==DeliverAll {
2616+
// Always allowed.
2617+
goto VALID
2618+
}elseifo.cfg.DeliverPolicy==DeliverByStartSequence {
2619+
// Only allowed if not going below what's configured.
2620+
ifseq<o.cfg.OptStartSeq {
2621+
return0,false,errors.New("below start seq")
2622+
}
2623+
goto VALID
2624+
}elseifo.cfg.DeliverPolicy==DeliverByStartTime&&o.mset!=nil {
2625+
// Only allowed if not going below what's configured.
2626+
nseq:=o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
2627+
ifseq<nseq {
2628+
return0,false,errors.New("below start time")
2629+
}
2630+
goto VALID
2631+
}else {
2632+
return0,false,errors.New("not allowed")
2633+
}
2634+
2635+
VALID:
2636+
// Must be a minimum of 1.
2637+
ifseq<=0 {
2638+
seq=1
2639+
}
2640+
o.resetLocalStartingSeq(seq)
2641+
// Clustered mode and R>1.
2642+
ifo.node!=nil {
2643+
b:=make([]byte,1+8+len(reply))
2644+
b[0]=byte(resetSeqOp)
2645+
varle=binary.LittleEndian
2646+
le.PutUint64(b[1:],seq)
2647+
copy(b[1+8:],reply)
2648+
o.propose(b[:])
2649+
returnseq,false,nil
2650+
}elseifo.store!=nil {
2651+
o.store.Reset(seq-1)
2652+
// Cleanup messages that lost interest.
2653+
ifo.retention==InterestPolicy {
2654+
ifmset:=o.mset;mset!=nil {
2655+
o.mu.Unlock()
2656+
ss:=mset.state()
2657+
o.checkStateForInterestStream(&ss)
2658+
o.mu.Lock()
2659+
}
2660+
}
2661+
2662+
// Recalculate pending, and re-trigger message delivery.
2663+
o.streamNumPending()
2664+
o.signalNewMessages()
2665+
returnseq,true,nil
2666+
}
2667+
returnseq,false,nil
2668+
}
2669+
2670+
// Lock should be held.
2671+
func (o*consumer)resetLocalStartingSeq(sequint64) {
2672+
o.pending,o.rdc=nil,nil
2673+
o.rdq=nil
2674+
o.rdqi.Empty()
2675+
o.sseq,o.dseq=seq,1
2676+
o.adflr,o.asflr=o.dseq-1,o.sseq-1
2677+
o.ldt,o.lat= time.Time{}, time.Time{}
2678+
}
2679+
25992680
func (o*consumer)loopAndForwardProposals(qchchanstruct{}) {
26002681
// On exit make sure we nil out pch.
26012682
deferfunc() {
@@ -4119,6 +4200,48 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
41194200
o.nextMsgReqs.push(newNextMsgReq(reply,copyBytes(msg)))
41204201
}
41214202

4203+
// processResetReq will reset a consumer to a new starting sequence.
4204+
func (o*consumer)processResetReq(_*subscription,c*client,a*Account,_,replystring,rmsg []byte) {
4205+
ifreply==_EMPTY_ {
4206+
return
4207+
}
4208+
4209+
s:=o.srv
4210+
varresp=JSApiConsumerResetResponse{ApiResponse:ApiResponse{Type:JSApiConsumerResetResponseType}}
4211+
4212+
hdr,msg:=c.msgParts(rmsg)
4213+
iferrorOnRequiredApiLevel(hdr) {
4214+
resp.Error=NewJSRequiredApiLevelError()
4215+
s.sendInternalAccountMsg(a,reply,s.jsonResponse(&resp))
4216+
return
4217+
}
4218+
4219+
// An empty message resets back to the ack floor, otherwise a custom sequence is used.
4220+
varreqJSApiConsumerResetRequest
4221+
iflen(msg)>0 {
4222+
iferr:=json.Unmarshal(msg,&req);err!=nil {
4223+
resp.Error=NewJSInvalidJSONError(err)
4224+
s.sendInternalAccountMsg(a,reply,s.jsonResponse(&resp))
4225+
return
4226+
}
4227+
// Resetting to 0 is invalid.
4228+
ifreq.Seq==0 {
4229+
resp.Error=NewJSInvalidJSONError(errors.New("reset to zero seq"))
4230+
s.sendInternalAccountMsg(a,reply,s.jsonResponse(&resp))
4231+
return
4232+
}
4233+
}
4234+
resetSeq,canRespond,err:=o.resetStartingSeq(req.Seq,reply)
4235+
iferr!=nil {
4236+
resp.Error=NewJSConsumerInvalidResetError(err)
4237+
s.sendInternalAccountMsg(a,reply,s.jsonResponse(&resp))
4238+
}elseifcanRespond {
4239+
resp.ConsumerInfo=setDynamicConsumerInfoMetadata(o.info())
4240+
resp.ResetSeq=resetSeq
4241+
s.sendInternalAccountMsg(a,reply,s.jsonResponse(&resp))
4242+
}
4243+
}
4244+
41224245
func (o*consumer)processNextMsgRequest(replystring,msg []byte) {
41234246
o.mu.Lock()
41244247
defero.mu.Unlock()
@@ -6060,9 +6183,11 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
60606183
o.active=false
60616184
o.unsubscribe(o.ackSub)
60626185
o.unsubscribe(o.reqSub)
6186+
o.unsubscribe(o.resetSub)
60636187
o.unsubscribe(o.fcSub)
60646188
o.ackSub=nil
60656189
o.reqSub=nil
6190+
o.resetSub=nil
60666191
o.fcSub=nil
60676192
ifo.infoSub!=nil {
60686193
o.srv.sysUnsubscribe(o.infoSub)

‎server/errors.json‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,5 +1998,15 @@
19981998
"help":"",
19991999
"url":"",
20002000
"deprecates":""
2001+
},
2002+
{
2003+
"constant":"JSConsumerInvalidResetErr",
2004+
"code":400,
2005+
"error_code":10202,
2006+
"description":"invalid reset: {err}",
2007+
"comment":"",
2008+
"help":"",
2009+
"url":"",
2010+
"deprecates":""
20012011
}
20022012
]

‎server/filestore.go‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11184,6 +11184,7 @@ func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) {
1118411184
func (o*consumerFileStore)SetStarting(ssequint64)error {
1118511185
o.mu.Lock()
1118611186
o.state.Delivered.Stream=sseq
11187+
o.state.AckFloor.Stream=sseq
1118711188
buf,err:=o.encodeState()
1118811189
o.mu.Unlock()
1118911190
iferr!=nil {
@@ -11208,6 +11209,14 @@ func (o *consumerFileStore) UpdateStarting(sseq uint64) {
1120811209
o.kickFlusher()
1120911210
}
1121011211

11212+
// Reset all values in the store, and reset the starting sequence.
11213+
func (o*consumerFileStore)Reset(ssequint64)error {
11214+
o.mu.Lock()
11215+
o.state=ConsumerState{}
11216+
o.mu.Unlock()
11217+
returno.SetStarting(sseq)
11218+
}
11219+
1121111220
// HasState returns if this store has a recorded state.
1121211221
func (o*consumerFileStore)HasState()bool {
1121311222
o.mu.Lock()

‎server/jetstream_api.go‎

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ const (
155155
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
156156
JSApiRequestNextT="$JS.API.CONSUMER.MSG.NEXT.%s.%s"
157157

158+
// JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence.
159+
JSApiConsumerResetT="$JS.API.CONSUMER.RESET.%s.%s"
160+
158161
// JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
159162
JSApiConsumerUnpin="$JS.API.CONSUMER.UNPIN.*.*"
160163
JSApiConsumerUnpinT="$JS.API.CONSUMER.UNPIN.%s.%s"
@@ -757,6 +760,19 @@ type JSApiConsumerGetNextRequest struct {
757760
PriorityGroup
758761
}
759762

763+
// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence.
764+
typeJSApiConsumerResetRequeststruct {
765+
Sequint64`json:"seq"`
766+
}
767+
768+
typeJSApiConsumerResetResponsestruct {
769+
ApiResponse
770+
*ConsumerInfo
771+
ResetSequint64`json:"reset_seq"`
772+
}
773+
774+
constJSApiConsumerResetResponseType="io.nats.jetstream.api.v1.consumer_reset_response"
775+
760776
// Structure that holds state for a JetStream API request that is processed
761777
// in a separate long-lived go routine. This is to avoid blocking connections.
762778
typejsAPIRoutedReqstruct {

‎server/jetstream_cluster.go‎

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ const (
124124
// Batch stream ops.
125125
batchMsgOp
126126
batchCommitMsgOp
127+
// Consumer rest to specific starting sequence.
128+
resetSeqOp
127129
)
128130

129131
// raftGroups are controlled by the metagroup controller.
@@ -5832,6 +5834,39 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
58325834
o.store.UpdateStarting(sseq-1)
58335835
}
58345836
o.mu.Unlock()
5837+
caseresetSeqOp:
5838+
o.mu.Lock()
5839+
varle=binary.LittleEndian
5840+
sseq:=le.Uint64(buf[1:9])
5841+
reply:=string(buf[9:])
5842+
o.resetLocalStartingSeq(sseq)
5843+
ifo.store!=nil {
5844+
o.store.Reset(sseq-1)
5845+
}
5846+
// Cleanup messages that lost interest.
5847+
ifo.retention==InterestPolicy {
5848+
ifmset:=o.mset;mset!=nil {
5849+
o.mu.Unlock()
5850+
ss:=mset.state()
5851+
o.checkStateForInterestStream(&ss)
5852+
o.mu.Lock()
5853+
}
5854+
}
5855+
// Recalculate pending, and re-trigger message delivery.
5856+
if!o.isLeader() {
5857+
o.mu.Unlock()
5858+
}else {
5859+
o.streamNumPending()
5860+
o.signalNewMessages()
5861+
s,a:=o.srv,o.acc
5862+
o.mu.Unlock()
5863+
ifreply!=_EMPTY_ {
5864+
varresp=JSApiConsumerResetResponse{ApiResponse:ApiResponse{Type:JSApiConsumerResetResponseType}}
5865+
resp.ConsumerInfo=setDynamicConsumerInfoMetadata(o.info())
5866+
resp.ResetSeq=sseq
5867+
s.sendInternalAccountMsg(a,reply,s.jsonResponse(&resp))
5868+
}
5869+
}
58355870
caseaddPendingRequest:
58365871
o.mu.Lock()
58375872
if!o.isLeader() {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp