Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5135a59

Browse files
committed
Fixes to stop/resume/recover mechanisms
1 parentf3e7237 commit5135a59

File tree

6 files changed

+77
-67
lines changed

6 files changed

+77
-67
lines changed

‎Cluster.pm

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -350,22 +350,25 @@ sub is_data_identic()
350350
subadd_node()
351351
{
352352
my ($self,%params) =@_;
353-
my$pgport =defined$params{port} ?$params{port} : (allocate_ports('127.0.0.1', 1))[0];
354-
my$arbiter_port =defined$params{arbiter_port} ?$params{arbiter_port} : (allocate_ports('127.0.0.1', 1))[0];
355353

354+
my$pgport;
355+
my$arbiter_port;
356356
my$connstrs;
357357
my$node_id;
358358

359359
if (defined$params{node_id})
360360
{
361361
$node_id =$params{node_id};
362+
$pgport =$params{port};
363+
$arbiter_port =$params{arbiter_port};
362364
$connstrs =$self->all_connstrs();
363365
}
364366
else
365367
{
366-
my$new_conn =", dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
367-
$connstrs =$self->all_connstrs() .$new_conn;
368368
$node_id =scalar(@{$self->{nodes}}) + 1;
369+
$pgport = (allocate_ports('127.0.0.1', 1))[0];
370+
$arbiter_port = (allocate_ports('127.0.0.1', 1))[0];
371+
$connstrs =$self->all_connstrs() .", dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
369372
}
370373

371374
my$node = PostgresNode->get_new_node("node${node_id}x");
@@ -377,6 +380,8 @@ sub add_node()
377380

378381
$node->{_host} ='127.0.0.1';
379382
$node->{_port} =$pgport;
383+
$node->{port} =$pgport;
384+
$node->{host} ='127.0.0.1';
380385
$node->{arbiter_port} =$arbiter_port;
381386
$node->{mmconnstr} ="${\$node->connstr('postgres') } arbiter_port=${\$node->{arbiter_port} }";
382387
$node->append_conf("postgresql.conf",qq(

‎multimaster.c

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ static void MtmShmemStartup(void);
170170

171171
staticBgwPool*MtmPoolConstructor(void);
172172
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql,char**errmsg);
173-
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
173+
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError,intforceOnNode);
174174
staticvoidMtmProcessDDLCommand(charconst*queryString,booltransactional);
175175

176176
staticvoidMtmLockCluster(void);
@@ -978,7 +978,9 @@ MtmBeginTransaction(MtmCurrentTrans* x)
978978
x->isTwoPhase= false;
979979
x->isTransactionBlock=IsTransactionBlock();
980980
/* Application name can be changed using PGAPPNAME environment variable */
981-
if (x->isDistributed&&Mtm->status!=MTM_ONLINE&&strcmp(application_name,MULTIMASTER_ADMIN)!=0&& !MtmBypass) {
981+
if (x->isDistributed&&Mtm->status!=MTM_ONLINE&&strcmp(application_name,MULTIMASTER_ADMIN)!=0
982+
&&strcmp(application_name,MULTIMASTER_BROADCAST_SERVICE)!=0
983+
&& !MtmBypass) {
982984
/* Reject all user's transactions at offline cluster.
983985
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
984986
*/
@@ -2406,7 +2408,7 @@ static void MtmInitialize()
24062408
for (i=0;i<MtmNodes;i++) {
24072409
Mtm->nodes[i].oldestSnapshot=0;
24082410
Mtm->nodes[i].disabledNodeMask=0;
2409-
Mtm->nodes[i].connectivityMask=7;// XXXX
2411+
Mtm->nodes[i].connectivityMask=(((nodemask_t)1 <<MtmNodes)-1);
24102412
Mtm->nodes[i].lockGraphUsed=0;
24112413
Mtm->nodes[i].lockGraphAllocated=0;
24122414
Mtm->nodes[i].lockGraphData=NULL;
@@ -2419,6 +2421,7 @@ static void MtmInitialize()
24192421
Mtm->nodes[i].originId=InvalidRepOriginId;
24202422
Mtm->nodes[i].timeline=0;
24212423
Mtm->nodes[i].nHeartbeats=0;
2424+
Mtm->nodes[i].manualRecovery= false;
24222425
Mtm->nodes[i].slotDeleted= false;
24232426
}
24242427
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
@@ -3341,9 +3344,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33413344
}
33423345

33433346
/* Await until node is connected and both receiver and sender are in clique */
3344-
while (BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1)||
3345-
!BIT_CHECK(Mtm->clique,nodeId-1)||
3346-
!BIT_CHECK(Mtm->clique,MtmNodeId-1) )
3347+
while (BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK,nodeId-1)||
3348+
BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK,MtmNodeId-1))
33473349
{
33483350
MtmUnlock();
33493351
if (*shutdown)
@@ -3398,6 +3400,7 @@ void MtmRecoverNode(int nodeId)
33983400
MTM_ELOG(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nAllNodes);
33993401
}
34003402
MtmLock(LW_EXCLUSIVE);
3403+
Mtm->nodes[nodeId-1].manualRecovery= true;
34013404
if (BIT_CHECK(Mtm->stoppedNodeMask,nodeId-1))
34023405
{
34033406
Assert(BIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
@@ -3408,8 +3411,8 @@ void MtmRecoverNode(int nodeId)
34083411

34093412
if (!MtmIsBroadcast())
34103413
{
3411-
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",nodeId), true);
3412-
MtmBroadcastUtilityStmt(psprintf("select mtm.recover_node(%d)",nodeId), true);
3414+
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",nodeId), true,0);
3415+
MtmBroadcastUtilityStmt(psprintf("select mtm.recover_node(%d)",nodeId), true,0);
34133416
}
34143417
}
34153418

@@ -3439,7 +3442,7 @@ void MtmResumeNode(int nodeId)
34393442

34403443
if (!MtmIsBroadcast())
34413444
{
3442-
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)",nodeId), true);
3445+
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)",nodeId), true,nodeId);
34433446
}
34443447
}
34453448

@@ -3454,20 +3457,19 @@ void MtmStopNode(int nodeId, bool dropSlot)
34543457
MTM_ELOG(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nAllNodes);
34553458
}
34563459

3457-
MtmLock(LW_EXCLUSIVE);
3460+
if (!MtmIsBroadcast())
3461+
{
3462+
MtmBroadcastUtilityStmt(psprintf("select mtm.stop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true,nodeId);
3463+
}
34583464

3465+
MtmLock(LW_EXCLUSIVE);
34593466
BIT_SET(Mtm->stoppedNodeMask,nodeId-1);
3460-
34613467
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
34623468
{
34633469
MtmDisableNode(nodeId);
34643470
}
34653471
MtmUnlock();
34663472

3467-
if (!MtmIsBroadcast())
3468-
{
3469-
MtmBroadcastUtilityStmt(psprintf("select mtm.stop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true);
3470-
}
34713473
if (dropSlot)
34723474
{
34733475
MtmDropSlot(nodeId);
@@ -3541,12 +3543,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
35413543
}
35423544

35433545
if (BIT_CHECK(Mtm->stoppedNodeMask,MtmReplicationNodeId-1)) {
3544-
MTM_ELOG(WARNING,"Stopped node %d tries to initiate recovery",MtmReplicationNodeId);
3545-
do {
3546-
MtmUnlock();
3547-
MtmSleep(STATUS_POLL_DELAY);
3548-
MtmLock(LW_EXCLUSIVE);
3549-
}while (BIT_CHECK(Mtm->stoppedNodeMask,MtmReplicationNodeId-1));
3546+
MtmUnlock();
3547+
MTM_ELOG(ERROR,"Stopped node %d tries to connect",MtmReplicationNodeId);
35503548
}
35513549

35523550
if (MtmIsRecoverySession) {
@@ -3853,8 +3851,8 @@ mtm_add_node(PG_FUNCTION_ARGS)
38533851
}
38543852
if (!MtmIsBroadcast())
38553853
{
3856-
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",Mtm->nAllNodes+1), true);
3857-
MtmBroadcastUtilityStmt(psprintf("select mtm.add_node('%s')",connStr), true);
3854+
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",Mtm->nAllNodes+1), true,0);
3855+
MtmBroadcastUtilityStmt(psprintf("select mtm.add_node('%s')",connStr), true,0);
38583856
}
38593857
else
38603858
{
@@ -4399,7 +4397,7 @@ MtmNoticeReceiver(void *i, const PGresult *res)
43994397
pfree(stripped_notice);
44004398
}
44014399

4402-
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError)
4400+
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError,intforceOnNode)
44034401
{
44044402
inti=0;
44054403
nodemask_tdisabledNodeMask=Mtm->disabledNodeMask;
@@ -4411,7 +4409,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
44114409

44124410
for (i=0;i<nNodes;i++)
44134411
{
4414-
if (!BIT_CHECK(disabledNodeMask,i))
4412+
if (!BIT_CHECK(disabledNodeMask,i)|| (i+1==forceOnNode))
44154413
{
44164414
conns[i]=PQconnectdb_safe(psprintf("%s application_name=%s",Mtm->nodes[i].con.connStr,MULTIMASTER_BROADCAST_SERVICE));
44174415
if (PQstatus(conns[i])!=CONNECTION_OK)

‎multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ typedef ulong64 lsn_t;
9595
typedefcharpgid_t[MULTIMASTER_MAX_GID_SIZE];
9696

9797
#defineSELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
98+
#defineEFFECTIVE_CONNECTIVITY_MASK ( SELF_CONNECTIVITY_MASK | Mtm->stoppedNodeMask | ~Mtm->clique )
9899

99100
typedefenum
100101
{
@@ -232,6 +233,7 @@ typedef struct
232233
intlockGraphAllocated;
233234
intlockGraphUsed;
234235
uint64nHeartbeats;
236+
boolmanualRecovery;
235237
boolslotDeleted;/* Signalizes that node is already deleted our slot and
236238
* recovery from that node isn't possible.
237239
*/

‎pglogical_receiver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ pglogical_receiver_main(Datum main_arg)
309309

310310
/* Start logical replication at specified position */
311311
originStartPos=replorigin_get_progress(originId, false);
312-
if (originStartPos==INVALID_LSN) {
312+
if (originStartPos==INVALID_LSN||Mtm->nodes[nodeId-1].manualRecovery) {
313313
/*
314314
* We are just creating new replication slot.
315315
* It is assumed that state of local and remote nodes is the same at this moment.
@@ -331,6 +331,7 @@ pglogical_receiver_main(Datum main_arg)
331331
}
332332
PQclear(res);
333333
resetPQExpBuffer(query);
334+
Mtm->nodes[nodeId-1].manualRecovery= false;
334335
}else {
335336
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
336337
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);

‎state.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,23 @@ MtmCheckState(void)
8585
intnReceivers=Mtm->nAllNodes-countZeroBits(Mtm->pglogicalReceiverMask,Mtm->nAllNodes);
8686
intnSenders=Mtm->nAllNodes-countZeroBits(Mtm->pglogicalSenderMask,Mtm->nAllNodes);
8787

88-
MTM_LOG1("[STATE] Status = (disabled=%s, unaccessible=%s, clique=%s, receivers=%s, senders=%s, total=%i, major=%d)",
88+
MTM_LOG1("[STATE] Status = (disabled=%s, unaccessible=%s, clique=%s, receivers=%s, senders=%s, total=%i, major=%d, stopped=%s)",
8989
maskToString(Mtm->disabledNodeMask,Mtm->nAllNodes),
9090
maskToString(SELF_CONNECTIVITY_MASK,Mtm->nAllNodes),
9191
maskToString(Mtm->clique,Mtm->nAllNodes),
9292
maskToString(Mtm->pglogicalReceiverMask,Mtm->nAllNodes),
9393
maskToString(Mtm->pglogicalSenderMask,Mtm->nAllNodes),
9494
Mtm->nAllNodes,
95-
(MtmMajorNode||Mtm->refereeGrant) );
95+
(MtmMajorNode||Mtm->refereeGrant),
96+
maskToString(Mtm->stoppedNodeMask,Mtm->nAllNodes));
9697

9798
isEnabledState=
9899
( (nConnected >=Mtm->nAllNodes/2+1)/* majority */
99100
// XXXX: should we restrict major with two nodes setup?
100101
|| (nConnected==Mtm->nAllNodes/2&&MtmMajorNode)/* or half + major node */
101102
|| (nConnected==Mtm->nAllNodes/2&&Mtm->refereeGrant) )/* or half + referee */
102-
&&BIT_CHECK(Mtm->clique,MtmNodeId-1);/* in clique */
103+
&&BIT_CHECK(Mtm->clique,MtmNodeId-1)/* in clique */
104+
&& !BIT_CHECK(Mtm->stoppedNodeMask,MtmNodeId-1);/* is not stopped */
103105

104106
/* ANY -> MTM_DISABLED */
105107
if (!isEnabledState)
@@ -336,7 +338,7 @@ void MtmOnNodeConnect(int nodeId)
336338
// MtmRefreshClusterStatus();
337339
}
338340

339-
voidMtmReconnectNode(intnodeId)
341+
voidMtmReconnectNode(intnodeId)// XXXX evict that
340342
{
341343
// MTM_LOG1("[STATE] ReconnectNode for node %u", nodeId);
342344
MtmLock(LW_EXCLUSIVE);
@@ -391,13 +393,11 @@ MtmRefreshClusterStatus()
391393
* Check for referee decidion when pnly half of nodes are visible.
392394
*/
393395
if (MtmRefereeConnStr&&*MtmRefereeConnStr&& !Mtm->refereeGrant&&
394-
// XXXX visibility & ~clique?
395-
countZeroBits(SELF_CONNECTIVITY_MASK,Mtm->nAllNodes)==Mtm->nAllNodes/2)
396+
countZeroBits(EFFECTIVE_CONNECTIVITY_MASK,Mtm->nAllNodes)==Mtm->nAllNodes/2)
396397
{
397398
intwinner_node_id=MtmGetRefereeWinner();
398399
if (winner_node_id!=-1&&
399-
// XXXX visibility & ~clique?
400-
!BIT_CHECK(SELF_CONNECTIVITY_MASK,winner_node_id-1))
400+
!BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK,winner_node_id-1))
401401
{
402402
MTM_LOG1("[STATE] Referee allowed to proceed with half of the nodes (winner_id = %d)",
403403
winner_node_id);

‎t/005_add_stop_node.pl

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use PostgresNode;
44
use Cluster;
55
use TestLib;
6-
use Test::Moretests=>5;
6+
use Test::Moretests=>7;
77

88
my$cluster = new Cluster(3);
99
$cluster->init();
@@ -68,39 +68,43 @@
6868
$cluster->{nodes}->[2]->psql('postgres',"select 't'",
6969
stdout=> \$stopped_out,stderr=> \$stopped_err);
7070
is($cluster->is_data_identic( (0,1,3) ), 1,"soft stop / resume");
71+
print("::$stopped_out ::$stopped_err\n");
7172
is($stopped_outeq'' &&$stopped_errne'', 1,"soft stop / resume");
7273

7374
$cluster->psql(0,'postgres',"select mtm.resume_node(3)");
7475
$cluster->{nodes}->[2]->poll_query_until('postgres',"select 't'");
7576
$cluster->pgbench(2, ('-N','-n',-T=>'1') );
7677
is($cluster->is_data_identic( (0,1,2,3) ), 1,"soft stop / resume");
7778

78-
# ################################################################################
79-
# # hard stop / basebackup / recover
80-
# ################################################################################
79+
################################################################################
80+
# hard stop / basebackup / recover
81+
################################################################################
82+
83+
diag('Stopping node with slot drop');
84+
$cluster->psql(0,'postgres',"select mtm.stop_node(3,'t')");
85+
# await for comletion?
86+
$cluster->{nodes}->[2]->stop('fast');
87+
88+
$cluster->pgbench(0, ('-N','-n',-T=>'1') );
89+
$cluster->pgbench(1, ('-N','-n',-T=>'1') );
90+
$cluster->pgbench(3, ('-N','-n',-T=>'1') );
91+
is($cluster->is_data_identic( (0,1,3) ), 1,"hard stop / resume");
92+
93+
$cluster->psql(0,'postgres',"select mtm.recover_node(3)");
94+
95+
# now we need to perform backup from live node
96+
$cluster->add_node(port=>$cluster->{nodes}->[2]->{_port},
97+
arbiter_port=>$cluster->{nodes}->[2]->{arbiter_port},
98+
node_id=> 3);
99+
100+
my$dd =$cluster->{nodes}->[4]->data_dir;
101+
diag("preparing to start$dd");
81102

82-
# diag('Stopping node with slot drop');
83-
# $cluster->psql(0, 'postgres', "select mtm.stop_node(3,'t')");
84-
# # await for comletion?
85-
# $cluster->{nodes}->[2]->stop();
86-
87-
# $cluster->pgbench(0, ('-N', '-n', -T => '1') );
88-
# $cluster->pgbench(1, ('-N', '-n', -T => '1') );
89-
# $cluster->pgbench(3, ('-N', '-n', -T => '1') );
90-
# is($cluster->is_data_identic( (0,1,3) ), 1, "hard stop / resume");
91-
92-
# $cluster->psql(0, 'postgres', "select mtm.recover_node(3)");
93-
94-
# # now we need to perform backup from live node
95-
# $cluster->add_node(port => $cluster->{nodes}->[2]->{port},
96-
# arbiter_port => $cluster->{nodes}->[2]->{arbiter_port},
97-
# node_id => 3);
98-
# diag("preparing to start");
99-
# $cluster->{nodes}->[4]->start;
100-
# $cluster->{nodes}->[4]->poll_query_until('postgres', "select 't'");
101-
102-
# $cluster->pgbench(0, ('-N', '-n', -T => '1') );
103-
# $cluster->pgbench(1, ('-N', '-n', -T => '1') );
104-
# $cluster->pgbench(3, ('-N', '-n', -T => '1') );
105-
# $cluster->pgbench(4, ('-N', '-n', -T => '1') );
106-
# is($cluster->is_data_identic( (0,1,3,4) ), 1, "hard stop / resume");
103+
$cluster->{nodes}->[4]->start;
104+
$cluster->{nodes}->[4]->poll_query_until('postgres',"select 't'");
105+
106+
$cluster->pgbench(0, ('-N','-n',-T=>'1') );
107+
$cluster->pgbench(1, ('-N','-n',-T=>'1') );
108+
$cluster->pgbench(3, ('-N','-n',-T=>'1') );
109+
$cluster->pgbench(4, ('-N','-n',-T=>'1') );
110+
is($cluster->is_data_identic( (0,1,3,4) ), 1,"hard stop / resume");

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp