Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit a7e89d1

Browse files
committed
Make stream snapshots asynchronous
The monitorStream function can block for a long time when creating and installing a snapshot of a stream's state. This can lead to increased tail latency. This commit extends the RaftNode interface with a new InstallSnapshotAsync method. This method performs snapshot writing and WAL compaction in a separate goroutine, making the process non-blocking. The existing InstallSnapshot method is now a synchronous wrapper around the new asynchronous implementation. Signed-off-by: Daniele Sciascia <daniele@nats.io>
1 parent e2661b5 commit a7e89d1

File tree

2 files changed

+145
-33
lines changed

2 files changed

+145
-33
lines changed

‎server/jetstream_cluster.go‎

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2519,9 +2519,21 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
25192519
// fully recovered from disk.
25202520
isRecovering:=true
25212521

2522-
doSnapshot:=func() {
2522+
snapState:=struct {
2523+
inProgressbool
2524+
curStateSimpleState
2525+
chchanInstalledSnapshot
2526+
}{
2527+
ch:make(chanInstalledSnapshot,1),
2528+
}
2529+
2530+
wantSnapshot:=func()bool {
25232531
ifmset==nil||isRecovering||isRestore {
2524-
return
2532+
returnfalse
2533+
}
2534+
2535+
ifsnapState.inProgress {
2536+
returnfalse
25252537
}
25262538

25272539
// Before we actually calculate the detailed state and encode it, let's check the
@@ -2534,18 +2546,56 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
25342546
// consumers on idle streams but better to be safe than sorry!
25352547
ne,nb:=n.Size()
25362548
ifcurState==lastState&&ne<compactNumMin&&nb<compactSizeMin {
2537-
return
2549+
returnfalse
2550+
}
2551+
2552+
snapState.curState=curState
2553+
returntrue
2554+
}
2555+
2556+
handleSnapshotErr:=func(errerror) {
2557+
switcherr {
2558+
casenil:
2559+
lastState=snapState.curState
2560+
caseerrNoSnapAvailable,errNodeClosed,errCatchupsRunning:
2561+
// ignore the error
2562+
default:
2563+
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v",
2564+
mset.acc.Name,mset.name(),n.Group(),err)
2565+
}
2566+
2567+
}
2568+
2569+
doSnapshot:=func() {
2570+
ifwantSnapshot() {
2571+
// Make sure all pending data is flushed before allowing snapshots.
2572+
mset.flushAllPending()
2573+
err:=n.InstallSnapshot(mset.stateSnapshot())
2574+
handleSnapshotErr(err)
25382575
}
2576+
}
25392577

2540-
// Make sure all pending data is flushed before allowing snapshots.
2541-
mset.flushAllPending()
2542-
iferr:=n.InstallSnapshot(mset.stateSnapshot());err==nil {
2543-
lastState=curState
2544-
}elseiferr!=errNoSnapAvailable&&err!=errNodeClosed&&err!=errCatchupsRunning {
2545-
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v",mset.acc.Name,mset.name(),n.Group(),err)
2578+
doSnapshotAsync:=func() {
2579+
ifwantSnapshot() {
2580+
// Make sure all pending data is flushed before allowing snapshots.
2581+
mset.flushAllPending()
2582+
n.InstallSnapshotAsync(mset.stateSnapshot(),snapState.ch)
2583+
snapState.inProgress=true
25462584
}
25472585
}
25482586

2587+
snapshotDone:=func(snapInstalledSnapshot) {
2588+
handleSnapshotErr(snap.Err)
2589+
snapState.inProgress=false
2590+
}
2591+
2592+
deferfunc() {
2593+
ifsnapState.inProgress {
2594+
s:=<-snapState.ch
2595+
snapshotDone(s)
2596+
}
2597+
}()
2598+
25492599
// We will establish a restoreDoneCh no matter what. Will never be triggered unless
25502600
// we replace with the restore chan.
25512601
restoreDoneCh:=make(<-chanerror)
@@ -2617,6 +2667,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
26172667

26182668
for {
26192669
select {
2670+
cases:=<-snapState.ch:
2671+
snapshotDone(s)
26202672
case<-s.quitCh:
26212673
// Server shutting down, but we might receive this before qch, so try to snapshot.
26222674
doSnapshot()
@@ -2726,7 +2778,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
27262778
// Check about snapshotting
27272779
// If we have at least min entries to compact, go ahead and try to snapshot/compact.
27282780
ifne>=compactNumMin||nb>compactSizeMin||mset.getCLFS()>pclfs {
2729-
doSnapshot()
2781+
doSnapshotAsync()
27302782
}
27312783

27322784
caseisLeader=<-lch:
@@ -2822,7 +2874,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
28222874
stopDirectMonitoring()
28232875

28242876
case<-t.C:
2825-
doSnapshot()
2877+
doSnapshotAsync()
28262878

28272879
case<-uch:
28282880
// keep stream assignment current

‎server/raft.go‎

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type RaftNode interface {
4242
ProposeMulti(entries []*Entry)error
4343
ForwardProposal(entry []byte)error
4444
InstallSnapshot(snap []byte)error
45+
InstallSnapshotAsync(snap []byte,chchan<-InstalledSnapshot)
4546
SendSnapshot(snap []byte)error
4647
NeedSnapshot()bool
4748
Applied(indexuint64) (entriesuint64,bytesuint64)
@@ -230,6 +231,7 @@ type raft struct {
230231
observerbool// The node is observing, i.e. not able to become leader
231232
initializingbool// The node is new, and "empty log" checks can be temporarily relaxed.
232233
scaleUpbool// The node is part of a scale up, puts us in observer mode until the log contains data.
234+
snapshottingbool// Snapshot is in progress
233235
}
234236

235237
typeproposedEntrystruct {
@@ -307,6 +309,7 @@ var (
307309
errNodeClosed=errors.New("raft: node is closed")
308310
errBadSnapName=errors.New("raft: snapshot name could not be parsed")
309311
errNoSnapAvailable=errors.New("raft: no snapshot available")
312+
errSnapInProgress=errors.New("raft: snapshot is already in progress")
310313
errCatchupsRunning=errors.New("raft: snapshot can not be installed while catchups running")
311314
errSnapshotCorrupt=errors.New("raft: snapshot corrupt")
312315
errTooManyPrefs=errors.New("raft: stepdown requires at most one preferred new leader")
@@ -1242,49 +1245,106 @@ func (n *raft) SendSnapshot(data []byte) error {
12421245
returnnil
12431246
}
12441247

1245-
// Used to install a snapshot for the given term and applied index. This will release
1246-
// all of the log entries up to and including index. This should not be called with
1247-
// entries that have been applied to the FSM but have not been applied to the raft state.
1248-
func (n*raft)InstallSnapshot(data []byte)error {
1248+
typeInstalledSnapshotstruct {
1249+
Termuint64
1250+
Indexuint64
1251+
Pathstring
1252+
Errerror
1253+
}
1254+
1255+
func (n*raft)installSnapshotAsync(encoded []byte,snapInstalledSnapshot,
1256+
chchan<-InstalledSnapshot) {
1257+
gofunc() {
1258+
snap.Err=writeFileWithSync(snap.Path,encoded,defaultFilePerms)
1259+
n.Lock()
1260+
ifn.State()==Closed {
1261+
snap.Err=errNodeClosed
1262+
}
1263+
ifsnap.Err==nil {
1264+
// Delete our previous snapshot file if it exists.
1265+
ifn.snapfile!=_EMPTY_&&n.snapfile!=snap.Path {
1266+
os.Remove(n.snapfile)
1267+
}
1268+
// Remember our latest snapshot file.
1269+
n.snapfile=snap.Path
1270+
_,snap.Err=n.wal.Compact(snap.Index+1)
1271+
ifsnap.Err!=nil {
1272+
n.setWriteErrLocked(snap.Err)
1273+
}else {
1274+
varstateStreamState
1275+
n.wal.FastState(&state)
1276+
n.papplied=snap.Index
1277+
n.bytes=state.Bytes
1278+
}
1279+
}
1280+
n.snapshotting=false
1281+
n.Unlock()
1282+
ch<-snap
1283+
}()
1284+
}
1285+
1286+
// InstallSnapshotAsync installs a snapshot asynchronously. It writes the
1287+
// snapshot to disk and compacts the WAL in a separate goroutine. The caller
1288+
// is notified of the result on the provided channel.
1289+
func (n*raft)InstallSnapshotAsync(data []byte,chchan<-InstalledSnapshot) {
12491290
ifn.State()==Closed {
1250-
returnerrNodeClosed
1291+
ch<-InstalledSnapshot{Err:errNodeClosed}
1292+
return
12511293
}
12521294

12531295
n.Lock()
12541296
defern.Unlock()
12551297

1298+
ifn.snapshotting {
1299+
ch<-InstalledSnapshot{Err:errSnapInProgress}
1300+
return
1301+
}
1302+
12561303
// If a write error has occurred already then stop here.
1257-
ifwerr:=n.werr;werr!=nil {
1258-
returnwerr
1304+
ifn.werr!=nil {
1305+
ch<-InstalledSnapshot{Err:n.werr}
1306+
return
12591307
}
12601308

1261-
// Check that a catchup isn't already taking place. If it is then we won't
1262-
// allow installing snapshots until it is done.
1309+
// Check that a catchup isn't already taking place. If it is then we
1310+
// won't allow installing snapshots until it is done.
12631311
iflen(n.progress)>0||n.paused {
1264-
returnerrCatchupsRunning
1312+
ch<-InstalledSnapshot{Err:errCatchupsRunning}
1313+
return
12651314
}
12661315

12671316
ifn.applied==0 {
1268-
n.debug("Not snapshotting as there are no applied entries")
1269-
returnerrNoSnapAvailable
1317+
ch<-InstalledSnapshot{Err:errNoSnapAvailable}
1318+
return
12701319
}
12711320

1272-
vartermuint64
1273-
ifae,_:=n.loadEntry(n.applied);ae!=nil {
1274-
term=ae.term
1275-
}else {
1276-
n.debug("Not snapshotting as entry %d is not available",n.applied)
1277-
returnerrNoSnapAvailable
1321+
ae,_:=n.loadEntry(n.applied)
1322+
ifae==nil {
1323+
ch<-InstalledSnapshot{Err:errNoSnapAvailable}
1324+
return
12781325
}
12791326

1280-
n.debug("Installing snapshot of %d bytes [%d:%d]",len(data),term,n.applied)
1281-
1282-
returnn.installSnapshot(&snapshot{
1283-
lastTerm:term,
1327+
encoded:=n.encodeSnapshot(&snapshot{
1328+
lastTerm:ae.term,
12841329
lastIndex:n.applied,
12851330
peerstate:encodePeerState(&peerState{n.peerNames(),n.csz,n.extSt}),
12861331
data:data,
12871332
})
1333+
1334+
snapDir:=filepath.Join(n.sd,snapshotsDir)
1335+
snapFile:=filepath.Join(snapDir,fmt.Sprintf(snapFileT,ae.term,n.applied))
1336+
snap:=InstalledSnapshot{Term:ae.term,Index:n.applied,Path:snapFile}
1337+
n.installSnapshotAsync(encoded,snap,ch)
1338+
}
1339+
1340+
// InstallSnapshot installs a snapshot for the current applied index. This is a
1341+
// synchronous call that will block until the snapshot is installed, and will
1342+
// release all of the log entries up to the applied index.
1343+
func (n*raft)InstallSnapshot(data []byte)error {
1344+
ch:=make(chanInstalledSnapshot,1)
1345+
n.InstallSnapshotAsync(data,ch)
1346+
snap:=<-ch
1347+
returnsnap.Err
12881348
}
12891349

12901350
// Install the snapshot.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp