Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd67b021

Browse files
committed
Add mtm.resume_node function
1 parent8514306 commitd67b021

File tree

3 files changed

+61
-6
lines changed

3 files changed

+61
-6
lines changed

‎contrib/mmts/multimaster--1.0.sql

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,28 @@ CREATE FUNCTION mtm.stop_replication() RETURNS void
1818
AS'MODULE_PATHNAME','mtm_stop_replication'
1919
LANGUAGE C;
2020

21+
-- Stop replication to the node. Node is didsabled, If drop_slot is true, then replication slot is dropped and node can be recovered using basebackup and recover_node function.
22+
-- If drop_slot is false and limit for maximal slot gap was not reached, then node can be restarted using resume_node function.
2123
CREATEFUNCTIONmtm.stop_node(nodeinteger, drop_slot bool default false) RETURNS void
2224
AS'MODULE_PATHNAME','mtm_stop_node'
2325
LANGUAGE C;
2426

27+
-- Add new node to the cluster. Number of nodes should not exeed maximal number of nodes in the cluster.
2528
CREATEFUNCTIONmtm.add_node(conn_strtext) RETURNS void
2629
AS'MODULE_PATHNAME','mtm_add_node'
2730
LANGUAGE C;
2831

29-
-- Create replication slot for the node which was previouslystopped
32+
-- Create replication slot for the node which was previouslystalled (its replicatoin slot was deleted)
3033
CREATEFUNCTIONmtm.recover_node(nodeinteger) RETURNS void
3134
AS'MODULE_PATHNAME','mtm_recover_node'
3235
LANGUAGE C;
3336

37+
-- Resume previously stopped node with live replication slot. If node was not stopped, this function has no effect.
38+
-- It doesn't create slot and returns error if node is stalled (slot eas dropped)
39+
CREATEFUNCTIONmtm.resume_node(nodeinteger) RETURNS void
40+
AS'MODULE_PATHNAME','mtm_resume_node'
41+
LANGUAGE C;
42+
3443

3544
CREATEFUNCTIONmtm.get_snapshot() RETURNSbigint
3645
AS'MODULE_PATHNAME','mtm_get_snapshot'
@@ -63,11 +72,11 @@ CREATE FUNCTION mtm.get_trans_by_xid(xid bigint) RETURNS mtm.trans_state
6372
AS'MODULE_PATHNAME','mtm_get_trans_by_xid'
6473
LANGUAGE C;
6574

66-
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
75+
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
6776
AS'MODULE_PATHNAME','mtm_get_cluster_state'
6877
LANGUAGE C;
6978

70-
CREATEFUNCTIONmtm.collect_cluster_info() RETURNS SETOFmtm.cluster_state
79+
CREATEFUNCTIONmtm.collect_cluster_info() RETURNS SETOFmtm.cluster_state
7180
AS'MODULE_PATHNAME','mtm_collect_cluster_info'
7281
LANGUAGE C;
7382

@@ -105,7 +114,7 @@ LANGUAGE C;
105114

106115
CREATETABLEIF NOT EXISTSmtm.local_tables(rel_schematext, rel_nametext,primary key(rel_schema, rel_name));
107116

108-
CREATE OR REPLACEFUNCTIONmtm.alter_sequences() RETURNSbooleanAS
117+
CREATE OR REPLACEFUNCTIONmtm.alter_sequences() RETURNSbooleanAS
109118
$$
110119
DECLARE
111120
seq_class record;

‎contrib/mmts/multimaster.c

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ PG_FUNCTION_INFO_V1(mtm_stop_node);
117117
PG_FUNCTION_INFO_V1(mtm_add_node);
118118
PG_FUNCTION_INFO_V1(mtm_poll_node);
119119
PG_FUNCTION_INFO_V1(mtm_recover_node);
120+
PG_FUNCTION_INFO_V1(mtm_resume_node);
120121
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
121122
PG_FUNCTION_INFO_V1(mtm_get_csn);
122123
PG_FUNCTION_INFO_V1(mtm_get_trans_by_gid);
@@ -1997,7 +1998,7 @@ static void MtmDisableNode(int nodeId)
19971998
*/
19981999
staticvoidMtmEnableNode(intnodeId)
19992000
{
2000-
if (BIT_SET(Mtm->disabledNodeMask,nodeId-1)) {
2001+
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
20012002
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
20022003
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
20032004
BIT_SET(Mtm->recoveredNodeMask,nodeId-1);
@@ -3656,7 +3657,7 @@ void MtmRecoverNode(int nodeId)
36563657
MTM_ELOG(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nAllNodes);
36573658
}
36583659
MtmLock(LW_EXCLUSIVE);
3659-
if (BIT_SET(Mtm->stoppedNodeMask,nodeId-1))
3660+
if (BIT_CHECK(Mtm->stoppedNodeMask,nodeId-1))
36603661
{
36613662
Assert(BIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
36623663
BIT_CLEAR(Mtm->stoppedNodeMask,nodeId-1);
@@ -3671,6 +3672,36 @@ void MtmRecoverNode(int nodeId)
36713672
}
36723673
}
36733674

3675+
/*
3676+
* Resume previosly stopped node.
3677+
* This function creates logical replication slot for the node which will collect
3678+
* all changes which should be sent to this node from this moment.
3679+
*/
3680+
voidMtmResumeNode(intnodeId)
3681+
{
3682+
if (nodeId <=0||nodeId>Mtm->nAllNodes)
3683+
{
3684+
MTM_ELOG(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nAllNodes);
3685+
}
3686+
MtmLock(LW_EXCLUSIVE);
3687+
if (BIT_CHECK(Mtm->stalledNodeMask,nodeId-1))
3688+
{
3689+
MtmUnlock();
3690+
MTM_ELOG(ERROR,"Node %d can not be resumed because it's replication slot is dropped",nodeId);
3691+
}
3692+
if (BIT_CHECK(Mtm->stoppedNodeMask,nodeId-1))
3693+
{
3694+
Assert(BIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
3695+
BIT_CLEAR(Mtm->stoppedNodeMask,nodeId-1);
3696+
}
3697+
MtmUnlock();
3698+
3699+
if (!MtmIsBroadcast())
3700+
{
3701+
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)",nodeId), true);
3702+
}
3703+
}
3704+
36743705
/*
36753706
* Permanently exclude node from the cluster. Node will not participate in voting and can not be automatically recovered
36763707
* until MtmRecoverNode is invoked.
@@ -3763,6 +3794,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
37633794
}
37643795
MTM_LOG1("Startup of logical replication to node %d",MtmReplicationNodeId);
37653796
MtmLock(LW_EXCLUSIVE);
3797+
3798+
if (BIT_CHECK(Mtm->stalledNodeMask,MtmReplicationNodeId-1)) {
3799+
MtmUnlock();
3800+
MTM_ELOG(ERROR,"Stalled node %d tries to initiate recovery",MtmReplicationNodeId);
3801+
}
3802+
37663803
if (BIT_CHECK(Mtm->stoppedNodeMask,MtmReplicationNodeId-1)) {
37673804
MTM_ELOG(WARNING,"Stopped node %d tries to initiate recovery",MtmReplicationNodeId);
37683805
do {
@@ -4149,6 +4186,14 @@ mtm_recover_node(PG_FUNCTION_ARGS)
41494186
PG_RETURN_VOID();
41504187
}
41514188

4189+
Datum
4190+
mtm_resume_node(PG_FUNCTION_ARGS)
4191+
{
4192+
intnodeId=PG_GETARG_INT32(0);
4193+
MtmResumeNode(nodeId);
4194+
PG_RETURN_VOID();
4195+
}
4196+
41524197
Datum
41534198
mtm_get_snapshot(PG_FUNCTION_ARGS)
41544199
{

‎contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ extern void MtmUnlockNode(int nodeId);
398398
externvoidMtmStopNode(intnodeId,booldropSlot);
399399
externvoidMtmReconnectNode(intnodeId);
400400
externvoidMtmRecoverNode(intnodeId);
401+
externvoidMtmResumeNode(intnodeId);
401402
externvoidMtmOnNodeDisconnect(intnodeId);
402403
externvoidMtmOnNodeConnect(intnodeId);
403404
externvoidMtmWakeUpBackend(MtmTransState*ts);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp