Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 93d6ffc

Browse files
authored
fix: do not rename files on mmap failure (#25356)
If NewTSMReader() fails because mmap fails, do not rename the file, because the error is probably caused by vm.max_map_count being too low. Closes: #25351 (cherry picked from commit 5aff511)
1 parent 07df053, commit 93d6ffc

File tree

5 files changed

+209
-24
lines changed

5 files changed

+209
-24
lines changed

‎tsdb/engine/tsm1/engine.go‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
172172
wal.syncDelay=time.Duration(opt.Config.WALFsyncDelay)
173173
}
174174

175-
fs:=NewFileStore(path,etags)
175+
fs:=NewFileStore(path,etags,WithMadviseWillNeed(opt.Config.TSMWillNeed))
176176
fs.openLimiter=opt.OpenLimiter
177177
ifopt.FileStoreObserver!=nil {
178178
fs.WithObserver(opt.FileStoreObserver)
179179
}
180-
fs.tsmMMAPWillNeed=opt.Config.TSMWillNeed
181180

182181
cache:=NewCache(uint64(opt.Config.CacheMaxMemorySize),etags)
183182

‎tsdb/engine/tsm1/file_store.go‎

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,8 @@ type FileStore struct {
176176
currentGenerationint
177177
dirstring
178178

179-
files []TSMFile
180-
tsmMMAPWillNeedbool// If true then the kernel will be advised MMAP_WILLNEED for TSM files.
181-
openLimiter limiter.Fixed// limit the number of concurrent opening TSM files.
179+
files []TSMFile
180+
openLimiter limiter.Fixed// limit the number of concurrent opening TSM files.
182181

183182
logger*zap.Logger// Logger to be used for important messages
184183
traceLogger*zap.Logger// Logger to be used when trace-logging is on.
@@ -198,6 +197,8 @@ type FileStore struct {
198197
// newReaderBlockCount keeps track of the current new reader block requests.
199198
// If non-zero, no new TSMReader objects may be created.
200199
newReaderBlockCountint
200+
201+
readerOptions []tsmReaderOption
201202
}
202203

203204
// FileStat holds information about a TSM file on disk.
@@ -234,7 +235,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
234235
}
235236

236237
// NewFileStore returns a new instance of FileStore based on the given directory.
237-
funcNewFileStore(dirstring,tags tsdb.EngineTags)*FileStore {
238+
funcNewFileStore(dirstring,tags tsdb.EngineTags,options...tsmReaderOption)*FileStore {
238239
logger:=zap.NewNop()
239240
fs:=&FileStore{
240241
dir:dir,
@@ -250,6 +251,7 @@ func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
250251
obs:noFileStoreObserver{},
251252
parseFileName:DefaultParseFileName,
252253
copyFiles:runtime.GOOS=="windows",
254+
readerOptions:options,
253255
}
254256
fs.purger.fileStore=fs
255257
returnfs
@@ -616,28 +618,37 @@ func (f *FileStore) Open(ctx context.Context) error {
616618
deferf.openLimiter.Release()
617619

618620
start:=time.Now()
619-
df,err:=NewTSMReader(file,WithMadviseWillNeed(f.tsmMMAPWillNeed))
621+
df,err:=NewTSMReader(file,f.readerOptions...)
620622
f.logger.Info("Opened file",
621623
zap.String("path",file.Name()),
622624
zap.Int("id",idx),
623625
zap.Duration("duration",time.Since(start)))
624626

625-
// If we are unable to read a TSM file then log the error, rename
626-
// the file, and continue loading the shard without it.
627+
// If we are unable to read a TSM file then log the error.
627628
iferr!=nil {
628629
ifcerr:=file.Close();cerr!=nil {
629630
f.logger.Error("Error closing TSM file after error",zap.String("path",file.Name()),zap.Int("id",idx),zap.Error(cerr))
630631
}
631-
// If the file is corrupt, rename it and
632-
// continue loading the shard without it.
633-
f.logger.Error("Cannot read corrupt tsm file, renaming",zap.String("path",file.Name()),zap.Int("id",idx),zap.Error(err))
634-
ife:=os.Rename(file.Name(),file.Name()+"."+BadTSMFileExtension);e!=nil {
635-
f.logger.Error("Cannot rename corrupt tsm file",zap.String("path",file.Name()),zap.Int("id",idx),zap.Error(e))
636-
readerC<-&res{r:df,err:fmt.Errorf("cannot rename corrupt file %s: %w",file.Name(),e)}
632+
iferrors.Is(err,MmapError{}) {
633+
// An MmapError may indicate we have insufficient
634+
// handles for the mmap call, in which case the file should
635+
// be left untouched, and the vm.max_map_count be raised.
636+
f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low",
637+
zap.String("path",file.Name()),zap.Int("id",idx),zap.Error(err))
638+
readerC<-&res{r:df,err:fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v",file.Name(),err)}
639+
return
640+
}else {
641+
// If the file is corrupt, rename it and
642+
// continue loading the shard without it.
643+
f.logger.Error("Cannot read corrupt tsm file, renaming",zap.String("path",file.Name()),zap.Int("id",idx),zap.Error(err))
644+
ife:=os.Rename(file.Name(),file.Name()+"."+BadTSMFileExtension);e!=nil {
645+
f.logger.Error("Cannot rename corrupt tsm file",zap.String("path",file.Name()),zap.Int("id",idx),zap.Error(e))
646+
readerC<-&res{r:df,err:fmt.Errorf("cannot rename corrupt file %s: %v",file.Name(),e)}
647+
return
648+
}
649+
readerC<-&res{r:df,err:fmt.Errorf("cannot read corrupt file %s: %v",file.Name(),err)}
637650
return
638651
}
639-
readerC<-&res{r:df,err:fmt.Errorf("cannot read corrupt file %s: %w",file.Name(),err)}
640-
return
641652
}
642653
df.WithObserver(f.obs)
643654
readerC<-&res{r:df}
@@ -920,7 +931,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
920931
}
921932
}
922933

923-
tsm,err:=NewTSMReader(fd,WithMadviseWillNeed(f.tsmMMAPWillNeed))
934+
tsm,err:=NewTSMReader(fd,f.readerOptions...)
924935
iferr!=nil {
925936
ifnewName!=oldName {
926937
iferr1:=os.Rename(newName,oldName);err1!=nil {
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package tsm1
2+
3+
import (
4+
"github.com/influxdata/influxdb/v2/tsdb"
5+
)
6+
7+
varTestMmapInitFailOption=func(errerror)tsmReaderOption {
8+
returnfunc(r*TSMReader) {
9+
r.accessor=&badBlockAccessor{error:err}
10+
}
11+
}
12+
13+
typebadBlockAccessorstruct {
14+
error
15+
initCalledbool
16+
}
17+
18+
func (b*badBlockAccessor)init() (*indirectIndex,error) {
19+
b.initCalled=true
20+
returnnil,b.error
21+
}
22+
23+
func (b*badBlockAccessor)read(key []byte,timestampint64) ([]Value,error) {
24+
//TODO implement me
25+
panic("implement me")
26+
}
27+
28+
func (b*badBlockAccessor)readAll(key []byte) ([]Value,error) {
29+
//TODO implement me
30+
panic("implement me")
31+
}
32+
33+
func (b*badBlockAccessor)readBlock(entry*IndexEntry,values []Value) ([]Value,error) {
34+
//TODO implement me
35+
panic("implement me")
36+
}
37+
38+
func (b*badBlockAccessor)readFloatBlock(entry*IndexEntry,values*[]FloatValue) ([]FloatValue,error) {
39+
//TODO implement me
40+
panic("implement me")
41+
}
42+
43+
func (b*badBlockAccessor)readFloatArrayBlock(entry*IndexEntry,values*tsdb.FloatArray)error {
44+
//TODO implement me
45+
panic("implement me")
46+
}
47+
48+
func (b*badBlockAccessor)readIntegerBlock(entry*IndexEntry,values*[]IntegerValue) ([]IntegerValue,error) {
49+
//TODO implement me
50+
panic("implement me")
51+
}
52+
53+
func (b*badBlockAccessor)readIntegerArrayBlock(entry*IndexEntry,values*tsdb.IntegerArray)error {
54+
//TODO implement me
55+
panic("implement me")
56+
}
57+
58+
func (b*badBlockAccessor)readUnsignedBlock(entry*IndexEntry,values*[]UnsignedValue) ([]UnsignedValue,error) {
59+
//TODO implement me
60+
panic("implement me")
61+
}
62+
63+
func (b*badBlockAccessor)readUnsignedArrayBlock(entry*IndexEntry,values*tsdb.UnsignedArray)error {
64+
//TODO implement me
65+
panic("implement me")
66+
}
67+
68+
func (b*badBlockAccessor)readStringBlock(entry*IndexEntry,values*[]StringValue) ([]StringValue,error) {
69+
//TODO implement me
70+
panic("implement me")
71+
}
72+
73+
func (b*badBlockAccessor)readStringArrayBlock(entry*IndexEntry,values*tsdb.StringArray)error {
74+
//TODO implement me
75+
panic("implement me")
76+
}
77+
78+
func (b*badBlockAccessor)readBooleanBlock(entry*IndexEntry,values*[]BooleanValue) ([]BooleanValue,error) {
79+
//TODO implement me
80+
panic("implement me")
81+
}
82+
83+
func (b*badBlockAccessor)readBooleanArrayBlock(entry*IndexEntry,values*tsdb.BooleanArray)error {
84+
//TODO implement me
85+
panic("implement me")
86+
}
87+
88+
func (b*badBlockAccessor)readBytes(entry*IndexEntry,buf []byte) (uint32, []byte,error) {
89+
//TODO implement me
90+
panic("implement me")
91+
}
92+
93+
func (b*badBlockAccessor)rename(pathstring)error {
94+
//TODO implement me
95+
panic("implement me")
96+
}
97+
98+
func (b*badBlockAccessor)path()string {
99+
//TODO implement me
100+
panic("implement me")
101+
}
102+
103+
func (b*badBlockAccessor)close()error {
104+
if!b.initCalled {
105+
panic("close called without an init call")
106+
}
107+
b.initCalled=false
108+
returnnil
109+
}
110+
111+
func (b*badBlockAccessor)free()error {
112+
//TODO implement me
113+
panic("implement me")
114+
}

‎tsdb/engine/tsm1/file_store_test.go‎

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/influxdata/influxdb/v2/tsdb"
1515
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
16+
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
1718
"go.uber.org/zap/zaptest"
1819
)
@@ -2412,6 +2413,42 @@ func TestFileStore_Open(t *testing.T) {
24122413
}
24132414
}
24142415

2416+
funcTestFileStore_OpenFail(t*testing.T) {
2417+
varerrerror
2418+
dir:=t.TempDir()
2419+
2420+
// Create a TSM file...
2421+
data:=keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0,1.0)}}
2422+
2423+
files,err:=newFileDir(t,dir,data)
2424+
iferr!=nil {
2425+
fatal(t,"creating test files",err)
2426+
}
2427+
assert.Equal(t,1,len(files))
2428+
f:=files[0]
2429+
2430+
constmmapErrMsg="mmap failure in test"
2431+
constfullMmapErrMsg="system limit for vm.max_map_count may be too low: "+mmapErrMsg
2432+
// With an mmap failure, the files should all be left where they are, because they are not corrupt
2433+
openFail(t,dir,fullMmapErrMsg,tsm1.NewMmapError(fmt.Errorf(mmapErrMsg)))
2434+
assert.FileExistsf(t,f,"file not found, but should not have been moved for mmap failure")
2435+
2436+
// With a non-mmap failure, the file failing to open should be moved aside
2437+
constotherErrMsg="some Random Init Failure"
2438+
openFail(t,dir,otherErrMsg,fmt.Errorf(otherErrMsg))
2439+
assert.NoFileExistsf(t,f,"file found, but should have been moved for open failure")
2440+
assert.FileExistsf(t,f+"."+tsm1.BadTSMFileExtension,"file not found, but should have been moved here for open failure")
2441+
}
2442+
2443+
funcopenFail(t*testing.T,dirstring,fullErrMsgstring,initErrerror) {
2444+
fs:=tsm1.NewFileStore(dir, tsdb.EngineTags{},tsm1.TestMmapInitFailOption(initErr))
2445+
err:=fs.Open(context.Background())
2446+
assert.Error(t,err)
2447+
assert.Contains(t,err.Error(),fullErrMsg)
2448+
deferfunc() {assert.NoError(t,fs.Close(),"unexpected error on FileStore.Close") }()
2449+
assert.Equal(t,0,fs.Count(),"file count mismatch")
2450+
}
2451+
24152452
funcTestFileStore_Remove(t*testing.T) {
24162453
dir:=t.TempDir()
24172454

‎tsdb/engine/tsm1/reader.go‎

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tsm1
33
import (
44
"bytes"
55
"encoding/binary"
6+
"errors"
67
"fmt"
78
"io"
89
"math"
@@ -218,6 +219,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
218219
}
219220
}
220221

222+
// TODO(DSB) - add a tsmReaderOption in a test call that has the mmapAccessor mock a failure
221223
// NewTSMReader returns a new TSMReader from the given file.
222224
funcNewTSMReader(f*os.File,options...tsmReaderOption) (*TSMReader,error) {
223225
t:=&TSMReader{}
@@ -231,15 +233,17 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
231233
}
232234
t.size=stat.Size()
233235
t.lastModified=stat.ModTime().UnixNano()
234-
t.accessor=&mmapAccessor{
235-
f:f,
236-
mmapWillNeed:t.madviseWillNeed,
236+
ift.accessor==nil {
237+
t.accessor=&mmapAccessor{
238+
f:f,
239+
mmapWillNeed:t.madviseWillNeed,
240+
}
237241
}
238242

239243
index,err:=t.accessor.init()
240244
iferr!=nil {
241-
_=t.accessor.close()
242-
returnnil,err
245+
cerr:=t.accessor.close()
246+
returnnil,errors.Join(err,cerr)
243247
}
244248

245249
t.index=index
@@ -1314,6 +1318,24 @@ type mmapAccessor struct {
13141318
index*indirectIndex
13151319
}
13161320

1321+
typeMmapErrorstruct {
1322+
error
1323+
}
1324+
1325+
func (m*MmapError)Unwrap()error {
1326+
returnm.error
1327+
}
1328+
1329+
func (mMmapError)Is(eerror)bool {
1330+
_,oks:=e.(MmapError)
1331+
_,okp:=e.(*MmapError)
1332+
returnoks||okp
1333+
}
1334+
1335+
funcNewMmapError(eerror)MmapError {
1336+
returnMmapError{error:e}
1337+
}
1338+
13171339
func (m*mmapAccessor)init() (*indirectIndex,error) {
13181340
m.mu.Lock()
13191341
deferm.mu.Unlock()
@@ -1335,7 +1357,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {
13351357

13361358
m.b,err=mmap(m.f,0,int(stat.Size()))
13371359
iferr!=nil {
1338-
returnnil,err
1360+
// Wrap the error to let callers know this was an error
1361+
// from mmap, and may indicate vm.max_map_count is too low
1362+
returnnil,NewMmapError(err)
13391363
}
13401364
iflen(m.b)<8 {
13411365
returnnil,fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp