Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit a7d979c

Browse files
(2.14) [IMPROVED] JetStream header indexing
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 85594c8 · commit a7d979c

File tree

7 files changed

+398
-119
lines changed

7 files changed

+398
-119
lines changed

‎server/jetstream.go‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1520,7 +1520,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
15201520
ifcommitEob&&seq==state.LastSeq {
15211521
hdr=genHeader(hdr,JSBatchCommit,"1")
15221522
}
1523-
mset.processJetStreamMsg(sm.subj,_EMPTY_,hdr,sm.msg,0,0,nil,false,true)
1523+
varhdrIdx*jsHdrIndex
1524+
hdr,hdrIdx=indexJsHdr(hdr)
1525+
mset.processJetStreamMsg(sm.subj,_EMPTY_,hdr,hdrIdx,sm.msg,0,0,nil,false,true)
1526+
hdrIdx.returnToPool()
15241527
}
15251528
store.Delete(true)
15261529
SKIP:

‎server/jetstream_batching.go‎

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (batch *batchApply) rejectBatchState(mset *stream) {
238238
// mset.mu lock must NOT be held or used.
239239
// mset.clMu lock must be held.
240240
funccheckMsgHeadersPreClusteredProposal(
241-
diff*batchStagedDiff,mset*stream,subjectstring,hdr []byte,msg []byte,sourcedbool,namestring,
241+
diff*batchStagedDiff,mset*stream,subjectstring,hdr []byte,hdrIdx*jsHdrIndex,msg []byte,sourcedbool,namestring,
242242
jsa*jsAccount,allowRollup,denyPurge,allowTTL,allowMsgCounter,allowMsgSchedulesbool,
243243
discardDiscardPolicy,discardNewPerbool,maxMsgSizeint,maxMsgsint64,maxMsgsPerint64,maxBytesint64,
244244
) ([]byte, []byte,uint64,*ApiError,error) {
@@ -252,10 +252,13 @@ func checkMsgHeadersPreClusteredProposal(
252252
err:=fmt.Errorf("JetStream header size exceeds limits for '%s > %s'",jsa.acc().Name,mset.cfg.Name)
253253
returnhdr,msg,0,NewJSStreamHeaderExceedsMaximumError(),err
254254
}
255+
}
256+
257+
ifhdrIdx!=nil {
255258
// Counter increments.
256259
// Only supported on counter streams, and payload must be empty (if not coming from a source).
257260
varokbool
258-
ifincr,ok=getMessageIncr(hdr);!ok {
261+
ifincr,ok=hdrIdx.getMessageIncr();!ok {
259262
apiErr:=NewJSMessageIncrInvalidError()
260263
returnhdr,msg,0,apiErr,apiErr
261264
}elseifincr!=nil&&!sourced {
@@ -269,14 +272,14 @@ func checkMsgHeadersPreClusteredProposal(
269272
}else {
270273
// Check for incompatible headers.
271274
vardoErrbool
272-
ifgetRollup(hdr)!=_EMPTY_||
273-
getExpectedStream(hdr)!=_EMPTY_||
274-
getExpectedLastMsgId(hdr)!=_EMPTY_||
275-
getExpectedLastSeqPerSubjectForSubject(hdr)!=_EMPTY_ {
275+
ifhdrIdx.getRollup()!=_EMPTY_||
276+
hdrIdx.getExpectedStream()!=_EMPTY_||
277+
hdrIdx.getExpectedLastMsgId()!=_EMPTY_||
278+
hdrIdx.getExpectedLastSeqPerSubjectForSubject()!=_EMPTY_ {
276279
doErr=true
277-
}elseif_,ok=getExpectedLastSeq(hdr);ok {
280+
}elseif_,ok=hdrIdx.getExpectedLastSeq();ok {
278281
doErr=true
279-
}elseif_,ok=getExpectedLastSeqPerSubject(hdr);ok {
282+
}elseif_,ok=hdrIdx.getExpectedLastSeqPerSubject();ok {
280283
doErr=true
281284
}
282285

@@ -287,11 +290,11 @@ func checkMsgHeadersPreClusteredProposal(
287290
}
288291
}
289292
// Expected stream name can also be pre-checked.
290-
ifsname:=getExpectedStream(hdr);sname!=_EMPTY_&&sname!=name {
293+
ifsname:=hdrIdx.getExpectedStream();sname!=_EMPTY_&&sname!=name {
291294
returnhdr,msg,0,NewJSStreamNotMatchError(),errStreamMismatch
292295
}
293296
// TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid.
294-
ifttl,err:=getMessageTTL(hdr);!sourced&& (ttl!=0||err!=nil) {
297+
ifttl,err:=hdrIdx.getMessageTTL();!sourced&& (ttl!=0||err!=nil) {
295298
if!allowTTL {
296299
returnhdr,msg,0,NewJSMessageTTLDisabledError(),errMsgTTLDisabled
297300
}elseiferr!=nil {
@@ -300,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal(
300303
}
301304
// Check for MsgIds here at the cluster level to avoid excessive CLFS accounting.
302305
// Will help during restarts.
303-
ifmsgId:=getMsgId(hdr);msgId!=_EMPTY_ {
306+
ifmsgId:=hdrIdx.getMsgId();msgId!=_EMPTY_ {
304307
// Dedupe if staged.
305308
if_,ok=diff.msgIds[msgId];ok {
306309
returnhdr,msg,0,NewJSAtomicPublishContainsDuplicateMessageError(),errMsgIdDuplicate
@@ -439,9 +442,9 @@ func checkMsgHeadersPreClusteredProposal(
439442
}
440443
}
441444

442-
iflen(hdr)>0 {
445+
ifhdrIdx!=nil {
443446
// Expected last sequence.
444-
ifseq,exists:=getExpectedLastSeq(hdr);exists&&seq!=mset.clseq-mset.clfs {
447+
ifseq,exists:=hdrIdx.getExpectedLastSeq();exists&&seq!=mset.clseq-mset.clfs {
445448
mlseq:=mset.clseq-mset.clfs
446449
err:=fmt.Errorf("last sequence mismatch: %d vs %d",seq,mlseq)
447450
returnhdr,msg,0,NewJSStreamWrongLastSequenceError(mlseq),err
@@ -452,10 +455,10 @@ func checkMsgHeadersPreClusteredProposal(
452455
}
453456

454457
// Expected last sequence per subject.
455-
ifseq,exists:=getExpectedLastSeqPerSubject(hdr);exists {
458+
ifseq,exists:=hdrIdx.getExpectedLastSeqPerSubject();exists {
456459
// Allow override of the subject used for the check.
457460
seqSubj:=subject
458-
ifoptSubj:=getExpectedLastSeqPerSubjectForSubject(hdr);optSubj!=_EMPTY_ {
461+
ifoptSubj:=hdrIdx.getExpectedLastSeqPerSubjectForSubject();optSubj!=_EMPTY_ {
459462
seqSubj=optSubj
460463
}
461464

@@ -509,13 +512,13 @@ func checkMsgHeadersPreClusteredProposal(
509512
diff.expectedPerSubject[seqSubj]=e
510513
}
511514
}
512-
}elseifgetExpectedLastSeqPerSubjectForSubject(hdr)!=_EMPTY_ {
515+
}elseifhdrIdx.getExpectedLastSeqPerSubjectForSubject()!=_EMPTY_ {
513516
apiErr:=NewJSStreamExpectedLastSeqPerSubjectInvalidError()
514517
returnhdr,msg,0,apiErr,apiErr
515518
}
516519

517520
// Message scheduling.
518-
ifschedule,ok:=getMessageSchedule(hdr);!ok {
521+
ifschedule,ok:=hdrIdx.getMessageSchedule();!ok {
519522
apiErr:=NewJSMessageSchedulesPatternInvalidError()
520523
if!allowMsgSchedules {
521524
apiErr=NewJSMessageSchedulesDisabledError()
@@ -525,12 +528,12 @@ func checkMsgHeadersPreClusteredProposal(
525528
if!allowMsgSchedules {
526529
apiErr:=NewJSMessageSchedulesDisabledError()
527530
returnhdr,msg,0,apiErr,apiErr
528-
}elseifscheduleTtl,ok:=getMessageScheduleTTL(hdr);!ok {
531+
}elseifscheduleTtl,ok:=hdrIdx.getMessageScheduleTTL();!ok {
529532
apiErr:=NewJSMessageSchedulesTTLInvalidError()
530533
returnhdr,msg,0,apiErr,apiErr
531534
}elseifscheduleTtl!=_EMPTY_&&!allowTTL {
532535
returnhdr,msg,0,NewJSMessageTTLDisabledError(),errMsgTTLDisabled
533-
}elseifscheduleTarget:=getMessageScheduleTarget(hdr);scheduleTarget==_EMPTY_||
536+
}elseifscheduleTarget:=hdrIdx.getMessageScheduleTarget();scheduleTarget==_EMPTY_||
534537
!IsValidPublishSubject(scheduleTarget)||SubjectsCollide(scheduleTarget,subject) {
535538
apiErr:=NewJSMessageSchedulesTargetInvalidError()
536539
returnhdr,msg,0,apiErr,apiErr
@@ -547,7 +550,7 @@ func checkMsgHeadersPreClusteredProposal(
547550

548551
// Add a rollup sub header if it doesn't already exist.
549552
// Otherwise, it must exist already as a rollup on the subject.
550-
ifrollup:=getRollup(hdr);rollup==_EMPTY_ {
553+
ifrollup:=hdrIdx.getRollup();rollup==_EMPTY_ {
551554
hdr=genHeader(hdr,JSMsgRollup,JSMsgRollupSubject)
552555
}elseifrollup!=JSMsgRollupSubject {
553556
apiErr:=NewJSMessageSchedulesRollupInvalidError()
@@ -557,7 +560,7 @@ func checkMsgHeadersPreClusteredProposal(
557560
}
558561

559562
// Check for any rollups.
560-
ifrollup:=getRollup(hdr);rollup!=_EMPTY_ {
563+
ifrollup:=hdrIdx.getRollup();rollup!=_EMPTY_ {
561564
if!allowRollup||denyPurge {
562565
err:=errors.New("rollup not permitted")
563566
returnhdr,msg,0,NewJSStreamRollupFailedError(err),err

‎server/jetstream_batching_test.go‎

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,9 @@ func TestJetStreamAtomicBatchPublishStageAndCommit(t *testing.T) {
13881388
hdr=genHeader(hdr,key,value)
13891389
}
13901390
}
1391-
_,_,_,_,err=checkMsgHeadersPreClusteredProposal(diff,mset,m.subject,hdr,m.msg,false,"TEST",nil,test.allowRollup,test.denyPurge,test.allowTTL,test.allowMsgCounter,test.allowMsgSchedules,discard,discardNewPer,-1,maxMsgs,maxMsgsPer,maxBytes)
1391+
_,hdrIdx:=indexJsHdr(hdr)
1392+
_,_,_,_,err=checkMsgHeadersPreClusteredProposal(diff,mset,m.subject,hdr,hdrIdx,m.msg,false,"TEST",nil,test.allowRollup,test.denyPurge,test.allowTTL,test.allowMsgCounter,test.allowMsgSchedules,discard,discardNewPer,-1,maxMsgs,maxMsgsPer,maxBytes)
1393+
hdrIdx.returnToPool()
13921394
ifm.err!=nil {
13931395
require_Error(t,err,m.err)
13941396
}elseiferr!=nil {
@@ -1582,7 +1584,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecovery(t *testing.T) {
15821584
require_True(t,commitReady)
15831585

15841586
// Simulate the first message of the batch is committed.
1585-
err=mset.processJetStreamMsg("foo",_EMPTY_,hdr1,nil,0,0,nil,false,true)
1587+
_,hdrIdx:=indexJsHdr(hdr1)
1588+
err=mset.processJetStreamMsg("foo",_EMPTY_,hdr1,hdrIdx,nil,0,0,nil,false,true)
1589+
hdrIdx.returnToPool()
15861590
require_NoError(t,err)
15871591

15881592
// Simulate a hard kill, upon recovery the rest of the batch should be applied.
@@ -1672,7 +1676,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecoveryCommitEob(t *testing.T)
16721676
require_True(t,commitReady)
16731677

16741678
// Simulate the first message of the batch is committed.
1675-
err=mset.processJetStreamMsg("foo",_EMPTY_,hdr1,nil,0,0,nil,false,true)
1679+
_,hdrIdx:=indexJsHdr(hdr1)
1680+
err=mset.processJetStreamMsg("foo",_EMPTY_,hdr1,hdrIdx,nil,0,0,nil,false,true)
1681+
hdrIdx.returnToPool()
16761682
require_NoError(t,err)
16771683

16781684
// Simulate a hard kill, upon recovery the rest of the batch should be applied.

‎server/jetstream_cluster.go‎

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3665,7 +3665,10 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
36653665
mt=mset.getAndDeleteMsgTrace(lseq)
36663666
}
36673667
// Process the actual message here.
3668-
err=mset.processJetStreamMsg(subject,reply,hdr,msg,lseq,ts,mt,sourced,needLock)
3668+
varhdrIdx*jsHdrIndex
3669+
hdr,hdrIdx=indexJsHdr(hdr)
3670+
err=mset.processJetStreamMsg(subject,reply,hdr,hdrIdx,msg,lseq,ts,mt,sourced,needLock)
3671+
hdrIdx.returnToPool()
36693672

36703673
// If we have inflight make sure to clear after processing.
36713674
// TODO(dlc) - technically check on inflight != nil could cause datarace.
@@ -3732,7 +3735,9 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
37323735
ifstate.Msgs==0 {
37333736
mset.store.Compact(lseq+1)
37343737
// Retry
3735-
err=mset.processJetStreamMsg(subject,reply,hdr,msg,lseq,ts,mt,sourced,needLock)
3738+
hdr,hdrIdx=indexJsHdr(hdr)
3739+
err=mset.processJetStreamMsg(subject,reply,hdr,hdrIdx,msg,lseq,ts,mt,sourced,needLock)
3740+
hdrIdx.returnToPool()
37363741
}
37373742
// FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected
37383743
// and what we got.
@@ -8776,7 +8781,7 @@ func (mset *stream) stateSnapshotLocked() []byte {
87768781
conststreamLagWarnThreshold=10_000
87778782

87788783
// processClusteredInboundMsg will propose the inbound message to the underlying raft group.
8779-
func (mset*stream)processClusteredInboundMsg(subject,replystring,hdr,msg []byte,mt*msgTrace,sourcedbool) (retErrerror) {
8784+
func (mset*stream)processClusteredInboundMsg(subject,replystring,hdr []byte,hdrIdx*jsHdrIndex,msg []byte,mt*msgTrace,sourcedbool) (retErrerror) {
87808785
// For possible error response.
87818786
varresponse []byte
87828787

@@ -8794,7 +8799,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
87948799
// We also invoke this in clustering mode for message tracing when not
87958800
// performing message delivery.
87968801
ifnode==nil||mt.traceOnly() {
8797-
returnmset.processJetStreamMsg(subject,reply,hdr,msg,0,0,mt,sourced,true)
8802+
returnmset.processJetStreamMsg(subject,reply,hdr,hdrIdx,msg,0,0,mt,sourced,true)
87988803
}
87998804

88008805
// If message tracing (with message delivery), we will need to send the
@@ -8898,7 +8903,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
88988903
errerror
88998904
)
89008905
diff:=&batchStagedDiff{}
8901-
ifhdr,msg,dseq,apiErr,err=checkMsgHeadersPreClusteredProposal(diff,mset,subject,hdr,msg,sourced,name,jsa,allowRollup,denyPurge,allowTTL,allowMsgCounter,allowMsgSchedules,discard,discardNewPer,maxMsgSize,maxMsgs,maxMsgsPer,maxBytes);err!=nil {
8906+
ifhdr,msg,dseq,apiErr,err=checkMsgHeadersPreClusteredProposal(diff,mset,subject,hdr,hdrIdx,msg,sourced,name,jsa,allowRollup,denyPurge,allowTTL,allowMsgCounter,allowMsgSchedules,discard,discardNewPer,maxMsgSize,maxMsgs,maxMsgsPer,maxBytes);err!=nil {
89028907
mset.clMu.Unlock()
89038908
iferr==errMsgIdDuplicate&&dseq>0 {
89048909
varbuf [256]byte

‎server/jetstream_cluster_4_test.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4379,7 +4379,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
43794379
validateStreamState(snap)
43804380

43814381
// Simulate a message being stored, but not calling Applied yet.
4382-
err=mset.processJetStreamMsg("foo",_EMPTY_,nil,nil,1,time.Now().UnixNano(),nil,false,true)
4382+
err=mset.processJetStreamMsg("foo",_EMPTY_,nil,nil,nil,1,time.Now().UnixNano(),nil,false,true)
43834383
require_NoError(t,err)
43844384

43854385
// Simulate the stream being stopped before we're able to call Applied.

‎server/jetstream_test.go‎

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22273,7 +22273,7 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
2227322273
mset, err := s.globalAccount().lookupStream("TEST")
2227422274
require_NoError(t, err)
2227522275
for range 2 {
22276-
require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true))
22276+
require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil,nil,0, 0, nil, false, true))
2227722277
}
2227822278

2227922279
// We'll lock the message blocks such that we can't read, but NumPending should still function.
@@ -22300,7 +22300,7 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
2230022300
read.Wait()
2230122301
<-time.After(100 * time.Millisecond)
2230222302
wg.Done()
22303-
mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true)
22303+
mset.processJetStreamMsg("foo", _EMPTY_, nil, nil,nil,0, 0, nil, false, true)
2230422304
}()
2230522305
go func() {
2230622306
// Run some time after we've entered processJetStreamMsg above.
@@ -22316,3 +22316,33 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
2231622316
return nil
2231722317
})
2231822318
}
22319+
22320+
func TestJetStreamHdrIndexUpdateHdr(t *testing.T) {
22321+
updateKey := "Nats-Update-Header"
22322+
for _, test := range []struct {
22323+
title string
22324+
updateHdr func(hdr []byte)
22325+
}{
22326+
{title: "SetHeader", updateHdr: func(hdr []byte) { setHeader(updateKey, "s", hdr) }},
22327+
{title: "GenHeader", updateHdr: func(hdr []byte) { genHeader(hdr, updateKey, "s") }},
22328+
{title: "RemoveHeaderIfPresent", updateHdr: func(hdr []byte) { removeHeaderIfPresent(hdr, updateKey) }},
22329+
{title: "RemoveHeaderIfPrefixPresent", updateHdr: func(hdr []byte) { removeHeaderIfPrefixPresent(hdr, updateKey) }},
22330+
} {
22331+
t.Run(test.title, func(t *testing.T) {
22332+
hdr := genHeader(nil, "Nats-Batch-Id", "uuid")
22333+
hdr = genHeader(hdr, updateKey, "long_value")
22334+
hdr = genHeader(hdr, "Nats-Batch-Sequence", "seq")
22335+
22336+
var idx *jsHdrIndex
22337+
hdr, idx = indexJsHdr(hdr)
22338+
defer idx.returnToPool()
22339+
require_NotNil(t, idx)
22340+
require_Equal(t, string(idx.batchId), "uuid")
22341+
require_Equal(t, string(idx.batchSeq), "seq")
22342+
22343+
test.updateHdr(hdr)
22344+
require_Equal(t, string(idx.batchId), "uuid")
22345+
require_Equal(t, string(idx.batchSeq), "seq")
22346+
})
22347+
}
22348+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp