Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7cbbaa4

Browse files
(2.14) Repeating schedule on an interval
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parente2661b5 commit7cbbaa4

File tree

6 files changed

+167
-12
lines changed

6 files changed

+167
-12
lines changed

‎server/filestore.go‎

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,7 +1649,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
16491649
mb.ttls++
16501650
}
16511651
ifmb.fs.scheduling!=nil {
1652-
ifschedule,ok:=getMessageSchedule(hdr);ok&&!schedule.IsZero() {
1652+
ifschedule,ok:=nextMessageSchedule(hdr,ts);ok&&!schedule.IsZero() {
16531653
mb.fs.scheduling.add(seq,string(subj),schedule.UnixNano())
16541654
mb.schedules++
16551655
}
@@ -2173,7 +2173,7 @@ func (fs *fileStore) recoverMsgSchedulingState() error {
21732173
iflen(msg.hdr)==0 {
21742174
continue
21752175
}
2176-
ifschedule,ok:=getMessageSchedule(sm.hdr);ok&&!schedule.IsZero() {
2176+
ifschedule,ok:=nextMessageSchedule(sm.hdr,sm.ts);ok&&!schedule.IsZero() {
21772177
fs.scheduling.init(seq,sm.subj,schedule.UnixNano())
21782178
}
21792179
}
@@ -4455,12 +4455,20 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t
44554455

44564456
// Message scheduling.
44574457
iffs.scheduling!=nil {
4458-
ifschedule,ok:=getMessageSchedule(hdr);ok&&!schedule.IsZero() {
4458+
ifschedule,ok:=nextMessageSchedule(hdr,ts);ok&&!schedule.IsZero() {
44594459
fs.scheduling.add(seq,subj,schedule.UnixNano())
44604460
fs.lmb.schedules++
44614461
}else {
44624462
fs.scheduling.removeSubject(subj)
44634463
}
4464+
4465+
// Check for a repeating schedule and update such that it triggers again.
4466+
ifscheduleNext:=bytesToString(sliceHeader(JSScheduleNext,hdr));scheduleNext!=_EMPTY_&&scheduleNext!=JSScheduleNextPurge {
4467+
scheduler:=getMessageScheduler(hdr)
4468+
ifnext,err:=time.Parse(time.RFC3339Nano,scheduleNext);err==nil&&scheduler!=_EMPTY_ {
4469+
fs.scheduling.update(scheduler,next.UnixNano())
4470+
}
4471+
}
44644472
}
44654473

44664474
returnnil

‎server/jetstream_cluster_1_test.go‎

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9486,6 +9486,56 @@ func TestJetStreamClusterScheduledDelayedMessageReversedHeaderOrder(t *testing.T
94869486
}
94879487
}
94889488

9489+
funcTestJetStreamClusterScheduledIntervalMessage(t*testing.T) {
9490+
for_,replicas:=range []int{1,3} {
9491+
for_,storage:=range []StorageType{FileStorage,MemoryStorage} {
9492+
t.Run(fmt.Sprintf("R%d/%s",replicas,storage),func(t*testing.T) {
9493+
c:=createJetStreamClusterExplicit(t,"R3S",3)
9494+
deferc.shutdown()
9495+
9496+
nc,js:=jsClientConnect(t,c.randomServer())
9497+
defernc.Close()
9498+
9499+
cfg:=&StreamConfig{
9500+
Name:"SchedulesEnabled",
9501+
Subjects: []string{"foo.*"},
9502+
Storage:storage,
9503+
Replicas:replicas,
9504+
AllowMsgSchedules:true,
9505+
}
9506+
_,err:=jsStreamCreate(t,nc,cfg)
9507+
require_NoError(t,err)
9508+
9509+
// Schedule with an interval that triggers often.
9510+
m:=nats.NewMsg("foo.schedule")
9511+
m.Header.Set("Nats-Schedule","@every 1s")
9512+
m.Header.Set("Nats-Schedule-Target","foo.publish")
9513+
pubAck,err:=js.PublishMsg(m)
9514+
require_NoError(t,err)
9515+
require_Equal(t,pubAck.Sequence,1)
9516+
9517+
sl:=c.streamLeader(globalAccountName,"SchedulesEnabled")
9518+
mset,err:=sl.globalAccount().lookupStream("SchedulesEnabled")
9519+
require_NoError(t,err)
9520+
9521+
// Waiting for the repeated message to be published enough times.
9522+
checkFor(t,4*time.Second,200*time.Millisecond,func()error {
9523+
total:=mset.Store().SubjectsTotals("foo.publish")["foo.publish"]
9524+
iftotal<3 {
9525+
returnfmt.Errorf("expected at least 3 messages, got %d",total)
9526+
}
9527+
returnnil
9528+
})
9529+
9530+
// Servers should be synced.
9531+
checkFor(t,2*time.Second,200*time.Millisecond,func()error {
9532+
returncheckState(t,c,globalAccountName,"SchedulesEnabled")
9533+
})
9534+
})
9535+
}
9536+
}
9537+
}
9538+
94899539
funcTestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t*testing.T) {
94909540
clusterName:="R3S"
94919541
c:=createJetStreamClusterExplicit(t,clusterName,3)

‎server/jetstream_test.go‎

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22255,6 +22255,32 @@ func TestJetStreamScheduledMessageNotDeactivated(t *testing.T) {
2225522255
}
2225622256
}
2225722257

22258+
func TestJetStreamScheduledMessageParse(t *testing.T) {
22259+
// @at <ts>
22260+
ts := time.Now().UTC()
22261+
sts, repeat, ok := parseMsgSchedule(fmt.Sprintf("@at %s", ts.Format(time.RFC3339Nano)), 0)
22262+
require_True(t, ok)
22263+
require_False(t, repeat)
22264+
require_Equal(t, ts, sts)
22265+
22266+
// @every <duration>
22267+
now := time.Now().UTC().Round(time.Second)
22268+
sts, repeat, ok = parseMsgSchedule("@every 5s", now.UnixNano())
22269+
require_True(t, ok)
22270+
require_True(t, repeat)
22271+
require_Equal(t, now.Add(5*time.Second), sts)
22272+
22273+
// A schedule on an interval should not spam loads of times if it hasn't run in a long while.
22274+
sts, repeat, ok = parseMsgSchedule("@every 5s", 0)
22275+
require_True(t, ok)
22276+
require_True(t, repeat)
22277+
require_True(t, sts.After(time.Unix(0, 0).UTC().Add(5*time.Second)))
22278+
22279+
// A schedule can only run at least once every second.
22280+
_, _, ok = parseMsgSchedule("@every 999ms", 0)
22281+
require_False(t, ok)
22282+
}
22283+
2225822284
func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
2225922285
s := RunBasicJetStreamServer(t)
2226022286
defer s.Shutdown()

‎server/memstore.go‎

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func (ms *memStore) recoverMsgSchedulingState() {
185185
iflen(sm.hdr)==0 {
186186
continue
187187
}
188-
ifschedule,ok:=getMessageSchedule(sm.hdr);ok&&!schedule.IsZero() {
188+
ifschedule,ok:=nextMessageSchedule(sm.hdr,sm.ts);ok&&!schedule.IsZero() {
189189
ms.scheduling.init(seq,sm.subj,schedule.UnixNano())
190190
}
191191
}
@@ -310,11 +310,19 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, tt
310310

311311
// Message scheduling.
312312
ifms.scheduling!=nil {
313-
ifschedule,ok:=getMessageSchedule(hdr);ok&&!schedule.IsZero() {
313+
ifschedule,ok:=nextMessageSchedule(hdr,ts);ok&&!schedule.IsZero() {
314314
ms.scheduling.add(seq,subj,schedule.UnixNano())
315315
}else {
316316
ms.scheduling.removeSubject(subj)
317317
}
318+
319+
// Check for a repeating schedule and update such that it triggers again.
320+
ifscheduleNext:=bytesToString(sliceHeader(JSScheduleNext,hdr));scheduleNext!=_EMPTY_&&scheduleNext!=JSScheduleNextPurge {
321+
scheduler:=getMessageScheduler(hdr)
322+
ifnext,err:=time.Parse(time.RFC3339Nano,scheduleNext);err==nil&&scheduler!=_EMPTY_ {
323+
ms.scheduling.update(scheduler,next.UnixNano())
324+
}
325+
}
318326
}
319327

320328
returnnil

‎server/scheduler.go‎

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"io"
2020
"math"
2121
"slices"
22+
"strings"
2223
"time"
2324

2425
"github.com/nats-io/nats-server/v2/server/thw"
@@ -77,6 +78,18 @@ func (ms *MsgScheduling) init(seq uint64, subj string, ts int64) {
7778
delete(ms.inflight,subj)
7879
}
7980

81+
func (ms*MsgScheduling)update(subjstring,tsint64) {
82+
ifsched,ok:=ms.schedules[subj];ok {
83+
// Remove and add separately, it's for the same sequence, but if replicated
84+
// this server could not know the previous timestamp yet.
85+
ms.ttls.Remove(sched.seq,sched.ts)
86+
ms.ttls.Add(sched.seq,ts)
87+
sched.ts=ts
88+
delete(ms.inflight,subj)
89+
ms.resetTimer()
90+
}
91+
}
92+
8093
func (ms*MsgScheduling)markInflight(subjstring) {
8194
if_,ok:=ms.schedules[subj];ok {
8295
ms.inflight[subj]=struct{}{}
@@ -159,6 +172,16 @@ func (ms *MsgScheduling) getScheduledMessages(loadMsg func(seq uint64, smv *Stor
159172
returnfalse
160173
}
161174
// Validate the contents are correct if not, we just remove it from THW.
175+
pattern:=bytesToString(sliceHeader(JSSchedulePattern,sm.hdr))
176+
ifpattern==_EMPTY_ {
177+
ms.remove(seq)
178+
returntrue
179+
}
180+
next,repeat,ok:=parseMsgSchedule(pattern,ts)
181+
if!ok {
182+
ms.remove(seq)
183+
returntrue
184+
}
162185
ttl,ok:=getMessageScheduleTTL(sm.hdr)
163186
if!ok {
164187
ms.remove(seq)
@@ -184,7 +207,11 @@ func (ms *MsgScheduling) getScheduledMessages(loadMsg func(seq uint64, smv *Stor
184207

185208
// Add headers for the scheduled message.
186209
hdr=genHeader(hdr,JSScheduler,sm.subj)
187-
hdr=genHeader(hdr,JSScheduleNext,JSScheduleNextPurge)// Purge the schedule message itself.
210+
if!repeat {
211+
hdr=genHeader(hdr,JSScheduleNext,JSScheduleNextPurge)// Purge the schedule message itself.
212+
}else {
213+
hdr=genHeader(hdr,JSScheduleNext,next.Format(time.RFC3339))// Next time the schedule fires.
214+
}
188215
ifttl!=_EMPTY_ {
189216
hdr=genHeader(hdr,JSMessageTTL,ttl)
190217
}
@@ -261,3 +288,35 @@ func (ms *MsgScheduling) decode(b []byte) (uint64, error) {
261288
}
262289
returnstamp,nil
263290
}
291+
292+
// parseMsgSchedule parses a message schedule pattern and returns the time
293+
// to fire, whether it is a repeating schedule, and whether the pattern was valid.
294+
funcparseMsgSchedule(patternstring,tsint64) (time.Time,bool,bool) {
295+
ifpattern==_EMPTY_ {
296+
return time.Time{},false,true
297+
}
298+
// Exact time.
299+
ifstrings.HasPrefix(pattern,"@at ") {
300+
t,err:=time.Parse(time.RFC3339,pattern[4:])
301+
returnt,false,err==nil
302+
}
303+
// Repeating on a simple interval.
304+
ifstrings.HasPrefix(pattern,"@every ") {
305+
dur,err:=time.ParseDuration(pattern[7:])
306+
iferr!=nil {
307+
return time.Time{},false,false
308+
}
309+
// Only allow intervals of at least a second.
310+
ifdur.Seconds()<1 {
311+
return time.Time{},false,false
312+
}
313+
// If this schedule would trigger multiple times, for example after a restart, skip ahead and only fire once.
314+
next:=time.Unix(0,ts).UTC().Round(time.Second).Add(dur)
315+
ifnow:=time.Now().UTC().Round(time.Second);next.Before(now) {
316+
next=now
317+
}
318+
returnnext,true,true
319+
}
320+
return time.Time{},false,false
321+
322+
}

‎server/stream.go‎

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4808,14 +4808,18 @@ func getMessageSchedule(hdr []byte) (time.Time, bool) {
48084808
return time.Time{},true
48094809
}
48104810
val:=bytesToString(sliceHeader(JSSchedulePattern,hdr))
4811-
ifval==_EMPTY_ {
4811+
schedule,_,ok:=parseMsgSchedule(val,time.Now().UTC().UnixNano())
4812+
returnschedule,ok
4813+
}
4814+
4815+
// Fast lookup and calculation of next message schedule.
4816+
funcnextMessageSchedule(hdr []byte,tsint64) (time.Time,bool) {
4817+
iflen(hdr)==0 {
48124818
return time.Time{},true
48134819
}
4814-
if!strings.HasPrefix(val,"@at ") {
4815-
return time.Time{},false
4816-
}
4817-
t,err:=time.Parse(time.RFC3339,val[4:])
4818-
returnt,err==nil
4820+
val:=bytesToString(sliceHeader(JSSchedulePattern,hdr))
4821+
schedule,_,ok:=parseMsgSchedule(val,ts)
4822+
returnschedule,ok
48194823
}
48204824

48214825
// Fast lookup of the message schedule TTL from headers.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp