Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite337683

Browse files
committed
Fixes to stop/resume/recover mechanisms
1 parent9f2b1a3 commite337683

File tree

6 files changed

+77
-67
lines changed

6 files changed

+77
-67
lines changed

‎contrib/mmts/Cluster.pm

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -350,22 +350,25 @@ sub is_data_identic()
350350
subadd_node()
351351
{
352352
my ($self,%params) =@_;
353-
my$pgport =defined$params{port} ?$params{port} : (allocate_ports('127.0.0.1', 1))[0];
354-
my$arbiter_port =defined$params{arbiter_port} ?$params{arbiter_port} : (allocate_ports('127.0.0.1', 1))[0];
355353

354+
my$pgport;
355+
my$arbiter_port;
356356
my$connstrs;
357357
my$node_id;
358358

359359
if (defined$params{node_id})
360360
{
361361
$node_id =$params{node_id};
362+
$pgport =$params{port};
363+
$arbiter_port =$params{arbiter_port};
362364
$connstrs =$self->all_connstrs();
363365
}
364366
else
365367
{
366-
my$new_conn =", dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
367-
$connstrs =$self->all_connstrs() .$new_conn;
368368
$node_id =scalar(@{$self->{nodes}}) + 1;
369+
$pgport = (allocate_ports('127.0.0.1', 1))[0];
370+
$arbiter_port = (allocate_ports('127.0.0.1', 1))[0];
371+
$connstrs =$self->all_connstrs() .", dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
369372
}
370373

371374
my$node = PostgresNode->get_new_node("node${node_id}x");
@@ -377,6 +380,8 @@ sub add_node()
377380

378381
$node->{_host} ='127.0.0.1';
379382
$node->{_port} =$pgport;
383+
$node->{port} =$pgport;
384+
$node->{host} ='127.0.0.1';
380385
$node->{arbiter_port} =$arbiter_port;
381386
$node->{mmconnstr} ="${\$node->connstr('postgres') } arbiter_port=${\$node->{arbiter_port} }";
382387
$node->append_conf("postgresql.conf",qq(

‎contrib/mmts/multimaster.c

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ static void MtmShmemStartup(void);
170170

171171
staticBgwPool*MtmPoolConstructor(void);
172172
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql,char**errmsg);
173-
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
173+
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError,intforceOnNode);
174174
staticvoidMtmProcessDDLCommand(charconst*queryString,booltransactional);
175175

176176
staticvoidMtmLockCluster(void);
@@ -978,7 +978,9 @@ MtmBeginTransaction(MtmCurrentTrans* x)
978978
x->isTwoPhase= false;
979979
x->isTransactionBlock=IsTransactionBlock();
980980
/* Application name can be changed using PGAPPNAME environment variable */
981-
if (x->isDistributed&&Mtm->status!=MTM_ONLINE&&strcmp(application_name,MULTIMASTER_ADMIN)!=0&& !MtmBypass) {
981+
if (x->isDistributed&&Mtm->status!=MTM_ONLINE&&strcmp(application_name,MULTIMASTER_ADMIN)!=0
982+
&&strcmp(application_name,MULTIMASTER_BROADCAST_SERVICE)!=0
983+
&& !MtmBypass) {
982984
/* Reject all user's transactions at offline cluster.
983985
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
984986
*/
@@ -2410,7 +2412,7 @@ static void MtmInitialize()
24102412
for (i=0;i<MtmNodes;i++) {
24112413
Mtm->nodes[i].oldestSnapshot=0;
24122414
Mtm->nodes[i].disabledNodeMask=0;
2413-
Mtm->nodes[i].connectivityMask=7;// XXXX
2415+
Mtm->nodes[i].connectivityMask=(((nodemask_t)1 <<MtmNodes)-1);
24142416
Mtm->nodes[i].lockGraphUsed=0;
24152417
Mtm->nodes[i].lockGraphAllocated=0;
24162418
Mtm->nodes[i].lockGraphData=NULL;
@@ -2423,6 +2425,7 @@ static void MtmInitialize()
24232425
Mtm->nodes[i].originId=InvalidRepOriginId;
24242426
Mtm->nodes[i].timeline=0;
24252427
Mtm->nodes[i].nHeartbeats=0;
2428+
Mtm->nodes[i].manualRecovery= false;
24262429
Mtm->nodes[i].slotDeleted= false;
24272430
}
24282431
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
@@ -3345,9 +3348,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33453348
}
33463349

33473350
/* Await until node is connected and both receiver and sender are in clique */
3348-
while (BIT_CHECK(SELF_CONNECTIVITY_MASK,nodeId-1)||
3349-
!BIT_CHECK(Mtm->clique,nodeId-1)||
3350-
!BIT_CHECK(Mtm->clique,MtmNodeId-1) )
3351+
while (BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK,nodeId-1)||
3352+
BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK,MtmNodeId-1))
33513353
{
33523354
MtmUnlock();
33533355
if (*shutdown)
@@ -3402,6 +3404,7 @@ void MtmRecoverNode(int nodeId)
34023404
MTM_ELOG(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nAllNodes);
34033405
}
34043406
MtmLock(LW_EXCLUSIVE);
3407+
Mtm->nodes[nodeId-1].manualRecovery= true;
34053408
if (BIT_CHECK(Mtm->stoppedNodeMask,nodeId-1))
34063409
{
34073410
Assert(BIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
@@ -3412,8 +3415,8 @@ void MtmRecoverNode(int nodeId)
34123415

34133416
if (!MtmIsBroadcast())
34143417
{
3415-
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",nodeId), true);
3416-
MtmBroadcastUtilityStmt(psprintf("select mtm.recover_node(%d)",nodeId), true);
3418+
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",nodeId), true,0);
3419+
MtmBroadcastUtilityStmt(psprintf("select mtm.recover_node(%d)",nodeId), true,0);
34173420
}
34183421
}
34193422

@@ -3443,7 +3446,7 @@ void MtmResumeNode(int nodeId)
34433446

34443447
if (!MtmIsBroadcast())
34453448
{
3446-
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)",nodeId), true);
3449+
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)",nodeId), true,nodeId);
34473450
}
34483451
}
34493452

@@ -3458,20 +3461,19 @@ void MtmStopNode(int nodeId, bool dropSlot)
34583461
MTM_ELOG(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nAllNodes);
34593462
}
34603463

3461-
MtmLock(LW_EXCLUSIVE);
3464+
if (!MtmIsBroadcast())
3465+
{
3466+
MtmBroadcastUtilityStmt(psprintf("select mtm.stop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true,nodeId);
3467+
}
34623468

3469+
MtmLock(LW_EXCLUSIVE);
34633470
BIT_SET(Mtm->stoppedNodeMask,nodeId-1);
3464-
34653471
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
34663472
{
34673473
MtmDisableNode(nodeId);
34683474
}
34693475
MtmUnlock();
34703476

3471-
if (!MtmIsBroadcast())
3472-
{
3473-
MtmBroadcastUtilityStmt(psprintf("select mtm.stop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true);
3474-
}
34753477
if (dropSlot)
34763478
{
34773479
MtmDropSlot(nodeId);
@@ -3545,12 +3547,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
35453547
}
35463548

35473549
if (BIT_CHECK(Mtm->stoppedNodeMask,MtmReplicationNodeId-1)) {
3548-
MTM_ELOG(WARNING,"Stopped node %d tries to initiate recovery",MtmReplicationNodeId);
3549-
do {
3550-
MtmUnlock();
3551-
MtmSleep(STATUS_POLL_DELAY);
3552-
MtmLock(LW_EXCLUSIVE);
3553-
}while (BIT_CHECK(Mtm->stoppedNodeMask,MtmReplicationNodeId-1));
3550+
MtmUnlock();
3551+
MTM_ELOG(ERROR,"Stopped node %d tries to connect",MtmReplicationNodeId);
35543552
}
35553553

35563554
if (MtmIsRecoverySession) {
@@ -3857,8 +3855,8 @@ mtm_add_node(PG_FUNCTION_ARGS)
38573855
}
38583856
if (!MtmIsBroadcast())
38593857
{
3860-
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",Mtm->nAllNodes+1), true);
3861-
MtmBroadcastUtilityStmt(psprintf("select mtm.add_node('%s')",connStr), true);
3858+
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",Mtm->nAllNodes+1), true,0);
3859+
MtmBroadcastUtilityStmt(psprintf("select mtm.add_node('%s')",connStr), true,0);
38623860
}
38633861
else
38643862
{
@@ -4403,7 +4401,7 @@ MtmNoticeReceiver(void *i, const PGresult *res)
44034401
pfree(stripped_notice);
44044402
}
44054403

4406-
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError)
4404+
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError,intforceOnNode)
44074405
{
44084406
inti=0;
44094407
nodemask_tdisabledNodeMask=Mtm->disabledNodeMask;
@@ -4415,7 +4413,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
44154413

44164414
for (i=0;i<nNodes;i++)
44174415
{
4418-
if (!BIT_CHECK(disabledNodeMask,i))
4416+
if (!BIT_CHECK(disabledNodeMask,i)|| (i+1==forceOnNode))
44194417
{
44204418
conns[i]=PQconnectdb_safe(psprintf("%s application_name=%s",Mtm->nodes[i].con.connStr,MULTIMASTER_BROADCAST_SERVICE));
44214419
if (PQstatus(conns[i])!=CONNECTION_OK)

‎contrib/mmts/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ typedef ulong64 lsn_t;
9595
typedefcharpgid_t[MULTIMASTER_MAX_GID_SIZE];
9696

9797
#defineSELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
98+
#defineEFFECTIVE_CONNECTIVITY_MASK ( SELF_CONNECTIVITY_MASK | Mtm->stoppedNodeMask | ~Mtm->clique )
9899

99100
typedefenum
100101
{
@@ -232,6 +233,7 @@ typedef struct
232233
intlockGraphAllocated;
233234
intlockGraphUsed;
234235
uint64nHeartbeats;
236+
boolmanualRecovery;
235237
boolslotDeleted;/* Signalizes that node is already deleted our slot and
236238
* recovery from that node isn't possible.
237239
*/

‎contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ pglogical_receiver_main(Datum main_arg)
309309

310310
/* Start logical replication at specified position */
311311
originStartPos=replorigin_get_progress(originId, false);
312-
if (originStartPos==INVALID_LSN) {
312+
if (originStartPos==INVALID_LSN||Mtm->nodes[nodeId-1].manualRecovery) {
313313
/*
314314
* We are just creating new replication slot.
315315
* It is assumed that state of local and remote nodes is the same at this moment.
@@ -331,6 +331,7 @@ pglogical_receiver_main(Datum main_arg)
331331
}
332332
PQclear(res);
333333
resetPQExpBuffer(query);
334+
Mtm->nodes[nodeId-1].manualRecovery= false;
334335
}else {
335336
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
336337
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);

‎contrib/mmts/state.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,23 @@ MtmCheckState(void)
8585
intnReceivers=Mtm->nAllNodes-countZeroBits(Mtm->pglogicalReceiverMask,Mtm->nAllNodes);
8686
intnSenders=Mtm->nAllNodes-countZeroBits(Mtm->pglogicalSenderMask,Mtm->nAllNodes);
8787

88-
MTM_LOG1("[STATE] Status = (disabled=%s, unaccessible=%s, clique=%s, receivers=%s, senders=%s, total=%i, major=%d)",
88+
MTM_LOG1("[STATE] Status = (disabled=%s, unaccessible=%s, clique=%s, receivers=%s, senders=%s, total=%i, major=%d, stopped=%s)",
8989
maskToString(Mtm->disabledNodeMask,Mtm->nAllNodes),
9090
maskToString(SELF_CONNECTIVITY_MASK,Mtm->nAllNodes),
9191
maskToString(Mtm->clique,Mtm->nAllNodes),
9292
maskToString(Mtm->pglogicalReceiverMask,Mtm->nAllNodes),
9393
maskToString(Mtm->pglogicalSenderMask,Mtm->nAllNodes),
9494
Mtm->nAllNodes,
95-
(MtmMajorNode||Mtm->refereeGrant) );
95+
(MtmMajorNode||Mtm->refereeGrant),
96+
maskToString(Mtm->stoppedNodeMask,Mtm->nAllNodes));
9697

9798
isEnabledState=
9899
( (nConnected >=Mtm->nAllNodes/2+1)/* majority */
99100
// XXXX: should we restrict major with two nodes setup?
100101
|| (nConnected==Mtm->nAllNodes/2&&MtmMajorNode)/* or half + major node */
101102
|| (nConnected==Mtm->nAllNodes/2&&Mtm->refereeGrant) )/* or half + referee */
102-
&&BIT_CHECK(Mtm->clique,MtmNodeId-1);/* in clique */
103+
&&BIT_CHECK(Mtm->clique,MtmNodeId-1)/* in clique */
104+
&& !BIT_CHECK(Mtm->stoppedNodeMask,MtmNodeId-1);/* is not stopped */
103105

104106
/* ANY -> MTM_DISABLED */
105107
if (!isEnabledState)
@@ -336,7 +338,7 @@ void MtmOnNodeConnect(int nodeId)
336338
// MtmRefreshClusterStatus();
337339
}
338340

339-
voidMtmReconnectNode(intnodeId)
341+
voidMtmReconnectNode(intnodeId)// XXXX evict that
340342
{
341343
// MTM_LOG1("[STATE] ReconnectNode for node %u", nodeId);
342344
MtmLock(LW_EXCLUSIVE);
@@ -391,13 +393,11 @@ MtmRefreshClusterStatus()
391393
* Check for referee decidion when pnly half of nodes are visible.
392394
*/
393395
if (MtmRefereeConnStr&&*MtmRefereeConnStr&& !Mtm->refereeGrant&&
394-
// XXXX visibility & ~clique?
395-
countZeroBits(SELF_CONNECTIVITY_MASK,Mtm->nAllNodes)==Mtm->nAllNodes/2)
396+
countZeroBits(EFFECTIVE_CONNECTIVITY_MASK,Mtm->nAllNodes)==Mtm->nAllNodes/2)
396397
{
397398
intwinner_node_id=MtmGetRefereeWinner();
398399
if (winner_node_id!=-1&&
399-
// XXXX visibility & ~clique?
400-
!BIT_CHECK(SELF_CONNECTIVITY_MASK,winner_node_id-1))
400+
!BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK,winner_node_id-1))
401401
{
402402
MTM_LOG1("[STATE] Referee allowed to proceed with half of the nodes (winner_id = %d)",
403403
winner_node_id);

‎contrib/mmts/t/005_add_stop_node.pl

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use PostgresNode;
44
use Cluster;
55
use TestLib;
6-
use Test::Moretests=>5;
6+
use Test::Moretests=>7;
77

88
my$cluster = new Cluster(3);
99
$cluster->init();
@@ -68,39 +68,43 @@
6868
$cluster->{nodes}->[2]->psql('postgres',"select 't'",
6969
stdout=> \$stopped_out,stderr=> \$stopped_err);
7070
is($cluster->is_data_identic( (0,1,3) ), 1,"soft stop / resume");
71+
print("::$stopped_out ::$stopped_err\n");
7172
is($stopped_outeq'' &&$stopped_errne'', 1,"soft stop / resume");
7273

7374
$cluster->psql(0,'postgres',"select mtm.resume_node(3)");
7475
$cluster->{nodes}->[2]->poll_query_until('postgres',"select 't'");
7576
$cluster->pgbench(2, ('-N','-n',-T=>'1') );
7677
is($cluster->is_data_identic( (0,1,2,3) ), 1,"soft stop / resume");
7778

78-
# ################################################################################
79-
# # hard stop / basebackup / recover
80-
# ################################################################################
79+
################################################################################
80+
# hard stop / basebackup / recover
81+
################################################################################
82+
83+
diag('Stopping node with slot drop');
84+
$cluster->psql(0,'postgres',"select mtm.stop_node(3,'t')");
85+
# await for comletion?
86+
$cluster->{nodes}->[2]->stop('fast');
87+
88+
$cluster->pgbench(0, ('-N','-n',-T=>'1') );
89+
$cluster->pgbench(1, ('-N','-n',-T=>'1') );
90+
$cluster->pgbench(3, ('-N','-n',-T=>'1') );
91+
is($cluster->is_data_identic( (0,1,3) ), 1,"hard stop / resume");
92+
93+
$cluster->psql(0,'postgres',"select mtm.recover_node(3)");
94+
95+
# now we need to perform backup from live node
96+
$cluster->add_node(port=>$cluster->{nodes}->[2]->{_port},
97+
arbiter_port=>$cluster->{nodes}->[2]->{arbiter_port},
98+
node_id=> 3);
99+
100+
my$dd =$cluster->{nodes}->[4]->data_dir;
101+
diag("preparing to start$dd");
81102

82-
# diag('Stopping node with slot drop');
83-
# $cluster->psql(0, 'postgres', "select mtm.stop_node(3,'t')");
84-
# # await for comletion?
85-
# $cluster->{nodes}->[2]->stop();
86-
87-
# $cluster->pgbench(0, ('-N', '-n', -T => '1') );
88-
# $cluster->pgbench(1, ('-N', '-n', -T => '1') );
89-
# $cluster->pgbench(3, ('-N', '-n', -T => '1') );
90-
# is($cluster->is_data_identic( (0,1,3) ), 1, "hard stop / resume");
91-
92-
# $cluster->psql(0, 'postgres', "select mtm.recover_node(3)");
93-
94-
# # now we need to perform backup from live node
95-
# $cluster->add_node(port => $cluster->{nodes}->[2]->{port},
96-
# arbiter_port => $cluster->{nodes}->[2]->{arbiter_port},
97-
# node_id => 3);
98-
# diag("preparing to start");
99-
# $cluster->{nodes}->[4]->start;
100-
# $cluster->{nodes}->[4]->poll_query_until('postgres', "select 't'");
101-
102-
# $cluster->pgbench(0, ('-N', '-n', -T => '1') );
103-
# $cluster->pgbench(1, ('-N', '-n', -T => '1') );
104-
# $cluster->pgbench(3, ('-N', '-n', -T => '1') );
105-
# $cluster->pgbench(4, ('-N', '-n', -T => '1') );
106-
# is($cluster->is_data_identic( (0,1,3,4) ), 1, "hard stop / resume");
103+
$cluster->{nodes}->[4]->start;
104+
$cluster->{nodes}->[4]->poll_query_until('postgres',"select 't'");
105+
106+
$cluster->pgbench(0, ('-N','-n',-T=>'1') );
107+
$cluster->pgbench(1, ('-N','-n',-T=>'1') );
108+
$cluster->pgbench(3, ('-N','-n',-T=>'1') );
109+
$cluster->pgbench(4, ('-N','-n',-T=>'1') );
110+
is($cluster->is_data_identic( (0,1,3,4) ), 1,"hard stop / resume");

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp