Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c9717e6

Browse files
committed
Do not delete slots upon reconnect
1 parent 9499f28 commit c9717e6

File tree

4 files changed

+45
-51
lines changed

4 files changed

+45
-51
lines changed

‎multimaster.c

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,11 @@ MtmAdjustOldestXid(TransactionId xid)
773773
Mtm->transListHead=prev;
774774
}
775775
}
776+
777+
if (!MyReplicationSlot) {
778+
MtmCheckSlots();
779+
}
780+
776781
returnxid;
777782
}
778783

@@ -1552,9 +1557,6 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
15521557
MtmUnlock();
15531558

15541559
MtmResetTransaction();
1555-
if (!MyReplicationSlot) {
1556-
MtmCheckSlots();
1557-
}
15581560
if (MtmClusterLocked) {
15591561
MtmUnlockCluster();
15601562
}
@@ -2729,6 +2731,7 @@ static void MtmInitialize()
27292731
Mtm->nodes[i].originId=InvalidRepOriginId;
27302732
Mtm->nodes[i].timeline=0;
27312733
Mtm->nodes[i].nHeartbeats=0;
2734+
Mtm->nodes[i].slotDeleted= false;
27322735
}
27332736
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
27342737
/* All transaction originated from the current node should be ignored during recovery */
@@ -3805,8 +3808,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
38053808
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN<recoveredLSN) {
38063809
MTM_LOG1("Advance restartLSN for node %d from %llx to %llx (MtmReplicationStartupHook)",
38073810
MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,recoveredLSN);
3808-
Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN==INVALID_LSN
3809-
||recoveredLSN<Mtm->nodes[MtmReplicationNodeId-1].restartLSN+MtmMaxRecoveryLag);
3811+
//Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == INVALID_LSN
3812+
// || recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
38103813
Mtm->nodes[MtmReplicationNodeId-1].restartLSN=recoveredLSN;
38113814
}
38123815
}else {

‎multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,9 @@ typedef struct
235235
intlockGraphAllocated;
236236
intlockGraphUsed;
237237
uint64nHeartbeats;
238+
boolslotDeleted;/* Signals that the node has already deleted our slot and
239+
* recovery from that node isn't possible.
240+
*/
238241
}MtmNodeInfo;
239242

240243
typedefstructMtmL2List

‎pglogical_receiver.c

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -301,23 +301,16 @@ pglogical_receiver_main(Datum main_arg)
301301
}
302302

303303
query=createPQExpBuffer();
304-
if ((mode==REPLMODE_OPEN_EXISTED&&timeline!=Mtm->nodes[nodeId-1].timeline)
305-
||mode==REPLMODE_CREATE_NEW)
306-
{/* recreate slot */
307-
timestamp_tstart=MtmGetSystemTime();
308-
appendPQExpBuffer(query,"DROP_REPLICATION_SLOT \"%s\"",slotName);
309-
res=PQexec(conn,query->data);
310-
elog(LOG,"Drop replication slot %s: %ld milliseconds",slotName, (long)USEC_TO_MSEC(MtmGetSystemTime()-start));
311-
PQclear(res);
312-
resetPQExpBuffer(query);
313-
timeline=Mtm->nodes[nodeId-1].timeline;
314-
}
315-
/* My original assumption was that we can perfrom recovery only from existed slot,
316-
* but unfortunately looks like slots can "disapear" together with WAL-sender.
317-
* So let's try to recreate slot always. */
318-
/* if (mode != REPLMODE_REPLICATION) */
319-
{
320-
timestamp_tstart=MtmGetSystemTime();
304+
305+
/* Start logical replication at specified position */
306+
originStartPos=replorigin_get_progress(originId, false);
307+
if (originStartPos==INVALID_LSN) {
308+
/*
309+
* We are just creating new replication slot.
310+
* It is assumed that state of local and remote nodes is the same at this moment.
311+
* They are either empty, or the new node has been synchronized using base_backup.
312+
* So we assume that LSNs are the same for local and remote node
313+
*/
321314
appendPQExpBuffer(query,"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",slotName,MULTIMASTER_NAME);
322315
res=PQexec(conn,query->data);
323316
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
@@ -331,30 +324,14 @@ pglogical_receiver_main(Datum main_arg)
331324
gotoOnError;
332325
}
333326
}
334-
elog(LOG,"Recreate replication slot %s: %ld milliseconds",slotName, (long)USEC_TO_MSEC(MtmGetSystemTime()-start));
335327
PQclear(res);
336328
resetPQExpBuffer(query);
337-
}
338-
339-
/* Start logical replication at specified position */
340-
if (originStartPos==INVALID_LSN) {
341-
originStartPos=replorigin_get_progress(originId, false);
342-
if (originStartPos==INVALID_LSN) {
343-
/*
344-
* We are just creating new replication slot.
345-
* It is assumed that state of local and remote nodes is the same at this moment.
346-
* Them are either empty, either new node is synchronized using base_backup.
347-
* So we assume that LSNs are the same for local and remote node
348-
*/
349-
originStartPos=INVALID_LSN;
350-
MTM_LOG1("Start logical receiver at position %llx from node %d",originStartPos,nodeId);
351-
}else {
352-
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
353-
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
354-
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
355-
}
356-
MTM_LOG1("Restart logical receiver at position %llx with origin=%d from node %d",originStartPos,originId,nodeId);
329+
}else {
330+
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
331+
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
332+
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
357333
}
334+
MTM_LOG1("Restart logical receiver at position %llx with origin=%d from node %d",originStartPos,originId,nodeId);
358335
}
359336

360337
MTM_LOG1("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx",
@@ -373,10 +350,21 @@ pglogical_receiver_main(Datum main_arg)
373350
res=PQexec(conn,query->data);
374351
if (PQresultStatus(res)!=PGRES_COPY_BOTH)
375352
{
376-
PQclear(res);
377-
ereport(WARNING, (MTM_ERRMSG("%s: Could not start logical replication",
378-
worker_proc)));
379-
gotoOnError;
353+
inti,n_deleted_slots=0;
354+
355+
elog(WARNING,"Can't find slot on node%d. Shutting down receiver.",nodeId);
356+
Mtm->nodes[nodeId-1].slotDeleted= true;
357+
for (i=0;i<Mtm->nAllNodes;i++)
358+
{
359+
if (Mtm->nodes[i].slotDeleted)
360+
n_deleted_slots++;
361+
}
362+
if (n_deleted_slots==Mtm->nAllNodes-1)
363+
{
364+
elog(WARNING,"All neighbour nopes have no replication slot for us. Exiting.");
365+
kill(PostmasterPid,SIGTERM);
366+
}
367+
proc_exit(1);
380368
}
381369
PQclear(res);
382370
resetPQExpBuffer(query);

‎t/004_recovery.pl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@
1414
$cluster->pgbench(0, ('-i',-s=>'10') );
1515

1616
# kill node while neighbour is under load
17-
my$pgb_handle =$cluster->pgbench_async(0, ('-N',-T=>'10') );
17+
my$pgb_handle =$cluster->pgbench_async(1, ('-N',-T=>'10') );
1818
sleep(5);
1919
$cluster->{nodes}->[2]->stop('fast');
2020
$cluster->pgbench_await($pgb_handle);
2121

2222
# start node while neighbour is under load
23-
$pgb_handle =$cluster->pgbench_async(0, ('-N',-T=>'10') );
24-
sleep(5);
23+
$pgb_handle =$cluster->pgbench_async(0, ('-N',-T=>'50') );
24+
sleep(10);
2525
$cluster->{nodes}->[2]->start;
2626
$cluster->pgbench_await($pgb_handle);
2727

28-
# give it 10s to recover
28+
# give it extra 10s to recover
2929
sleep(10);
3030

3131
# check data identity

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp