Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit710407c

Browse files
committed
Always use temporary staging for filestore meta file writes
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent02df8f4 commit710407c

File tree

2 files changed

+60
-36
lines changed

2 files changed

+60
-36
lines changed

‎server/filestore.go‎

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11764,29 +11764,47 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {
1176411764
// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is
1176511765
// handled automatically by this function, so don't wrap calls to it in dios.
1176611766
func (fs*fileStore)writeFileWithOptionalSync(namestring,data []byte,perm fs.FileMode)error {
11767-
iffs.fcfg.SyncAlways {
11768-
returnwriteFileWithSync(name,data,perm)
11769-
}
11770-
<-dios
11771-
deferfunc() {
11772-
dios<-struct{}{}
11773-
}()
11774-
returnos.WriteFile(name,data,perm)
11767+
returnwriteAtomically(name,data,perm,fs.fcfg.SyncAlways)
1177511768
}
1177611769

1177711770
funcwriteFileWithSync(namestring,data []byte,perm fs.FileMode)error {
11771+
returnwriteAtomically(name,data,perm,true)
11772+
}
11773+
11774+
funcwriteAtomically(namestring,data []byte,perm fs.FileMode,syncbool)error {
11775+
tmp:=name+".tmp"
11776+
flags:=os.O_CREATE|os.O_WRONLY|os.O_TRUNC
11777+
ifsync {
11778+
flags=flags|os.O_SYNC
11779+
}
1177811780
<-dios
1177911781
deferfunc() {
1178011782
dios<-struct{}{}
1178111783
}()
11782-
flags:=os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC
11783-
f,err:=os.OpenFile(name,flags,perm)
11784+
f,err:=os.OpenFile(tmp,flags,perm)
1178411785
iferr!=nil {
1178511786
returnerr
1178611787
}
11787-
if_,err=f.Write(data);err!=nil {
11788+
if_,err:=f.Write(data);err!=nil {
1178811789
_=f.Close()
11790+
_=os.Remove(tmp)
1178911791
returnerr
1179011792
}
11791-
returnf.Close()
11793+
iferr:=f.Close();err!=nil {
11794+
_=os.Remove(tmp)
11795+
returnerr
11796+
}
11797+
iferr:=os.Rename(tmp,name);err!=nil {
11798+
_=os.Remove(tmp)
11799+
returnerr
11800+
}
11801+
ifsync {
11802+
// To ensure that the file rename was persisted on all filesystems,
11803+
// also try to flush the directory metadata.
11804+
ifd,err:=os.Open(filepath.Dir(name));err==nil {
11805+
_=d.Sync()
11806+
_=d.Close()
11807+
}
11808+
}
11809+
returnnil
1179211810
}

‎server/filestore_test.go‎

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4182,6 +4182,7 @@ func TestFileStoreEncrypted(t *testing.T) {
41824182
err=o.Update(state)
41834183
require_NoError(t,err)
41844184

4185+
o.Stop()
41854186
fs.Stop()
41864187
fs,err=newFileStoreWithCreated(fcfg,StreamConfig{Name:"zzz",Storage:FileStorage},created,prf(&fcfg),nil)
41874188
require_NoError(t,err)
@@ -4982,36 +4983,41 @@ func TestFileStoreSubjectsTotals(t *testing.T) {
49824983

49834984
funcTestFileStoreConsumerStoreEncodeAfterRestart(t*testing.T) {
49844985
testFileStoreAllPermutations(t,func(t*testing.T,fcfgFileStoreConfig) {
4985-
fs,err:=newFileStoreWithCreated(fcfg,StreamConfig{Name:"zzz",Storage:FileStorage},time.Now(),prf(&fcfg),nil)
4986-
require_NoError(t,err)
4987-
deferfs.Stop()
4986+
state:=&ConsumerState{}
49884987

4989-
o,err:=fs.ConsumerStore("o22",&ConsumerConfig{AckPolicy:AckExplicit})
4990-
require_NoError(t,err)
4988+
func() {// for defers
4989+
fs,err:=newFileStoreWithCreated(fcfg,StreamConfig{Name:"zzz",Storage:FileStorage},time.Now(),prf(&fcfg),nil)
4990+
require_NoError(t,err)
4991+
deferfs.Stop()
49914992

4992-
state:=&ConsumerState{}
4993-
state.Delivered.Consumer=22
4994-
state.Delivered.Stream=22
4995-
state.AckFloor.Consumer=11
4996-
state.AckFloor.Stream=11
4997-
err=o.Update(state)
4998-
require_NoError(t,err)
4993+
o,err:=fs.ConsumerStore("o22",&ConsumerConfig{AckPolicy:AckExplicit})
4994+
require_NoError(t,err)
4995+
defero.Stop()
49994996

5000-
fs.Stop()
4997+
state.Delivered.Consumer=22
4998+
state.Delivered.Stream=22
4999+
state.AckFloor.Consumer=11
5000+
state.AckFloor.Stream=11
5001+
err=o.Update(state)
5002+
require_NoError(t,err)
5003+
}()
50015004

5002-
fs,err=newFileStoreWithCreated(fcfg,StreamConfig{Name:"zzz",Storage:FileStorage},time.Now(),prf(&fcfg),nil)
5003-
require_NoError(t,err)
5004-
deferfs.Stop()
5005+
func() {// for defers
5006+
fs,err:=newFileStoreWithCreated(fcfg,StreamConfig{Name:"zzz",Storage:FileStorage},time.Now(),prf(&fcfg),nil)
5007+
require_NoError(t,err)
5008+
deferfs.Stop()
50055009

5006-
o,err=fs.ConsumerStore("o22",&ConsumerConfig{AckPolicy:AckExplicit})
5007-
require_NoError(t,err)
5010+
o,err:=fs.ConsumerStore("o22",&ConsumerConfig{AckPolicy:AckExplicit})
5011+
require_NoError(t,err)
5012+
defero.Stop()
50085013

5009-
ifo.(*consumerFileStore).state.Delivered!=state.Delivered {
5010-
t.Fatalf("Consumer state is wrong %+v vs %+v",o.(*consumerFileStore).state,state)
5011-
}
5012-
ifo.(*consumerFileStore).state.AckFloor!=state.AckFloor {
5013-
t.Fatalf("Consumer state is wrong %+v vs %+v",o.(*consumerFileStore).state,state)
5014-
}
5014+
ifo.(*consumerFileStore).state.Delivered!=state.Delivered {
5015+
t.Fatalf("Consumer state is wrong %+v vs %+v",o.(*consumerFileStore).state,state)
5016+
}
5017+
ifo.(*consumerFileStore).state.AckFloor!=state.AckFloor {
5018+
t.Fatalf("Consumer state is wrong %+v vs %+v",o.(*consumerFileStore).state,state)
5019+
}
5020+
}()
50155021
})
50165022
}
50175023

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp