Commit 3db89c9

Merge pull request sorintlab#268 from sgotti/handle_max_standby_lag
sentinel: don't choose keepers with db behind a defined lag as masters.
2 parents 2397b58 + 5587b86, commit 3db89c9

File tree: 6 files changed, 116 additions and 3 deletions


cmd/sentinel/sentinel.go

Lines changed: 31 additions & 0 deletions
@@ -350,6 +350,19 @@ func (s *Sentinel) isDifferentTimelineBranch(followedDB *cluster.DB, db *cluster
 	return false
 }
 
+// isLagBelowMax checks if the db reported lag is below MaxStandbyLag from the
+// master reported lag
+func (s *Sentinel) isLagBelowMax(cd *cluster.ClusterData, curMasterDB, db *cluster.DB) bool {
+	if !*cd.Cluster.DefSpec().SynchronousReplication {
+		log.Debug(fmt.Sprintf("curMasterDB.Status.XLogPos: %d, db.Status.XLogPos: %d, lag: %d", curMasterDB.Status.XLogPos, db.Status.XLogPos, int64(curMasterDB.Status.XLogPos-db.Status.XLogPos)))
+		if int64(curMasterDB.Status.XLogPos-db.Status.XLogPos) > int64(*cd.Cluster.DefSpec().MaxStandbyLag) {
+			log.Debug("ignoring keeper since its behind that maximum xlog position", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", curMasterDB.Status.XLogPos))
+			return false
+		}
+	}
+	return true
+}
+
 func (s *Sentinel) freeKeepers(cd *cluster.ClusterData) []*cluster.Keeper {
 	freeKeepers := []*cluster.Keeper{}
 K:
@@ -548,6 +561,15 @@ func (s *Sentinel) findBestStandbys(cd *cluster.ClusterData, masterDB *cluster.D
 			log.Debug("ignoring keeper since its pg timeline is different than master timeline", zap.String("db", db.UID), zap.Uint64("dbTimeline", db.Status.TimelineID), zap.Uint64("masterTimeline", masterDB.Status.TimelineID))
 			continue
 		}
+		// do this only when not using synchronous replication since in sync repl we
+		// have to ignore the last reported xlogpos or valid sync standby will be
+		// skipped
+		if !*cd.Cluster.DefSpec().SynchronousReplication {
+			if !s.isLagBelowMax(cd, masterDB, db) {
+				log.Debug("ignoring keeper since its lag is above the max configured lag", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", masterDB.Status.XLogPos))
+				continue
+			}
+		}
 		bestDBs = append(bestDBs, db)
 	}
 	// Sort by XLogPos
@@ -569,6 +591,15 @@ func (s *Sentinel) findBestNewMasters(cd *cluster.ClusterData, masterDB *cluster
 			log.Debug("ignoring keeper since its pg timeline is different than master timeline", zap.String("db", db.UID), zap.Uint64("dbTimeline", db.Status.TimelineID), zap.Uint64("masterTimeline", masterDB.Status.TimelineID))
 			continue
 		}
+		// do this only when not using synchronous replication since in sync repl we
+		// have to ignore the last reported xlogpos or valid sync standby will be
+		// skipped
+		if !*cd.Cluster.DefSpec().SynchronousReplication {
+			if !s.isLagBelowMax(cd, masterDB, db) {
+				log.Debug("ignoring keeper since its lag is above the max configured lag", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", masterDB.Status.XLogPos))
+				continue
+			}
+		}
 		bestNewMasters = append(bestNewMasters, db)
 	}
 	// Sort by XLogPos
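
The election filter above reduces to one comparison between the master's and the standby's reported WAL byte positions. The following standalone Go sketch is not part of the commit; the XLogPos values and the 1MiB limit are invented for illustration, and lagBelowMax only mirrors the arithmetic of isLagBelowMax without the cluster spec plumbing.

package main

import "fmt"

// lagBelowMax mirrors the comparison used by isLagBelowMax: the lag is the
// difference between the master's and the standby's reported WAL positions,
// and a standby stays eligible only while that lag does not exceed the limit.
func lagBelowMax(masterXLogPos, dbXLogPos, maxStandbyLag uint64) bool {
	return int64(masterXLogPos-dbXLogPos) <= int64(maxStandbyLag)
}

func main() {
	const maxStandbyLag = 1024 * 1024 // hypothetical 1MiB limit

	// A standby about 100kiB behind the master remains a failover candidate.
	fmt.Println(lagBelowMax(5000000, 4900000, maxStandbyLag)) // true

	// A standby about 3MB behind is skipped by findBestStandbys/findBestNewMasters.
	fmt.Println(lagBelowMax(5000000, 2000000, maxStandbyLag)) // false
}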

cmd/sentinel/sentinel_test.go

Lines changed: 1 addition & 1 deletion
@@ -183,8 +183,8 @@ func TestUpdateCluster(t *testing.T) {
 			Proxy: &cluster.Proxy{},
 		},
 	},
+	// #2 cluster initialization, more than one keeper, the first will be choosen to be the new master.
 	{
-		// #2 cluster initialization, more than one keeper, the first will be choosen to be the new master.
 		cd: &cluster.ClusterData{
 			Cluster: &cluster.Cluster{
 				UID: "cluster1",

doc/cluster_spec.md

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ Some options in a running cluster specification can be changed to update the des
 | failInterval | interval after the first fail to declare a keeper as not healthy. | no | string (duration) | 20s |
 | maxStandbys | max number of standbys. This needs to be greater enough to cover both standby managed by stolon and additional standbys configured by the user. Its value affect different postgres parameters like max_replication_slots and max_wal_senders. Setting this to a number lower than the sum of stolon managed standbys and user managed standbys will have unpredicatable effects due to problems creating replication slots or replication problems due to exhausted wal senders. | no | uint16 | 20 |
 | maxStandbysPerSender | max number of standbys for every sender. A sender can be a master or another standby (with cascading replication). | no | uint16 | 3 |
+| maxStandbyLag | maximum lag (from the last reported master state, in bytes) that an asynchronous standby can have to be elected in place of a failed master. | no | uint32 | 1MiB |
 | synchronousReplication | use synchronous replication between the master and its standbys | no | bool | false |
 | minSynchronousStandbys | minimum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | int16 | 1 |
 | maxSynchronousStandbys | maximum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | int16 | 1 |
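
In Go terms the documented option is the optional MaxStandbyLag field of the cluster spec, expressed in bytes. Below is a hedged sketch of setting it, mirroring the 50kiB value used by the integration test in this commit; the import path is assumed from the repository layout, and the FailInterval field is included only for context.

package main

import (
	"fmt"
	"time"

	"github.com/sorintlab/stolon/pkg/cluster"
)

func main() {
	// Fragment of a cluster spec that lowers the allowed standby lag from the
	// 1MiB default to 50kiB, as the integration test setup in this commit does.
	spec := &cluster.ClusterSpec{
		FailInterval:  &cluster.Duration{Duration: 5 * time.Second},
		MaxStandbyLag: cluster.Uint32P(50 * 1024),
	}
	fmt.Printf("failInterval=%s maxStandbyLag=%d bytes\n", spec.FailInterval.Duration, *spec.MaxStandbyLag)
}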

pkg/cluster/cluster.go

Lines changed: 10 additions & 0 deletions
@@ -31,6 +31,9 @@ import (
 func Uint16P(u uint16) *uint16 {
 	return &u
 }
+func Uint32P(u uint32) *uint32 {
+	return &u
+}
 
 func BoolP(b bool) *bool {
 	return &b
@@ -51,6 +54,7 @@ const (
 	DefaultFailInterval = 20 * time.Second
 	DefaultMaxStandbys uint16 = 20
 	DefaultMaxStandbysPerSender uint16 = 3
+	DefaultMaxStandbyLag = 1024 * 1204
 	DefaultSynchronousReplication = false
 	DefaultMaxSynchronousStandbys uint16 = 1
 	DefaultMinSynchronousStandbys uint16 = 1
@@ -184,6 +188,9 @@ type ClusterSpec struct {
 	// Max number of standbys for every sender. A sender can be a master or
 	// another standby (if/when implementing cascading replication).
 	MaxStandbysPerSender *uint16 `json:"maxStandbysPerSender,omitempty"`
+	// Max lag in bytes that an asynchronous standy can have to be elected in
+	// place of a failed master
+	MaxStandbyLag *uint32 `json:"maxStandbyLage,omitempty"`
 	// Use Synchronous replication between master and its standbys
 	SynchronousReplication *bool `json:"synchronousReplication,omitempty"`
 	// MinSynchronousStandbys is the mininum number if synchronous standbys
@@ -287,6 +294,9 @@ func (os *ClusterSpec) WithDefaults() *ClusterSpec {
 	if s.MaxStandbysPerSender == nil {
 		s.MaxStandbysPerSender = Uint16P(DefaultMaxStandbysPerSender)
 	}
+	if s.MaxStandbyLag == nil {
+		s.MaxStandbyLag = Uint32P(DefaultMaxStandbyLag)
+	}
 	if s.SynchronousReplication == nil {
 		s.SynchronousReplication = BoolP(DefaultSynchronousReplication)
 	}
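
Like the other spec options, the new field is a pointer so that "unset" can be told apart from an explicit zero; the nil check added to WithDefaults then fills it with DefaultMaxStandbyLag. Here is a small sketch of that fallback, written outside the commit and assuming the repository's pkg/cluster import path.

package main

import (
	"fmt"

	"github.com/sorintlab/stolon/pkg/cluster"
)

func main() {
	// A spec that never mentions maxStandbyLag leaves the pointer nil.
	spec := &cluster.ClusterSpec{}

	// Mirror of the nil check added to WithDefaults in this commit: an unset
	// field falls back to the package-level default.
	if spec.MaxStandbyLag == nil {
		spec.MaxStandbyLag = cluster.Uint32P(cluster.DefaultMaxStandbyLag)
	}
	fmt.Println("maxStandbyLag defaults to", *spec.MaxStandbyLag, "bytes")
}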

tests/integration/ha_test.go

Lines changed: 71 additions & 0 deletions
@@ -125,6 +125,7 @@ func setupServers(t *testing.T, clusterName, dir string, numKeepers, numSentinel
 		SleepInterval: &cluster.Duration{Duration: 2 * time.Second},
 		FailInterval: &cluster.Duration{Duration: 5 * time.Second},
 		ConvergenceTimeout: &cluster.Duration{Duration: 30 * time.Second},
+		MaxStandbyLag: cluster.Uint32P(50 * 1024), // limit lag to 50kiB
 		SynchronousReplication: cluster.BoolP(syncRepl),
 		UsePgrewind: cluster.BoolP(usePgrewind),
 		PGParameters: make(cluster.PGParameters),
@@ -339,6 +340,9 @@ func testFailover(t *testing.T, syncRepl bool) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	// wait for the keepers to have reported their state (needed to know the instance XLogPos)
+	time.Sleep(5 * time.Second)
+
 	// Stop the keeper process on master, should also stop the database
 	t.Logf("Stopping current master keeper: %s", master.uid)
 	master.Stop()
@@ -399,6 +403,9 @@ func testFailoverFailed(t *testing.T, syncRepl bool) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	// wait for the keepers to have reported their state (needed to know the instance XLogPos)
+	time.Sleep(5 * time.Second)
+
 	// Stop the keeper process on master, should also stop the database
 	t.Logf("Stopping current master keeper: %s", master.uid)
 	master.Stop()
@@ -437,6 +444,58 @@ func TestFailoverFailedSyncRepl(t *testing.T) {
 	testFailoverFailed(t, true)
 }
 
+// test that a standby with a lag (reported) greater than MaxStandbyLag from the
+// master (reported) xlogpos won't be elected as the new master. This test is
+// valid only for asynchronous replication
+func TestFailoverTooMuchLag(t *testing.T) {
+	t.Parallel()
+
+	dir, err := ioutil.TempDir("", "stolon")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	defer os.RemoveAll(dir)
+
+	clusterName := uuid.NewV4().String()
+
+	tks, tss, tstore := setupServers(t, clusterName, dir, 2, 1, false, false)
+	defer shutdown(tks, tss, tstore)
+
+	storePath := filepath.Join(common.StoreBasePath, clusterName)
+	sm := store.NewStoreManager(tstore.store, storePath)
+
+	master, standbys := waitMasterStandbysReady(t, sm, tks)
+	standby := standbys[0]
+
+	if err := populate(t, master); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	// stop the standby and write more than MaxStandbyLag data to the master
+	t.Logf("Stopping current standby keeper: %s", standby.uid)
+	standby.Stop()
+	for i := 1; i < 1000; i++ {
+		if err := write(t, master, i, i); err != nil {
+			t.Fatalf("unexpected err: %v", err)
+		}
+	}
+
+	// wait for the master to have reported its state
+	time.Sleep(5 * time.Second)
+
+	// Stop the keeper process on master, should also stop the database
+	t.Logf("Stopping current master keeper: %s", master.uid)
+	master.Stop()
+	// start the standby
+	t.Logf("Starting current standby keeper: %s", standby.uid)
+	standby.Start()
+
+	// standby shouldn't be elected as master since its lag is greater than MaxStandbyLag
+	if err := standby.WaitRole(common.RoleMaster, 30*time.Second); err == nil {
+		t.Fatalf("standby shouldn't be elected as master")
+	}
+}
+
 func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool) {
 	dir, err := ioutil.TempDir("", "stolon")
 	if err != nil {
@@ -468,6 +527,9 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	// wait for the keepers to have reported their state (needed to know the instance XLogPos)
+	time.Sleep(5 * time.Second)
+
 	// Stop the keeper process on master, should also stop the database
 	t.Logf("Stopping current master keeper: %s", master.uid)
 	master.Stop()
@@ -573,6 +635,9 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	// wait for the keepers to have reported their state (needed to know the instance XLogPos)
+	time.Sleep(5 * time.Second)
+
 	// Freeze the keeper and postgres processes on the master
 	t.Logf("SIGSTOPping current master keeper: %s", master.uid)
 	if err := master.Signal(syscall.SIGSTOP); err != nil {
@@ -687,6 +752,9 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	// wait for the keepers to have reported their state (needed to know the instance XLogPos)
+	time.Sleep(5 * time.Second)
+
 	// Wait replicated data to standby
 	if err := waitLines(t, standbys[0], 1, 10*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
@@ -827,6 +895,9 @@ func TestMasterChangedAddress(t *testing.T) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	// wait for the keepers to have reported their state (needed to know the instance XLogPos)
+	time.Sleep(5 * time.Second)
+
 	// Wait standby synced with master
 	if err := waitLines(t, master, 1, 60*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)

tests/integration/utils.go

Lines changed: 2 additions & 2 deletions
@@ -749,8 +749,8 @@ func WaitClusterDataKeeperInitialized(keeperUID string, e *store.StoreManager, t
 }
 
 // WaitClusterDataSynchronousStandbys waits for:
-// *synchrnous standby defined in masterdb spec
-// *synchrnous standby reported from masterdb status
+// *synchronous standby defined in masterdb spec
+// *synchronous standby reported from masterdb status
 func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.StoreManager, timeout time.Duration) error {
 	sort.Sort(sort.StringSlice(synchronousStandbys))
 	start := time.Now()
