Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7d24637

Browse files
committed
Do not delete slots upon reconnect
1 parentd4ef72d commit7d24637

File tree

4 files changed

+45
-51
lines changed

4 files changed

+45
-51
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,11 @@ MtmAdjustOldestXid(TransactionId xid)
772772
Mtm->transListHead=prev;
773773
}
774774
}
775+
776+
if (!MyReplicationSlot) {
777+
MtmCheckSlots();
778+
}
779+
775780
returnxid;
776781
}
777782

@@ -1551,9 +1556,6 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
15511556
MtmUnlock();
15521557

15531558
MtmResetTransaction();
1554-
if (!MyReplicationSlot) {
1555-
MtmCheckSlots();
1556-
}
15571559
if (MtmClusterLocked) {
15581560
MtmUnlockCluster();
15591561
}
@@ -2728,6 +2730,7 @@ static void MtmInitialize()
27282730
Mtm->nodes[i].originId=InvalidRepOriginId;
27292731
Mtm->nodes[i].timeline=0;
27302732
Mtm->nodes[i].nHeartbeats=0;
2733+
Mtm->nodes[i].slotDeleted= false;
27312734
}
27322735
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
27332736
/* All transaction originated from the current node should be ignored during recovery */
@@ -3791,8 +3794,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
37913794
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN<recoveredLSN) {
37923795
MTM_LOG1("Advance restartLSN for node %d from %llx to %llx (MtmReplicationStartupHook)",
37933796
MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,recoveredLSN);
3794-
Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN==INVALID_LSN
3795-
||recoveredLSN<Mtm->nodes[MtmReplicationNodeId-1].restartLSN+MtmMaxRecoveryLag);
3797+
//Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == INVALID_LSN
3798+
// || recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
37963799
Mtm->nodes[MtmReplicationNodeId-1].restartLSN=recoveredLSN;
37973800
}
37983801
}else {

‎contrib/mmts/multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,9 @@ typedef struct
235235
intlockGraphAllocated;
236236
intlockGraphUsed;
237237
uint64nHeartbeats;
238+
boolslotDeleted;/* Signalizes that node is already deleted our slot and
239+
* recovery from that node isn't possible.
240+
*/
238241
}MtmNodeInfo;
239242

240243
typedefstructMtmL2List

‎contrib/mmts/pglogical_receiver.c

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -299,23 +299,16 @@ pglogical_receiver_main(Datum main_arg)
299299
}
300300

301301
query=createPQExpBuffer();
302-
if ((mode==REPLMODE_OPEN_EXISTED&&timeline!=Mtm->nodes[nodeId-1].timeline)
303-
||mode==REPLMODE_CREATE_NEW)
304-
{/* recreate slot */
305-
timestamp_tstart=MtmGetSystemTime();
306-
appendPQExpBuffer(query,"DROP_REPLICATION_SLOT \"%s\"",slotName);
307-
res=PQexec(conn,query->data);
308-
elog(LOG,"Drop replication slot %s: %ld milliseconds",slotName, (long)USEC_TO_MSEC(MtmGetSystemTime()-start));
309-
PQclear(res);
310-
resetPQExpBuffer(query);
311-
timeline=Mtm->nodes[nodeId-1].timeline;
312-
}
313-
/* My original assumption was that we can perfrom recovery only from existed slot,
314-
* but unfortunately looks like slots can "disapear" together with WAL-sender.
315-
* So let's try to recreate slot always. */
316-
/* if (mode != REPLMODE_REPLICATION) */
317-
{
318-
timestamp_tstart=MtmGetSystemTime();
302+
303+
/* Start logical replication at specified position */
304+
originStartPos=replorigin_get_progress(originId, false);
305+
if (originStartPos==INVALID_LSN) {
306+
/*
307+
* We are just creating new replication slot.
308+
* It is assumed that state of local and remote nodes is the same at this moment.
309+
* They are either empty, either new node is synchronized using base_backup.
310+
* So we assume that LSNs are the same for local and remote node
311+
*/
319312
appendPQExpBuffer(query,"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",slotName,MULTIMASTER_NAME);
320313
res=PQexec(conn,query->data);
321314
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
@@ -329,30 +322,14 @@ pglogical_receiver_main(Datum main_arg)
329322
gotoOnError;
330323
}
331324
}
332-
elog(LOG,"Recreate replication slot %s: %ld milliseconds",slotName, (long)USEC_TO_MSEC(MtmGetSystemTime()-start));
333325
PQclear(res);
334326
resetPQExpBuffer(query);
335-
}
336-
337-
/* Start logical replication at specified position */
338-
if (originStartPos==INVALID_LSN) {
339-
originStartPos=replorigin_get_progress(originId, false);
340-
if (originStartPos==INVALID_LSN) {
341-
/*
342-
* We are just creating new replication slot.
343-
* It is assumed that state of local and remote nodes is the same at this moment.
344-
* Them are either empty, either new node is synchronized using base_backup.
345-
* So we assume that LSNs are the same for local and remote node
346-
*/
347-
originStartPos=INVALID_LSN;
348-
MTM_LOG1("Start logical receiver at position %llx from node %d",originStartPos,nodeId);
349-
}else {
350-
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
351-
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
352-
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
353-
}
354-
MTM_LOG1("Restart logical receiver at position %llx with origin=%d from node %d",originStartPos,originId,nodeId);
327+
}else {
328+
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
329+
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
330+
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
355331
}
332+
MTM_LOG1("Restart logical receiver at position %llx with origin=%d from node %d",originStartPos,originId,nodeId);
356333
}
357334

358335
MTM_LOG1("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx",
@@ -371,10 +348,21 @@ pglogical_receiver_main(Datum main_arg)
371348
res=PQexec(conn,query->data);
372349
if (PQresultStatus(res)!=PGRES_COPY_BOTH)
373350
{
374-
PQclear(res);
375-
ereport(WARNING, (MTM_ERRMSG("%s: Could not start logical replication",
376-
worker_proc)));
377-
gotoOnError;
351+
inti,n_deleted_slots=0;
352+
353+
elog(WARNING,"Can't find slot on node%d. Shutting down receiver.",nodeId);
354+
Mtm->nodes[nodeId-1].slotDeleted= true;
355+
for (i=0;i<Mtm->nAllNodes;i++)
356+
{
357+
if (Mtm->nodes[i].slotDeleted)
358+
n_deleted_slots++;
359+
}
360+
if (n_deleted_slots==Mtm->nAllNodes-1)
361+
{
362+
elog(WARNING,"All neighbour nopes have no replication slot for us. Exiting.");
363+
kill(PostmasterPid,SIGTERM);
364+
}
365+
proc_exit(1);
378366
}
379367
PQclear(res);
380368
resetPQExpBuffer(query);

‎contrib/mmts/t/004_recovery.pl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@
1414
$cluster->pgbench(0, ('-i',-s=>'10') );
1515

1616
# kill node while neighbour is under load
17-
my$pgb_handle =$cluster->pgbench_async(0, ('-N',-T=>'10') );
17+
my$pgb_handle =$cluster->pgbench_async(1, ('-N',-T=>'10') );
1818
sleep(5);
1919
$cluster->{nodes}->[2]->stop('fast');
2020
$cluster->pgbench_await($pgb_handle);
2121

2222
# start node while neighbour is under load
23-
$pgb_handle =$cluster->pgbench_async(0, ('-N',-T=>'10') );
24-
sleep(5);
23+
$pgb_handle =$cluster->pgbench_async(0, ('-N',-T=>'50') );
24+
sleep(10);
2525
$cluster->{nodes}->[2]->start;
2626
$cluster->pgbench_await($pgb_handle);
2727

28-
# give it 10s to recover
28+
# give itextra10s to recover
2929
sleep(10);
3030

3131
# check data identity

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp