Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c39b549

Browse files
knizhnik authored and kelvich committed
Change detection of duplicated transaction
1 parent 94f2024 commit c39b549

File tree

4 files changed

+48
-29
lines changed

4 files changed

+48
-29
lines changed

‎multimaster.c

Lines changed: 27 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1531,8 +1531,8 @@ static void MtmEnableNode(int nodeId)
15311531
voidMtmRecoveryCompleted(void)
15321532
{
15331533
inti;
1534-
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, live nodes=%d",
1535-
MtmNodeId, (long long)Mtm->disabledNodeMask, (long long)Mtm->connectivityMask,Mtm->nLiveNodes);
1534+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx,endLSN=%lx,live nodes=%d",
1535+
MtmNodeId, (long long)Mtm->disabledNodeMask, (long long)Mtm->connectivityMask,GetXLogInsertRecPtr(),Mtm->nLiveNodes);
15361536
MtmLock(LW_EXCLUSIVE);
15371537
Mtm->recoverySlot=0;
15381538
Mtm->recoveredLSN=GetXLogInsertRecPtr();
@@ -1542,7 +1542,7 @@ void MtmRecoveryCompleted(void)
15421542
for (i=0;i<Mtm->nAllNodes;i++) {
15431543
Mtm->nodes[i].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
15441544
}
1545-
/* Mode will be changed to online once all logicalreciever are connected */
1545+
/* Mode will be changed to online once all logicalreceiver are connected */
15461546
MtmSwitchClusterMode(MTM_CONNECTED);
15471547
MtmUnlock();
15481548
}
@@ -2131,7 +2131,6 @@ static void MtmInitialize()
21312131
Mtm->nodes[i].restartLSN=InvalidXLogRecPtr;
21322132
Mtm->nodes[i].originId=InvalidRepOriginId;
21332133
Mtm->nodes[i].timeline=0;
2134-
Mtm->nodes[i].recoveredLSN=InvalidXLogRecPtr;
21352134
}
21362135
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
21372136
/* All transaction originated from the current node should be ignored during recovery */
@@ -2884,13 +2883,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28842883
{
28852884
MtmReplicationModemode=REPLMODE_OPEN_EXISTED;
28862885

2886+
MtmLock(LW_EXCLUSIVE);
28872887
while ((Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)||BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
28882888
{
28892889
if (*shutdown)
28902890
{
2891+
MtmUnlock();
28912892
returnREPLMODE_EXIT;
28922893
}
2893-
MtmLock(LW_EXCLUSIVE);
28942894
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
28952895
mode=REPLMODE_CREATE_NEW;
28962896
}
@@ -2913,6 +2913,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29132913
MtmUnlock();
29142914
/* delay opening of other slots until recovery is completed */
29152915
MtmSleep(STATUS_POLL_DELAY);
2916+
MtmLock(LW_EXCLUSIVE);
29162917
}
29172918
if (mode==REPLMODE_RECOVERED) {
29182919
MTM_LOG1("%d: Restart replication from node %d after end of recovery",MyProcPid,nodeId);
@@ -2921,6 +2922,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29212922
}else {
29222923
MTM_LOG1("%d: Continue replication from node %d",MyProcPid,nodeId);
29232924
}
2925+
MtmUnlock();
29242926
returnmode;
29252927
}
29262928

@@ -3014,7 +3016,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30143016
}
30153017
}elseif (strcmp("mtm_recovered_pos",elem->defname)==0) {
30163018
if (elem->arg!=NULL&&strVal(elem->arg)!=NULL) {
3017-
sscanf(strVal(elem->arg),"%lx",&Mtm->nodes[MtmReplicationNodeId-1].recoveredLSN);
3019+
XLogRecPtrrecoveredLSN;
3020+
sscanf(strVal(elem->arg),"%lx",&recoveredLSN);
3021+
MTM_LOG1("Recovered position of node %d is %lx",MtmReplicationNodeId,recoveredLSN);
3022+
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN<recoveredLSN) {
3023+
Mtm->nodes[MtmReplicationNodeId-1].restartLSN=recoveredLSN;
3024+
}
30183025
}else {
30193026
elog(ERROR,"Recovered position is not specified");
30203027
}
@@ -3129,16 +3136,21 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
31293136
returnisDistributed;
31303137
}
31313138

3139+
/*
3140+
* Filter received transactions at destination side.
3141+
* This function is executed by the receiver, so there are no race conditions and it is possible to update nodes[i].restartLSN without a lock
3142+
*/
31323143
boolMtmFilterTransaction(char*record,intsize)
31333144
{
31343145
StringInfoDatas;
31353146
uint8flags;
31363147
XLogRecPtrorigin_lsn;
31373148
XLogRecPtrend_lsn;
3149+
XLogRecPtrrestart_lsn;
31383150
intreplication_node;
31393151
intorigin_node;
31403152
charconst*gid="";
3141-
boolduplicate;
3153+
boolduplicate= false;
31423154

31433155
s.data=record;
31443156
s.len=size;
@@ -3174,11 +3186,17 @@ bool MtmFilterTransaction(char* record, int size)
31743186
default:
31753187
break;
31763188
}
3189+
restart_lsn=origin_node==MtmReplicationNodeId ?end_lsn :origin_lsn;
3190+
if (Mtm->nodes[origin_node-1].restartLSN<restart_lsn) {
3191+
Mtm->nodes[origin_node-1].restartLSN=restart_lsn;
3192+
}else {
3193+
duplicate= true;
3194+
}
3195+
31773196
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3178-
duplicate=origin_lsn!=InvalidXLogRecPtr&&origin_lsn <=Mtm->nodes[origin_node-1].restartLSN;
31793197

31803198
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3181-
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,flags,origin_node,origin_lsn,Mtm->nodes[origin_node-1].restartLSN);
3199+
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
31823200
returnduplicate;
31833201
}
31843202

‎multimaster.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -204,7 +204,6 @@ typedef struct
204204
void*lockGraphData;
205205
intlockGraphAllocated;
206206
intlockGraphUsed;
207-
XLogRecPtrrecoveredLSN;
208207
}MtmNodeInfo;
209208

210209
typedefstructMtmTransState

‎pglogical_apply.c

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -430,11 +430,17 @@ process_remote_message(StringInfo s)
430430
MtmAbortLogicalMessage*msg= (MtmAbortLogicalMessage*)messageBody;
431431
intorigin_node=msg->origin_node;
432432
Assert(messageSize==sizeof(MtmAbortLogicalMessage));
433+
/* This function is called directly by receiver, so there is no race condition and we can update
434+
* restartLSN without locks
435+
*/
433436
if (Mtm->nodes[origin_node-1].restartLSN<msg->origin_lsn) {
434437
Mtm->nodes[origin_node-1].restartLSN=msg->origin_lsn;
438+
replorigin_session_origin_lsn=msg->origin_lsn;
439+
MtmRollbackPreparedTransaction(origin_node,msg->gid);
440+
}else {
441+
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %lx <= %lx",
442+
msg->gid,origin_node,msg->origin_lsn,Mtm->nodes[origin_node-1].restartLSN);
435443
}
436-
replorigin_session_origin_lsn=msg->origin_lsn;
437-
MtmRollbackPreparedTransaction(origin_node,msg->gid);
438444
standalone= true;
439445
break;
440446
}
@@ -611,9 +617,6 @@ process_remote_commit(StringInfo in)
611617
origin_lsn=pq_getmsgint64(in);
612618

613619
replorigin_session_origin_lsn=origin_node==MtmReplicationNodeId ?end_lsn :origin_lsn;
614-
if (Mtm->nodes[origin_node-1].restartLSN<replorigin_session_origin_lsn) {
615-
Mtm->nodes[origin_node-1].restartLSN=replorigin_session_origin_lsn;
616-
}
617620
Assert(replorigin_session_origin==InvalidRepOriginId);
618621

619622
switch(PGLOGICAL_XACT_EVENT(flags))

‎pglogical_receiver.c

Lines changed: 13 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -260,9 +260,8 @@ pglogical_receiver_main(Datum main_arg)
260260
{
261261
intcount;
262262
ConnStatusTypestatus;
263-
XLogRecPtroriginStartPos=InvalidXLogRecPtr;
263+
XLogRecPtroriginStartPos=Mtm->nodes[nodeId-1].restartLSN;
264264
inttimeline;
265-
boolnewTimeline= false;
266265

267266
/*
268267
* Determine when and how we should open replication slot.
@@ -291,12 +290,12 @@ pglogical_receiver_main(Datum main_arg)
291290
if ((mode==REPLMODE_OPEN_EXISTED&&timeline!=Mtm->nodes[nodeId-1].timeline)
292291
||mode==REPLMODE_CREATE_NEW)
293292
{/* recreate slot */
293+
elog(LOG,"Recreate replication slot %s",slotName);
294294
appendPQExpBuffer(query,"DROP_REPLICATION_SLOT \"%s\"",slotName);
295295
res=PQexec(conn,query->data);
296296
PQclear(res);
297297
resetPQExpBuffer(query);
298298
timeline=Mtm->nodes[nodeId-1].timeline;
299-
newTimeline= true;
300299
}
301300
/* My original assumption was that we can perform recovery only from an existing slot,
302301
* but unfortunately it looks like slots can "disappear" together with the WAL-sender.
@@ -322,11 +321,7 @@ pglogical_receiver_main(Datum main_arg)
322321
}
323322

324323
/* Start logical replication at specified position */
325-
if (mode==REPLMODE_RECOVERED) {
326-
originStartPos=Mtm->nodes[nodeId-1].restartLSN;
327-
MTM_LOG1("Restart replication from node %d from position %lx",nodeId,originStartPos);
328-
}
329-
if (originStartPos==InvalidXLogRecPtr&& !newTimeline) {
324+
if (originStartPos==InvalidXLogRecPtr) {
330325
StartTransactionCommand();
331326
originName=psprintf(MULTIMASTER_SLOT_PATTERN,nodeId);
332327
originId=replorigin_by_name(originName, true);
@@ -349,10 +344,11 @@ pglogical_receiver_main(Datum main_arg)
349344
}
350345
Mtm->nodes[nodeId-1].originId=originId;
351346
CommitTransactionCommand();
352-
}elseif (mode==REPLMODE_CREATE_NEW) {
353-
originStartPos=Mtm->nodes[nodeId-1].recoveredLSN;
354-
}
347+
}
355348

349+
MTM_LOG1("Start replication on slot %s from node %d at position %lx, mode %s, recovered lsn %lx",
350+
slotName,nodeId,originStartPos,MtmReplicationModeName[mode],Mtm->recoveredLSN);
351+
356352
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx', \"mtm_recovered_pos\" '%lx')",
357353
slotName,
358354
(uint32) (originStartPos >>32),
@@ -409,13 +405,16 @@ pglogical_receiver_main(Datum main_arg)
409405
ereport(LOG, (errmsg("%s: restart WAL receiver because node was switched to %s mode",worker_proc,MtmNodeStatusMnem[Mtm->status])));
410406
break;
411407
}
412-
if (count!=Mtm->recoveryCount) {
413-
408+
if (count!=Mtm->recoveryCount) {
414409
ereport(LOG, (errmsg("%s: restart WAL receiver because node was recovered",worker_proc)));
415410
break;
416411
}
417412

418-
413+
if (timeline!=Mtm->nodes[nodeId-1].timeline) {
414+
ereport(LOG, (errmsg("%s: restart WAL receiver because node %d timeline is changed",worker_proc,nodeId)));
415+
break;
416+
}
417+
419418
/*
420419
* Receive data.
421420
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp