Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4444c22

Browse files
knizhnikkelvich
authored andcommitted
Add mtm.resume_node function
1 parentbcc04ea commit4444c22

File tree

3 files changed

+61
-6
lines changed

3 files changed

+61
-6
lines changed

‎multimaster--1.0.sql

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,28 @@ CREATE FUNCTION mtm.stop_replication() RETURNS void
1818
AS'MODULE_PATHNAME','mtm_stop_replication'
1919
LANGUAGE C;
2020

21+
-- Stop replication to the node. Node is didsabled, If drop_slot is true, then replication slot is dropped and node can be recovered using basebackup and recover_node function.
22+
-- If drop_slot is false and limit for maximal slot gap was not reached, then node can be restarted using resume_node function.
2123
CREATEFUNCTIONmtm.stop_node(nodeinteger, drop_slot bool default false) RETURNS void
2224
AS'MODULE_PATHNAME','mtm_stop_node'
2325
LANGUAGE C;
2426

27+
-- Add new node to the cluster. Number of nodes should not exeed maximal number of nodes in the cluster.
2528
CREATEFUNCTIONmtm.add_node(conn_strtext) RETURNS void
2629
AS'MODULE_PATHNAME','mtm_add_node'
2730
LANGUAGE C;
2831

29-
-- Create replication slot for the node which was previouslystopped
32+
-- Create replication slot for the node which was previouslystalled (its replicatoin slot was deleted)
3033
CREATEFUNCTIONmtm.recover_node(nodeinteger) RETURNS void
3134
AS'MODULE_PATHNAME','mtm_recover_node'
3235
LANGUAGE C;
3336

37+
-- Resume previously stopped node with live replication slot. If node was not stopped, this function has no effect.
38+
-- It doesn't create slot and returns error if node is stalled (slot eas dropped)
39+
CREATEFUNCTIONmtm.resume_node(nodeinteger) RETURNS void
40+
AS'MODULE_PATHNAME','mtm_resume_node'
41+
LANGUAGE C;
42+
3443

3544
CREATEFUNCTIONmtm.get_snapshot() RETURNSbigint
3645
AS'MODULE_PATHNAME','mtm_get_snapshot'
@@ -63,11 +72,11 @@ CREATE FUNCTION mtm.get_trans_by_xid(xid bigint) RETURNS mtm.trans_state
6372
AS'MODULE_PATHNAME','mtm_get_trans_by_xid'
6473
LANGUAGE C;
6574

66-
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
75+
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
6776
AS'MODULE_PATHNAME','mtm_get_cluster_state'
6877
LANGUAGE C;
6978

70-
CREATEFUNCTIONmtm.collect_cluster_info() RETURNS SETOFmtm.cluster_state
79+
CREATEFUNCTIONmtm.collect_cluster_info() RETURNS SETOFmtm.cluster_state
7180
AS'MODULE_PATHNAME','mtm_collect_cluster_info'
7281
LANGUAGE C;
7382

@@ -105,7 +114,7 @@ LANGUAGE C;
105114

106115
CREATETABLEIF NOT EXISTSmtm.local_tables(rel_schema name, rel_name name,primary key(rel_schema, rel_name));
107116

108-
CREATE OR REPLACEFUNCTIONmtm.alter_sequences() RETURNSbooleanAS
117+
CREATE OR REPLACEFUNCTIONmtm.alter_sequences() RETURNSbooleanAS
109118
$$
110119
DECLARE
111120
seq_class record;

‎multimaster.c

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ PG_FUNCTION_INFO_V1(mtm_stop_node);
117117
PG_FUNCTION_INFO_V1(mtm_add_node);
118118
PG_FUNCTION_INFO_V1(mtm_poll_node);
119119
PG_FUNCTION_INFO_V1(mtm_recover_node);
120+
PG_FUNCTION_INFO_V1(mtm_resume_node);
120121
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
121122
PG_FUNCTION_INFO_V1(mtm_get_csn);
122123
PG_FUNCTION_INFO_V1(mtm_get_trans_by_gid);
@@ -2002,7 +2003,7 @@ static void MtmDisableNode(int nodeId)
20022003
*/
20032004
staticvoidMtmEnableNode(intnodeId)
20042005
{
2005-
if (BIT_SET(Mtm->disabledNodeMask,nodeId-1)) {
2006+
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
20062007
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
20072008
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
20082009
BIT_SET(Mtm->recoveredNodeMask,nodeId-1);
@@ -3678,7 +3679,7 @@ void MtmRecoverNode(int nodeId)
36783679
MTM_ELOG(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nAllNodes);
36793680
}
36803681
MtmLock(LW_EXCLUSIVE);
3681-
if (BIT_SET(Mtm->stoppedNodeMask,nodeId-1))
3682+
if (BIT_CHECK(Mtm->stoppedNodeMask,nodeId-1))
36823683
{
36833684
Assert(BIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
36843685
BIT_CLEAR(Mtm->stoppedNodeMask,nodeId-1);
@@ -3693,6 +3694,36 @@ void MtmRecoverNode(int nodeId)
36933694
}
36943695
}
36953696

3697+
/*
3698+
* Resume previosly stopped node.
3699+
* This function creates logical replication slot for the node which will collect
3700+
* all changes which should be sent to this node from this moment.
3701+
*/
3702+
voidMtmResumeNode(intnodeId)
3703+
{
3704+
if (nodeId <=0||nodeId>Mtm->nAllNodes)
3705+
{
3706+
MTM_ELOG(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nAllNodes);
3707+
}
3708+
MtmLock(LW_EXCLUSIVE);
3709+
if (BIT_CHECK(Mtm->stalledNodeMask,nodeId-1))
3710+
{
3711+
MtmUnlock();
3712+
MTM_ELOG(ERROR,"Node %d can not be resumed because it's replication slot is dropped",nodeId);
3713+
}
3714+
if (BIT_CHECK(Mtm->stoppedNodeMask,nodeId-1))
3715+
{
3716+
Assert(BIT_CHECK(Mtm->disabledNodeMask,nodeId-1));
3717+
BIT_CLEAR(Mtm->stoppedNodeMask,nodeId-1);
3718+
}
3719+
MtmUnlock();
3720+
3721+
if (!MtmIsBroadcast())
3722+
{
3723+
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)",nodeId), true);
3724+
}
3725+
}
3726+
36963727
/*
36973728
* Permanently exclude node from the cluster. Node will not participate in voting and can not be automatically recovered
36983729
* until MtmRecoverNode is invoked.
@@ -3785,6 +3816,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
37853816
}
37863817
MTM_LOG1("Startup of logical replication to node %d",MtmReplicationNodeId);
37873818
MtmLock(LW_EXCLUSIVE);
3819+
3820+
if (BIT_CHECK(Mtm->stalledNodeMask,MtmReplicationNodeId-1)) {
3821+
MtmUnlock();
3822+
MTM_ELOG(ERROR,"Stalled node %d tries to initiate recovery",MtmReplicationNodeId);
3823+
}
3824+
37883825
if (BIT_CHECK(Mtm->stoppedNodeMask,MtmReplicationNodeId-1)) {
37893826
MTM_ELOG(WARNING,"Stopped node %d tries to initiate recovery",MtmReplicationNodeId);
37903827
do {
@@ -4171,6 +4208,14 @@ mtm_recover_node(PG_FUNCTION_ARGS)
41714208
PG_RETURN_VOID();
41724209
}
41734210

4211+
Datum
4212+
mtm_resume_node(PG_FUNCTION_ARGS)
4213+
{
4214+
intnodeId=PG_GETARG_INT32(0);
4215+
MtmResumeNode(nodeId);
4216+
PG_RETURN_VOID();
4217+
}
4218+
41744219
Datum
41754220
mtm_get_snapshot(PG_FUNCTION_ARGS)
41764221
{

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ extern void MtmUnlockNode(int nodeId);
400400
externvoidMtmStopNode(intnodeId,booldropSlot);
401401
externvoidMtmReconnectNode(intnodeId);
402402
externvoidMtmRecoverNode(intnodeId);
403+
externvoidMtmResumeNode(intnodeId);
403404
externvoidMtmOnNodeDisconnect(intnodeId);
404405
externvoidMtmOnNodeConnect(intnodeId);
405406
externvoidMtmWakeUpBackend(MtmTransState*ts);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp