Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit71924e3

Browse files
knizhnikkelvich
authored andcommitted
Detect node disconnection in case of pglogical_receiver failures
1 parent20cbd9c commit71924e3

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

‎multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1289,7 +1289,7 @@ void MtmCheckQuorum(void)
12891289
}
12901290
}else {
12911291
if (Mtm->status==MTM_IN_MINORITY) {
1292-
MTM_LOG1("Node is in majority:dissbled mask %lx", (long)Mtm->disabledNodeMask);
1292+
MTM_LOG1("Node is in majority:disabled mask %lx", (long)Mtm->disabledNodeMask);
12931293
MtmSwitchClusterMode(MTM_ONLINE);
12941294
}
12951295
}

‎pglogical_receiver.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ static char const* const MtmReplicationModeName[] =
194194
{
195195
"recovered",/* SLOT_CREATE_NEW: recovery of node is completed so drop old slot and restart replication from the current position in WAL */
196196
"recovery",/* SLOT_OPEN_EXISTED: perform recorvery of the node by applying all data from theslot from specified point */
197-
"normal"/* SLOT_OPEN_ALWAYS: normal mode: useexisteed slot or create new one and start receiving data from it from the specified position */
197+
"normal"/* SLOT_OPEN_ALWAYS: normal mode: useexisted slot or create new one and start receiving data from it from the specified position */
198198
};
199199

200200
staticvoid
@@ -248,6 +248,7 @@ pglogical_receiver_main(Datum main_arg)
248248
PQfinish(conn);
249249
ereport(WARNING, (errmsg("%s: Could not establish connection to remote server",
250250
worker_proc)));
251+
/* Do not make decision about node status here because at startup peer node may just no yet started */
251252
/* MtmOnNodeDisconnect(nodeId); */
252253
proc_exit(1);
253254
}
@@ -271,6 +272,7 @@ pglogical_receiver_main(Datum main_arg)
271272
PQclear(res);
272273
ereport(ERROR, (errmsg("%s: Could not create logical slot",
273274
worker_proc)));
275+
MtmOnNodeDisconnect(nodeId);
274276
proc_exit(1);
275277
}
276278
}
@@ -312,6 +314,7 @@ pglogical_receiver_main(Datum main_arg)
312314
PQclear(res);
313315
ereport(WARNING, (errmsg("%s: Could not start logical replication",
314316
worker_proc)));
317+
MtmOnNodeDisconnect(nodeId);
315318
proc_exit(1);
316319
}
317320
PQclear(res);
@@ -402,6 +405,7 @@ pglogical_receiver_main(Datum main_arg)
402405
{
403406
ereport(LOG, (errmsg("%s: streaming header too small: %d",
404407
worker_proc,rc)));
408+
MtmOnNodeDisconnect(nodeId);
405409
proc_exit(1);
406410
}
407411
replyRequested=copybuf[pos];
@@ -421,15 +425,18 @@ pglogical_receiver_main(Datum main_arg)
421425
int64now=feGetCurrentTimestamp();
422426

423427
/* Leave is feedback is not sent properly */
424-
if (!sendFeedback(conn,now,nodeId))
428+
if (!sendFeedback(conn,now,nodeId)) {
429+
MtmOnNodeDisconnect(nodeId);
425430
proc_exit(1);
431+
}
426432
}
427433
continue;
428434
}
429435
elseif (copybuf[0]!='w')
430436
{
431437
ereport(LOG, (errmsg("%s: Incorrect streaming header",
432438
worker_proc)));
439+
MtmOnNodeDisconnect(nodeId);
433440
proc_exit(1);
434441
}
435442

@@ -538,6 +545,7 @@ pglogical_receiver_main(Datum main_arg)
538545
{
539546
ereport(LOG, (errmsg("%s: Incorrect status received... Leaving.",
540547
worker_proc)));
548+
MtmOnNodeDisconnect(nodeId);
541549
proc_exit(1);
542550
}
543551

@@ -546,6 +554,7 @@ pglogical_receiver_main(Datum main_arg)
546554
{
547555
ereport(LOG, (errmsg("%s: Data remaining on the socket... Leaving.",
548556
worker_proc)));
557+
MtmOnNodeDisconnect(nodeId);
549558
proc_exit(1);
550559
}
551560
continue;
@@ -564,6 +573,7 @@ pglogical_receiver_main(Datum main_arg)
564573
{
565574
ereport(LOG, (errmsg("%s: Failure while receiving changes...",
566575
worker_proc)));
576+
MtmOnNodeDisconnect(nodeId);
567577
proc_exit(1);
568578
}
569579
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp