Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit f051d35

Browse files
knizhnik authored and kelvich committed
Add recover_node function
1 parent e0105f6 commit f051d35

File tree

8 files changed

+43
-9
lines changed

8 files changed

+43
-9
lines changed

‎arbiter.c

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -119,15 +119,15 @@ static BackgroundWorker MtmSender = {
119119
"mtm-sender",
120120
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,/* do not need connection to the database */
121121
BgWorkerStart_ConsistentState,
122-
1,/* restart in one second (is it possible to restart immediately?) */
122+
MULTIMASTER_BGW_RESTART_TIMEOUT,
123123
MtmTransSender
124124
};
125125

126126
staticBackgroundWorkerMtmRecevier= {
127127
"mtm-receiver",
128128
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,/* do not need connection to the database */
129129
BgWorkerStart_ConsistentState,
130-
1,/* restart in one second (is it possible to restart immediately?) */
130+
MULTIMASTER_BGW_RESTART_TIMEOUT,
131131
MtmTransReceiver
132132
};
133133

@@ -297,6 +297,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
297297

298298
/* Some node considered that I am dead, so switch to recovery mode */
299299
if (BIT_CHECK(msg.disabledNodeMask,MtmNodeId-1)) {
300+
elog(WARNING,"Node %d think that I am dead",msg.node);
300301
MtmSwitchClusterMode(MTM_RECOVERY);
301302
}
302303
/* Combine disable masks from all nodes. Is this actually correct, or should we rather check the availability of nodes ourselves? */

‎bgwpool.c

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -85,7 +85,7 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
8585
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION;
8686
worker.bgw_start_time=BgWorkerStart_ConsistentState;
8787
worker.bgw_main=BgwPoolMainLoop;
88-
worker.bgw_restart_time=10;/* Wait 10 seconds for restart before crash */
88+
worker.bgw_restart_time=MULTIMASTER_BGW_RESTART_TIMEOUT;
8989

9090
for (i=0;i<nWorkers;i++) {
9191
BgwPoolExecutorCtx*ctx= (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));

‎bgwpool.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
typedefvoid(*BgwPoolExecutor)(intid,void*work,size_tsize);
99

1010
#defineMAX_DBNAME_LEN 30
11+
#defineMULTIMASTER_BGW_RESTART_TIMEOUT 10/* seconds */
1112

1213
typedefstruct
1314
{

‎multimaster--1.0.sql

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,12 @@ CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURN
1313
AS'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16+
-- Create replication slot for the node which was previously dropped together with its slot
17+
CREATEFUNCTIONmtm.recover_node(nodeinteger) RETURNS void
18+
AS'MODULE_PATHNAME','mtm_recover_node'
19+
LANGUAGE C;
20+
21+
1622
CREATEFUNCTIONmtm.get_snapshot() RETURNSbigint
1723
AS'MODULE_PATHNAME','mtm_get_snapshot'
1824
LANGUAGE C;

‎multimaster.c

Lines changed: 27 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -89,6 +89,7 @@ PG_MODULE_MAGIC;
8989
PG_FUNCTION_INFO_V1(mtm_start_replication);
9090
PG_FUNCTION_INFO_V1(mtm_stop_replication);
9191
PG_FUNCTION_INFO_V1(mtm_drop_node);
92+
PG_FUNCTION_INFO_V1(mtm_recover_node);
9293
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
9394

9495
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
@@ -1182,6 +1183,22 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
11821183
returndtm->recoverySlot ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS;
11831184
}
11841185

1186+
voidMtmRecoverNode(intnodeId)
1187+
{
1188+
if (nodeId <=0||nodeId>dtm->nNodes)
1189+
{
1190+
elog(ERROR,"NodeID %d is out of range [1,%d]",nodeId,dtm->nNodes);
1191+
}
1192+
if (!BIT_CHECK(dtm->disabledNodeMask,nodeId-1)) {
1193+
elog(ERROR,"Node %d was not disabled",nodeId);
1194+
}
1195+
if (!IsTransactionBlock())
1196+
{
1197+
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",nodeId), true);
1198+
}
1199+
}
1200+
1201+
11851202
voidMtmDropNode(intnodeId,booldropSlot)
11861203
{
11871204
if (!BIT_CHECK(dtm->disabledNodeMask,nodeId-1))
@@ -1227,6 +1244,14 @@ mtm_drop_node(PG_FUNCTION_ARGS)
12271244
PG_RETURN_VOID();
12281245
}
12291246

1247+
Datum
1248+
mtm_recover_node(PG_FUNCTION_ARGS)
1249+
{
1250+
intnodeId=PG_GETARG_INT32(0);
1251+
MtmRecoverNode(nodeId);
1252+
PG_RETURN_VOID();
1253+
}
1254+
12301255
Datum
12311256
mtm_get_snapshot(PG_FUNCTION_ARGS)
12321257
{
@@ -1599,7 +1624,7 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
15991624
{
16001625
if ((proclock->holdMask&LOCKBIT_ON(lm))&& (conflictMask&LOCKBIT_ON(lm)))
16011626
{
1602-
MTM_TRACE("%d: %u(%u) waits for %u(%u)\n",getpid(),srcPgXact->xid,proc->pid,dstPgXact->xid,proclock->tag.myProc->pid);
1627+
MTM_TRACE("%d: %u(%u) waits for %u(%u)\n",MyProcPid,srcPgXact->xid,proc->pid,dstPgXact->xid,proclock->tag.myProc->pid);
16031628
MtmGetGtid(srcPgXact->xid,&gtid);/* transaction holding lock */
16041629
ByteBufferAppendInt32(buf,gtid.node);
16051630
ByteBufferAppendInt32(buf,gtid.xid);
@@ -1689,6 +1714,7 @@ void MtmRefreshClusterStatus(bool nowait)
16891714

16901715
clique=MtmFindMaxClique(matrix,MtmNodes,&clique_size);
16911716
if (clique_size >=MtmNodes/2+1) {/* have quorum */
1717+
elog(WARNING,"Find clique %lx, disabledNodeMask %lx",clique,dtm->disabledNodeMask);
16921718
MtmLock(LW_EXCLUSIVE);
16931719
mask= ~clique& (((nodemask_t)1 <<MtmNodes)-1)& ~dtm->disabledNodeMask;/* new disabled nodes mask */
16941720
for (i=0;mask!=0;i++,mask >>=1) {

‎multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@
1313
#define MTM_TUPLE_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1414
*/
1515

16-
#defineMULTIMASTER_NAME "mtm"
16+
#defineMULTIMASTER_NAME "multimaster"
1717
#defineMULTIMASTER_SCHEMA_NAME "mtm"
1818
#defineMULTIMASTER_DDL_TABLE "ddl_log"
1919
#defineMULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
@@ -146,6 +146,7 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
146146
externvoidMtmLock(LWLockModemode);
147147
externvoidMtmUnlock(void);
148148
externvoidMtmDropNode(intnodeId,booldropSlot);
149+
externvoidMtmRecoverNode(intnodeId);
149150
externvoidMtmOnNodeDisconnect(intnodeId);
150151
externvoidMtmOnNodeConnect(intnodeId);
151152
externMtmState*MtmGetState(void);

‎pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -830,7 +830,7 @@ void MtmExecutor(int id, void* work, size_t size)
830830
PG_CATCH();
831831
{
832832
FlushErrorState();
833-
MTM_TRACE("%d: REMOTE abort transaction %d\n",getpid(),GetCurrentTransactionId());
833+
MTM_TRACE("%d: REMOTE abort transaction %d\n",MyProcPid,GetCurrentTransactionId());
834834
AbortCurrentTransaction();
835835
}
836836
PG_END_TRY();

‎pglogical_receiver.c

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,6 @@ static int receiver_idle_time = 0;
5656
staticboolreceiver_sync_mode= false;
5757

5858
/* Worker name */
59-
staticchar*worker_name="multimaster";
6059
charworker_proc[BGW_MAXLEN];
6160

6261
/* Lastly written positions */
@@ -252,7 +251,7 @@ pglogical_receiver_main(Datum main_arg)
252251
resetPQExpBuffer(query);
253252
}
254253
if (mode!=SLOT_OPEN_EXISTED) {
255-
appendPQExpBuffer(query,"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",args->receiver_slot,worker_name);
254+
appendPQExpBuffer(query,"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",args->receiver_slot,MULTIMASTER_NAME);
256255
res=PQexec(conn,query->data);
257256
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
258257
{
@@ -568,7 +567,7 @@ int MtmStartReceivers(char* conns, int node_id)
568567
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION;
569568
worker.bgw_start_time=BgWorkerStart_ConsistentState;
570569
worker.bgw_main=pglogical_receiver_main;
571-
worker.bgw_restart_time=10;/* Wait 10 seconds for restart before crash */
570+
worker.bgw_restart_time=MULTIMASTER_BGW_RESTART_TIMEOUT;
572571

573572
while (conn_str<conn_str_end) {
574573
char*p=strchr(conn_str,',');

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp