@@ -238,7 +238,7 @@ func (batch *batchApply) rejectBatchState(mset *stream) {
238238// mset.mu lock must NOT be held or used.
239239// mset.clMu lock must be held.
240240func checkMsgHeadersPreClusteredProposal (
241- diff * batchStagedDiff ,mset * stream ,subject string ,hdr []byte ,msg []byte ,sourced bool ,name string ,
241+ diff * batchStagedDiff ,mset * stream ,subject string ,hdr []byte ,hdrIdx * jsHdrIndex , msg []byte ,sourced bool ,name string ,
242242jsa * jsAccount ,allowRollup ,denyPurge ,allowTTL ,allowMsgCounter ,allowMsgSchedules bool ,
243243discard DiscardPolicy ,discardNewPer bool ,maxMsgSize int ,maxMsgs int64 ,maxMsgsPer int64 ,maxBytes int64 ,
244244) ([]byte , []byte ,uint64 ,* ApiError ,error ) {
@@ -252,10 +252,13 @@ func checkMsgHeadersPreClusteredProposal(
252252err := fmt .Errorf ("JetStream header size exceeds limits for '%s > %s'" ,jsa .acc ().Name ,mset .cfg .Name )
253253return hdr ,msg ,0 ,NewJSStreamHeaderExceedsMaximumError (),err
254254}
255+ }
256+
257+ if hdrIdx != nil {
255258// Counter increments.
256259// Only supported on counter streams, and payload must be empty (if not coming from a source).
257260var ok bool
258- if incr ,ok = getMessageIncr (hdr );! ok {
261+ if incr ,ok = hdrIdx . getMessageIncr ();! ok {
259262apiErr := NewJSMessageIncrInvalidError ()
260263return hdr ,msg ,0 ,apiErr ,apiErr
261264}else if incr != nil && ! sourced {
@@ -269,14 +272,14 @@ func checkMsgHeadersPreClusteredProposal(
269272}else {
270273// Check for incompatible headers.
271274var doErr bool
272- if getRollup (hdr )!= _EMPTY_ ||
273- getExpectedStream (hdr )!= _EMPTY_ ||
274- getExpectedLastMsgId (hdr )!= _EMPTY_ ||
275- getExpectedLastSeqPerSubjectForSubject (hdr )!= _EMPTY_ {
275+ if hdrIdx . getRollup ()!= _EMPTY_ ||
276+ hdrIdx . getExpectedStream ()!= _EMPTY_ ||
277+ hdrIdx . getExpectedLastMsgId ()!= _EMPTY_ ||
278+ hdrIdx . getExpectedLastSeqPerSubjectForSubject ()!= _EMPTY_ {
276279doErr = true
277- }else if _ ,ok = getExpectedLastSeq (hdr );ok {
280+ }else if _ ,ok = hdrIdx . getExpectedLastSeq ();ok {
278281doErr = true
279- }else if _ ,ok = getExpectedLastSeqPerSubject (hdr );ok {
282+ }else if _ ,ok = hdrIdx . getExpectedLastSeqPerSubject ();ok {
280283doErr = true
281284}
282285
@@ -287,11 +290,11 @@ func checkMsgHeadersPreClusteredProposal(
287290}
288291}
289292// Expected stream name can also be pre-checked.
290- if sname := getExpectedStream (hdr );sname != _EMPTY_ && sname != name {
293+ if sname := hdrIdx . getExpectedStream ();sname != _EMPTY_ && sname != name {
291294return hdr ,msg ,0 ,NewJSStreamNotMatchError (),errStreamMismatch
292295}
293296// TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid.
294- if ttl ,err := getMessageTTL (hdr );! sourced && (ttl != 0 || err != nil ) {
297+ if ttl ,err := hdrIdx . getMessageTTL ();! sourced && (ttl != 0 || err != nil ) {
295298if ! allowTTL {
296299return hdr ,msg ,0 ,NewJSMessageTTLDisabledError (),errMsgTTLDisabled
297300}else if err != nil {
@@ -300,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal(
300303}
301304// Check for MsgIds here at the cluster level to avoid excessive CLFS accounting.
302305// Will help during restarts.
303- if msgId := getMsgId (hdr );msgId != _EMPTY_ {
306+ if msgId := hdrIdx . getMsgId ();msgId != _EMPTY_ {
304307// Dedupe if staged.
305308if _ ,ok = diff .msgIds [msgId ];ok {
306309return hdr ,msg ,0 ,NewJSAtomicPublishContainsDuplicateMessageError (),errMsgIdDuplicate
@@ -439,9 +442,9 @@ func checkMsgHeadersPreClusteredProposal(
439442}
440443}
441444
442- if len ( hdr ) > 0 {
445+ if hdrIdx != nil {
443446// Expected last sequence.
444- if seq ,exists := getExpectedLastSeq (hdr );exists && seq != mset .clseq - mset .clfs {
447+ if seq ,exists := hdrIdx . getExpectedLastSeq ();exists && seq != mset .clseq - mset .clfs {
445448mlseq := mset .clseq - mset .clfs
446449err := fmt .Errorf ("last sequence mismatch: %d vs %d" ,seq ,mlseq )
447450return hdr ,msg ,0 ,NewJSStreamWrongLastSequenceError (mlseq ),err
@@ -452,10 +455,10 @@ func checkMsgHeadersPreClusteredProposal(
452455}
453456
454457// Expected last sequence per subject.
455- if seq ,exists := getExpectedLastSeqPerSubject (hdr );exists {
458+ if seq ,exists := hdrIdx . getExpectedLastSeqPerSubject ();exists {
456459// Allow override of the subject used for the check.
457460seqSubj := subject
458- if optSubj := getExpectedLastSeqPerSubjectForSubject (hdr );optSubj != _EMPTY_ {
461+ if optSubj := hdrIdx . getExpectedLastSeqPerSubjectForSubject ();optSubj != _EMPTY_ {
459462seqSubj = optSubj
460463}
461464
@@ -509,13 +512,13 @@ func checkMsgHeadersPreClusteredProposal(
509512diff .expectedPerSubject [seqSubj ]= e
510513}
511514}
512- }else if getExpectedLastSeqPerSubjectForSubject (hdr )!= _EMPTY_ {
515+ }else if hdrIdx . getExpectedLastSeqPerSubjectForSubject ()!= _EMPTY_ {
513516apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError ()
514517return hdr ,msg ,0 ,apiErr ,apiErr
515518}
516519
517520// Message scheduling.
518- if schedule ,ok := getMessageSchedule (hdr );! ok {
521+ if schedule ,ok := hdrIdx . getMessageSchedule ();! ok {
519522apiErr := NewJSMessageSchedulesPatternInvalidError ()
520523if ! allowMsgSchedules {
521524apiErr = NewJSMessageSchedulesDisabledError ()
@@ -525,12 +528,12 @@ func checkMsgHeadersPreClusteredProposal(
525528if ! allowMsgSchedules {
526529apiErr := NewJSMessageSchedulesDisabledError ()
527530return hdr ,msg ,0 ,apiErr ,apiErr
528- }else if scheduleTtl ,ok := getMessageScheduleTTL (hdr );! ok {
531+ }else if scheduleTtl ,ok := hdrIdx . getMessageScheduleTTL ();! ok {
529532apiErr := NewJSMessageSchedulesTTLInvalidError ()
530533return hdr ,msg ,0 ,apiErr ,apiErr
531534}else if scheduleTtl != _EMPTY_ && ! allowTTL {
532535return hdr ,msg ,0 ,NewJSMessageTTLDisabledError (),errMsgTTLDisabled
533- }else if scheduleTarget := getMessageScheduleTarget (hdr );scheduleTarget == _EMPTY_ ||
536+ }else if scheduleTarget := hdrIdx . getMessageScheduleTarget ();scheduleTarget == _EMPTY_ ||
534537! IsValidPublishSubject (scheduleTarget )|| SubjectsCollide (scheduleTarget ,subject ) {
535538apiErr := NewJSMessageSchedulesTargetInvalidError ()
536539return hdr ,msg ,0 ,apiErr ,apiErr
@@ -547,7 +550,7 @@ func checkMsgHeadersPreClusteredProposal(
547550
548551// Add a rollup sub header if it doesn't already exist.
549552// Otherwise, it must exist already as a rollup on the subject.
550- if rollup := getRollup (hdr );rollup == _EMPTY_ {
553+ if rollup := hdrIdx . getRollup ();rollup == _EMPTY_ {
551554hdr = genHeader (hdr ,JSMsgRollup ,JSMsgRollupSubject )
552555}else if rollup != JSMsgRollupSubject {
553556apiErr := NewJSMessageSchedulesRollupInvalidError ()
@@ -557,7 +560,7 @@ func checkMsgHeadersPreClusteredProposal(
557560}
558561
559562// Check for any rollups.
560- if rollup := getRollup (hdr );rollup != _EMPTY_ {
563+ if rollup := hdrIdx . getRollup ();rollup != _EMPTY_ {
561564if ! allowRollup || denyPurge {
562565err := errors .New ("rollup not permitted" )
563566return hdr ,msg ,0 ,NewJSStreamRollupFailedError (err ),err