Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e2a85d9

Browse files
committed
Merge pull request sorintlab#312 from sgotti/detect_standby_cannot_sync_due_to_missing_primary_wals
*: detect if standby can't sync due to missing wals
2 parents 6f58315 + e43f536, commit e2a85d9

File tree

10 files changed

+394
-25
lines changed

10 files changed

+394
-25
lines changed

‎cmd/keeper/keeper.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,8 @@ func (p *PostgresKeeper) GetPGState(pctx context.Context) (*cluster.PostgresStat
564564
// if timeline <= 1 then no timeline history file exists.
565565
pgState.TimelinesHistory= cluster.PostgresTimelinesHistory{}
566566
ifpgState.TimelineID>1 {
567-
tlsh,err:=p.pgm.GetTimelinesHistory(pgState.TimelineID)
567+
vartlsh []*postgresql.TimelineHistory
568+
tlsh,err=p.pgm.GetTimelinesHistory(pgState.TimelineID)
568569
iferr!=nil {
569570
log.Errorw("error getting timeline history",zap.Error(err))
570571
returnpgState,nil
@@ -581,6 +582,14 @@ func (p *PostgresKeeper) GetPGState(pctx context.Context) (*cluster.PostgresStat
581582
}
582583
pgState.TimelinesHistory=ctlsh
583584
}
585+
586+
ow,err:=p.pgm.OlderWalFile()
587+
iferr!=nil {
588+
log.Warnw("error getting older wal file",zap.Error(err))
589+
}else {
590+
log.Debugw("older wal file","filename",ow)
591+
pgState.OlderWalFile=ow
592+
}
584593
pgState.Healthy=true
585594
}
586595

‎cmd/sentinel/sentinel.go

Lines changed: 116 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/sorintlab/stolon/pkg/cluster"
3434
"github.com/sorintlab/stolon/pkg/flagutil"
3535
slog"github.com/sorintlab/stolon/pkg/log"
36+
"github.com/sorintlab/stolon/pkg/postgresql"
3637
"github.com/sorintlab/stolon/pkg/store"
3738
"github.com/sorintlab/stolon/pkg/timer"
3839
"github.com/sorintlab/stolon/pkg/util"
@@ -184,6 +185,20 @@ func (s *Sentinel) CleanDBError(uid string) {
184185
}
185186
}
186187

188+
func (s*Sentinel)SetDBNotIncreasingXLogPos(uidstring) {
189+
if_,ok:=s.dbNotIncreasingXLogPos[uid];!ok {
190+
s.dbNotIncreasingXLogPos[uid]=1
191+
}else {
192+
s.dbNotIncreasingXLogPos[uid]=s.dbNotIncreasingXLogPos[uid]+1
193+
}
194+
}
195+
196+
func (s*Sentinel)CleanDBNotIncreasingXLogPos(uidstring) {
197+
if_,ok:=s.dbNotIncreasingXLogPos[uid];ok {
198+
delete(s.dbNotIncreasingXLogPos,uid)
199+
}
200+
}
201+
187202
func (s*Sentinel)updateKeepersStatus(cd*cluster.ClusterData,keepersInfo cluster.KeepersInfo,firstRunbool) (*cluster.ClusterData,KeeperInfoHistories) {
188203
// Create a copy of cd
189204
cd=cd.DeepCopy()
@@ -285,6 +300,13 @@ func (s *Sentinel) updateKeepersStatus(cd *cluster.ClusterData, keepersInfo clus
285300
continue
286301
}
287302
log.Debugw("received db state","db",db.UID)
303+
304+
ifdb.Status.XLogPos==dbs.XLogPos {
305+
s.SetDBNotIncreasingXLogPos(db.UID)
306+
}else {
307+
s.CleanDBNotIncreasingXLogPos(db.UID)
308+
}
309+
288310
db.Status.ListenAddress=dbs.ListenAddress
289311
db.Status.Port=dbs.Port
290312
db.Status.CurrentGeneration=dbs.Generation
@@ -295,12 +317,16 @@ func (s *Sentinel) updateKeepersStatus(cd *cluster.ClusterData, keepersInfo clus
295317
db.Status.XLogPos=dbs.XLogPos
296318
db.Status.TimelinesHistory=dbs.TimelinesHistory
297319
db.Status.PGParameters=cluster.PGParameters(dbs.PGParameters)
320+
298321
// Sort synchronousStandbys so we can compare the slice regardless of its order
299322
sort.Sort(sort.StringSlice(dbs.SynchronousStandbys))
300323
db.Status.SynchronousStandbys=dbs.SynchronousStandbys
324+
325+
db.Status.OlderWalFile=dbs.OlderWalFile
301326
}else {
302327
s.SetDBError(db.UID)
303328
}
329+
304330
}
305331

306332
// Update dbs' healthy state
@@ -487,6 +513,65 @@ func (s *Sentinel) dbValidity(cd *cluster.ClusterData, dbUID string) dbValidity
487513
returndbValidityValid
488514
}
489515

516+
func (s*Sentinel)dbCanSync(cd*cluster.ClusterData,dbUIDstring)bool {
517+
db,ok:=cd.DBs[dbUID]
518+
if!ok {
519+
panic(fmt.Errorf("requested unexisting db uid %q",dbUID))
520+
}
521+
masterDB:=cd.DBs[cd.Cluster.Status.Master]
522+
523+
// ignore if master doesn't provide the older wal file
524+
ifmasterDB.Status.OlderWalFile=="" {
525+
returntrue
526+
}
527+
528+
// skip current master
529+
ifdbUID==masterDB.UID {
530+
returntrue
531+
}
532+
533+
// skip the standbys
534+
ifs.dbType(cd,db.UID)!=dbTypeStandby {
535+
returntrue
536+
}
537+
538+
// only check when db isn't initializing
539+
ifdb.Generation==cluster.InitialGeneration {
540+
returntrue
541+
}
542+
543+
// check only if the db isn't healty.
544+
if!db.Status.Healthy {
545+
returntrue
546+
}
547+
548+
ifdb.Status.XLogPos==masterDB.Status.XLogPos {
549+
returntrue
550+
}
551+
552+
// check only if the xlogpos isn't increasing for some time. This can also
553+
// happen when no writes are happening on the master but the standby should
554+
// be, if syncing at the same xlogpos.
555+
ifs.isDBIncreasingXLogPos(cd,db) {
556+
returntrue
557+
}
558+
559+
required:=postgresql.XlogPosToWalFileNameNoTimeline(db.Status.XLogPos)
560+
older,err:=postgresql.WalFileNameNoTimeLine(masterDB.Status.OlderWalFile)
561+
iferr!=nil {
562+
// warn on wrong file name (shouldn't happen...)
563+
log.Warnw("wrong wal file name","filename",masterDB.Status.OlderWalFile)
564+
}
565+
log.Debugw("xlog pos isn't advancing on standby, checking if the master has the required wals","db",db.UID,"keeper",db.Spec.KeeperUID,"requiredWAL",required,"olderMasterWAL",older)
566+
// compare the required wal file with the older wal file name ignoring the timelineID
567+
ifrequired>=older {
568+
returntrue
569+
}
570+
571+
log.Infow("db won't be able to sync due to missing required wals on master","db",db.UID,"keeper",db.Spec.KeeperUID,"requiredWAL",required,"olderMasterWAL",older)
572+
returnfalse
573+
}
574+
490575
func (s*Sentinel)dbStatus(cd*cluster.ClusterData,dbUIDstring)dbStatus {
491576
db,ok:=cd.DBs[dbUID]
492577
if!ok {
@@ -1014,7 +1099,7 @@ func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInf
10141099
ifs.dbType(newcd,db.UID)!=dbTypeMaster {
10151100
continue
10161101
}
1017-
log.Infow("removing old master db","db",db.UID)
1102+
log.Infow("removing old master db","db",db.UID,"keeper",db.Spec.KeeperUID)
10181103
toRemove=append(toRemove,db)
10191104
}
10201105
for_,db:=rangetoRemove {
@@ -1030,7 +1115,20 @@ func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInf
10301115
ifs.dbValidity(newcd,db.UID)!=dbValidityInvalid {
10311116
continue
10321117
}
1033-
log.Infow("removing invalid db","db",db.UID)
1118+
log.Infow("removing invalid db","db",db.UID,"keeper",db.Spec.KeeperUID)
1119+
toRemove=append(toRemove,db)
1120+
}
1121+
for_,db:=rangetoRemove {
1122+
delete(newcd.DBs,db.UID)
1123+
}
1124+
1125+
// Remove dbs that won't sync due to missing wals on current master
1126+
toRemove= []*cluster.DB{}
1127+
for_,db:=rangenewcd.DBs {
1128+
ifs.dbCanSync(cd,db.UID) {
1129+
continue
1130+
}
1131+
log.Infow("removing db that won't be able to sync due to missing wals on current master","db",db.UID,"keeper",db.Spec.KeeperUID)
10341132
toRemove=append(toRemove,db)
10351133
}
10361134
for_,db:=rangetoRemove {
@@ -1279,6 +1377,17 @@ func (s *Sentinel) isDBHealthy(cd *cluster.ClusterData, db *cluster.DB) bool {
12791377
returntrue
12801378
}
12811379

1380+
func (s*Sentinel)isDBIncreasingXLogPos(cd*cluster.ClusterData,db*cluster.DB)bool {
1381+
t,ok:=s.dbNotIncreasingXLogPos[db.UID]
1382+
if!ok {
1383+
returntrue
1384+
}
1385+
ift>cluster.DefaultDBNotIncreasingXLogPosTimes {
1386+
returnfalse
1387+
}
1388+
returntrue
1389+
}
1390+
12821391
func (s*Sentinel)updateDBConvergenceInfos(cd*cluster.ClusterData) {
12831392
for_,db:=rangecd.DBs {
12841393
ifdb.Status.CurrentGeneration==db.Generation {
@@ -1365,9 +1474,10 @@ type Sentinel struct {
13651474
// Make RandFn settable to ease testing with reproducible "random" numbers
13661475
RandFnfunc(int)int
13671476

1368-
keeperErrorTimersmap[string]int64
1369-
dbErrorTimersmap[string]int64
1370-
dbConvergenceInfosmap[string]*DBConvergenceInfo
1477+
keeperErrorTimersmap[string]int64
1478+
dbErrorTimersmap[string]int64
1479+
dbNotIncreasingXLogPosmap[string]int64
1480+
dbConvergenceInfosmap[string]*DBConvergenceInfo
13711481

13721482
keeperInfoHistoriesKeeperInfoHistories
13731483
}
@@ -1539,6 +1649,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
15391649
iffirstRun {
15401650
s.keeperErrorTimers=make(map[string]int64)
15411651
s.dbErrorTimers=make(map[string]int64)
1652+
s.dbNotIncreasingXLogPos=make(map[string]int64)
15421653
s.keeperInfoHistories=make(KeeperInfoHistories)
15431654
s.dbConvergenceInfos=make(map[string]*DBConvergenceInfo)
15441655

‎pkg/cluster/cluster.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ const (
4747
DefaultProxyCheckInterval=5*time.Second
4848
DefaultProxyTimeoutInterval=15*time.Second
4949

50+
DefaultDBNotIncreasingXLogPosTimes=10
51+
5052
DefaultSleepInterval=5*time.Second
5153
DefaultRequestTimeout=10*time.Second
5254
DefaultConvergenceTimeout=30*time.Second
@@ -537,6 +539,7 @@ type DBStatus struct {
537539

538540
PGParametersPGParameters`json:"pgParameters,omitempty"`
539541
SynchronousStandbys []string`json:"synchronousStandbys"`
542+
OlderWalFilestring`json:"olderWalFile,omitempty"`
540543
}
541544

542545
typeDBstruct {

‎pkg/cluster/member.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,16 @@ type PostgresState struct {
8888
ListenAddressstring`json:"listenAddress,omitempty"`
8989
Portstring`json:"port,omitempty"`
9090

91-
Healthybool`json:"healthy,omitempty"`
92-
SystemIDstring`json:"systemID,omitempty"`
93-
TimelineIDuint64`json:"timelineID,omitempty"`
94-
XLogPosuint64`json:"xLogPos,omitempty"`
95-
TimelinesHistoryPostgresTimelinesHistory`json:"timelinesHistory,omitempty"`
96-
PGParameters common.Parameters`json:"pgParameters,omitempty"`
97-
SynchronousStandbys []string`json:"synchronousStandbys"`
91+
Healthybool`json:"healthy,omitempty"`
92+
93+
SystemIDstring`json:"systemID,omitempty"`
94+
TimelineIDuint64`json:"timelineID,omitempty"`
95+
XLogPosuint64`json:"xLogPos,omitempty"`
96+
TimelinesHistoryPostgresTimelinesHistory`json:"timelinesHistory,omitempty"`
97+
98+
PGParameters common.Parameters`json:"pgParameters,omitempty"`
99+
SynchronousStandbys []string`json:"synchronousStandbys"`
100+
OlderWalFilestring`json:"olderWalFile,omitempty"`
98101
}
99102

100103
func (p*PostgresState)DeepCopy()*PostgresState {

‎pkg/postgresql/postgresql.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"os/exec"
2323
"path/filepath"
2424
"regexp"
25+
"sort"
2526
"strings"
2627
"syscall"
2728
"time"
@@ -707,3 +708,44 @@ func (p *Manager) Ping() error {
707708
defercancel()
708709
returnping(ctx,p.localConnParams)
709710
}
711+
712+
func (p*Manager)OlderWalFile() (string,error) {
713+
maj,_,err:=p.PGDataVersion()
714+
iferr!=nil {
715+
return"",err
716+
}
717+
varwalDirstring
718+
ifmaj<10 {
719+
walDir="pg_xlog"
720+
}else {
721+
walDir="pg_wal"
722+
}
723+
724+
f,err:=os.Open(filepath.Join(p.dataDir,walDir))
725+
iferr!=nil {
726+
return"",err
727+
}
728+
names,err:=f.Readdirnames(-1)
729+
f.Close()
730+
iferr!=nil {
731+
return"",err
732+
}
733+
sort.Strings(names)
734+
735+
for_,name:=rangenames {
736+
ifIsWalFileName(name) {
737+
fi,err:=os.Stat(filepath.Join(p.dataDir,walDir,name))
738+
iferr!=nil {
739+
return"",err
740+
}
741+
// if the file size is different from the currently supported one
742+
// (16Mib) return without checking other possible wal files
743+
iffi.Size()!=WalSegSize {
744+
return"",fmt.Errorf("wal file has unsupported size: %d",fi.Size())
745+
}
746+
returnname,nil
747+
}
748+
}
749+
750+
return"",nil
751+
}

‎pkg/postgresql/utils.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,15 @@ import (
2424

2525
"github.com/sorintlab/stolon/common"
2626

27+
"os"
28+
2729
_"github.com/lib/pq"
2830
"golang.org/x/net/context"
29-
"os"
31+
)
32+
33+
const (
34+
// TODO(sgotti) for now we assume wal size is the default 16MiB size
35+
WalSegSize= (16*1024*1024)// 16MiB
3036
)
3137

3238
var (
@@ -424,3 +430,37 @@ func ParseVersion(v string) (int, int, error) {
424430

425431
returnmaj,min,nil
426432
}
433+
434+
// IsWalFileName reports whether name has the form of a wal file name: exactly
// 24 characters, each of them an uppercase hexadecimal digit (0-9, A-F).
func IsWalFileName(name string) bool {
	if len(name) != 24 {
		return false
	}
	// Check bytes directly: any byte of a multi-byte character is outside the
	// accepted ASCII ranges, so such names are rejected as well.
	for i := 0; i < len(name); i++ {
		c := name[i]
		isDigit := c >= '0' && c <= '9'
		isUpperHex := c >= 'A' && c <= 'F'
		if !isDigit && !isUpperHex {
			return false
		}
	}
	return true
}
452+
453+
funcXlogPosToWalFileNameNoTimeline(XLogPosuint64)string {
454+
id:=uint32(XLogPos>>32)
455+
offset:=uint32(XLogPos)
456+
// TODO(sgotti) for now we assume wal size is the default 16M size
457+
seg:=offset/WalSegSize
458+
returnfmt.Sprintf("%08X%08X",id,seg)
459+
}
460+
461+
funcWalFileNameNoTimeLine(namestring) (string,error) {
462+
if!IsWalFileName(name) {
463+
return"",fmt.Errorf("bad wal file name")
464+
}
465+
returnname[8:24],nil
466+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp