Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 21fee91

Browse files
knizhnik authored and kelvich committed
Drop replication slots on recovered nodes
1 parent 4993bce, commit 21fee91

File tree

6 files changed

+33
-21
lines changed

6 files changed

+33
-21
lines changed

‎arbiter.c

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -362,7 +362,7 @@ static void MtmSendHeartbeat()
362362
MTM_LOG2("Send heartbeat to node %d with timestamp %ld",i+1,now);
363363
}
364364
}else {
365-
MTM_LOG1("Do not sendhearbeat to node %d, busy mask %ld, status %d",i+1,busy_mask,Mtm->status);
365+
MTM_LOG1("Do not sendheartbeat to node %d, busy mask %ld, status %d",i+1,busy_mask,Mtm->status);
366366
}
367367
}
368368
}
@@ -484,6 +484,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
484484
MtmCheckResponse(&resp);
485485
MtmUnlock();
486486

487+
MtmOnNodeConnect(node+1);
488+
487489
busy_mask=save_mask;
488490

489491
returnsd;
@@ -539,7 +541,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
539541
}
540542
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
541543
if (sockets[node] >=0) {
542-
elog(WARNING,"Arbiterfailed to write to node %d: %d",node+1,errno);
544+
elog(WARNING,"Arbiterfail to write to node %d: %d",node+1,errno);
543545
close(sockets[node]);
544546
sockets[node]=-1;
545547
}
@@ -549,7 +551,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
549551
result= false;
550552
break;
551553
}
552-
MTM_LOG3("Arbiterrestablished connection with node %d",node+1);
554+
MTM_LOG3("Arbiterreestablish connection with node %d",node+1);
553555
}else {
554556
result= true;
555557
break;
@@ -1075,7 +1077,7 @@ static void MtmReceiver(Datum arg)
10751077
if (!MtmWatchdog(now)) {
10761078
for (i=0;i<nNodes;i++) {
10771079
if (Mtm->nodes[i].lastHeartbeat!=0&&sockets[i] >=0) {
1078-
MTM_LOG1("Lasthearbeat from node %d received %ld microseconds ago",i+1,now-Mtm->nodes[i].lastHeartbeat);
1080+
MTM_LOG1("Lastheartbeat from node %d received %ld microseconds ago",i+1,now-Mtm->nodes[i].lastHeartbeat);
10791081
}
10801082
}
10811083
}

‎multimaster.c

Lines changed: 16 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -450,7 +450,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
450450
for (i=0;i<MAX_WAIT_LOOPS;i++)
451451
{
452452
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
453-
if (ts!=NULL/*&& ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
453+
if (ts!=NULL/*&& ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
454454
{
455455
if (ts->csn>MtmTx.snapshot) {
456456
MTM_LOG4("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld",
@@ -1242,13 +1242,17 @@ void MtmAbortTransaction(MtmTransState* ts)
12421242
{
12431243
Assert(MtmLockCount!=0);/* should be invoked with exclsuive lock */
12441244
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
1245-
MTM_LOG1("Rollback active transaction %d:%d (local xid %d)",ts->gtid.node,ts->gtid.xid,ts->xid);
1246-
ts->status=TRANSACTION_STATUS_ABORTED;
1247-
MtmAdjustSubtransactions(ts);
1248-
if (ts->isActive) {
1249-
ts->isActive= false;
1250-
Assert(Mtm->nActiveTransactions!=0);
1251-
Mtm->nActiveTransactions-=1;
1245+
if (ts->status==TRANSACTION_STATUS_COMMITTED) {
1246+
elog(WARNING,"Attempt to rollback already committed transaction %d (%s)",ts->xid,ts->gid);
1247+
}else {
1248+
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %d",ts->gtid.node,ts->gtid.xid,ts->xid,ts->status);
1249+
ts->status=TRANSACTION_STATUS_ABORTED;
1250+
MtmAdjustSubtransactions(ts);
1251+
if (ts->isActive) {
1252+
ts->isActive= false;
1253+
Assert(Mtm->nActiveTransactions!=0);
1254+
Mtm->nActiveTransactions-=1;
1255+
}
12521256
}
12531257
}
12541258
}
@@ -1320,6 +1324,7 @@ static void MtmDisableNode(int nodeId)
13201324
elog(WARNING,"Disable node %d at xlog position %lx, last status change time %d msec ago",nodeId,GetXLogInsertRecPtr(),
13211325
(int)USEC_TO_MSEC(now-Mtm->nodes[nodeId-1].lastStatusChangeTime));
13221326
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
1327+
Mtm->nodes[nodeId-1].timeline+=1;
13231328
Mtm->nodes[nodeId-1].lastStatusChangeTime=now;
13241329
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
13251330
if (nodeId!=MtmNodeId) {
@@ -1343,8 +1348,8 @@ static void MtmEnableNode(int nodeId)
13431348
voidMtmRecoveryCompleted(void)
13441349
{
13451350
inti;
1346-
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx,reconnect mask=%lx, live nodes=%d",
1347-
MtmNodeId,Mtm->disabledNodeMask,Mtm->reconnectMask,Mtm->nLiveNodes);
1351+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx,connectivity mask=%lx, live nodes=%d",
1352+
MtmNodeId,Mtm->disabledNodeMask,Mtm->connectivityMask,Mtm->nLiveNodes);
13481353
MtmLock(LW_EXCLUSIVE);
13491354
Mtm->recoverySlot=0;
13501355
BIT_CLEAR(Mtm->disabledNodeMask,MtmNodeId-1);
@@ -1902,6 +1907,7 @@ static void MtmInitialize()
19021907
Mtm->nodes[i].lastHeartbeat=0;
19031908
Mtm->nodes[i].restartLsn=0;
19041909
Mtm->nodes[i].originId=InvalidRepOriginId;
1910+
Mtm->nodes[i].timeline=0;
19051911
}
19061912
PGSemaphoreCreate(&Mtm->sendSemaphore);
19071913
PGSemaphoreReset(&Mtm->sendSemaphore);

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -184,6 +184,7 @@ typedef struct
184184
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
185185
XLogRecPtrrestartLsn;
186186
RepOriginIdoriginId;
187+
inttimeline;
187188
}MtmNodeInfo;
188189

189190
typedefstructMtmTransState

‎pglogical_apply.c

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -620,7 +620,7 @@ process_remote_commit(StringInfo in)
620620
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
621621
csn=pq_getmsgint64(in);
622622
gid=pq_getmsgstring(in);
623-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld",csn,gid,end_lsn);
623+
MTM_LOG1("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld",csn,gid,end_lsn);
624624
StartTransactionCommand();
625625
MtmBeginSession();
626626
MtmSetCurrentTransactionCSN(csn);
@@ -1057,7 +1057,7 @@ void MtmExecutor(int id, void* work, size_t size)
10571057
MemoryContextSwitchTo(oldcontext);
10581058
EmitErrorReport();
10591059
FlushErrorState();
1060-
MTM_LOG2("%d: REMOTE begin abort transaction %d",MyProcPid,MtmGetCurrentTransactionId());
1060+
MTM_LOG1("%d: REMOTE begin abort transaction %d",MyProcPid,MtmGetCurrentTransactionId());
10611061
MtmEndSession(false);
10621062
AbortCurrentTransaction();
10631063
MTM_LOG2("%d: REMOTE end abort transaction %d",MyProcPid,MtmGetCurrentTransactionId());

‎pglogical_proto.c

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -155,6 +155,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
155155
else
156156
Assert(false);
157157

158+
Assert(flags!=PGLOGICAL_COMMIT_PREPARED||txn->xid<1000||MtmTransactionRecords!=1);
159+
158160
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
159161
if (MtmIsFilteredTxn) {
160162
Assert(MtmTransactionRecords==0);
@@ -164,8 +166,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
164166
csn_tcsn=MtmTransactionSnapshot(txn->xid);
165167
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
166168

167-
168-
if (!isRecovery&&txn->origin_id!=InvalidRepOriginId)
169+
if (!isRecovery&&csn==INVALID_CSN&& (flags!=PGLOGICAL_ABORT_PREPARED||txn->origin_id!=InvalidRepOriginId))
169170
{
170171
if (flags==PGLOGICAL_ABORT_PREPARED) {
171172
MTM_LOG1("Skip ABORT_PREPARED for transaction %s to node %d",txn->gid,MtmReplicationNodeId);
@@ -182,6 +183,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
182183
flags |=PGLOGICAL_CAUGHT_UP;
183184
}
184185
}
186+
185187
pq_sendbyte(out,'C');/* sending COMMIT */
186188

187189
MTM_LOG2("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",flags,txn->gid,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr());

‎pglogical_receiver.c

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -250,6 +250,7 @@ pglogical_receiver_main(Datum main_arg)
250250
intcount;
251251
ConnStatusTypestatus;
252252
XLogRecPtroriginStartPos=InvalidXLogRecPtr;
253+
inttimeline;
253254

254255
/*
255256
* Determine when and how we should open replication slot.
@@ -261,6 +262,7 @@ pglogical_receiver_main(Datum main_arg)
261262
{
262263
break;
263264
}
265+
timeline=Mtm->nodes[nodeId-1].timeline;
264266
count=Mtm->recoveryCount;
265267

266268
/* Establish connection to remote server */
@@ -274,14 +276,13 @@ pglogical_receiver_main(Datum main_arg)
274276
}
275277

276278
query=createPQExpBuffer();
277-
#if0/* Do we need to recreate slot ? */
278-
if (mode==REPLMODE_RECOVERED) {/* recreate slot */
279+
if (mode==REPLMODE_NORMAL&&timeline!=Mtm->nodes[nodeId-1].timeline) {/* recreate slot */
279280
appendPQExpBuffer(query,"DROP_REPLICATION_SLOT \"%s\"",slotName);
280281
res=PQexec(conn,query->data);
281282
PQclear(res);
282283
resetPQExpBuffer(query);
284+
timeline=Mtm->nodes[nodeId-1].timeline;
283285
}
284-
#endif
285286
/* My original assumption was that we can perform recovery only from an existing slot,
286287
 * but unfortunately it looks like slots can "disappear" together with the WAL-sender.
287288
* So let's try to recreate slot always. */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp