Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7d99788

Browse files
committed
Add loging of filtered transactions in logical decoder
1 parentb958316 commit7d99788

File tree

5 files changed

+105
-55
lines changed

5 files changed

+105
-55
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ typedef struct {
8383
boolisReplicated;/* transaction on replica */
8484
boolisDistributed;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
8585
boolisPrepared;/* transaction is perpared at first stage of 2PC */
86+
boolisSuspended;/* prepared transaction is suspended because coordinator node is switch to offline */
8687
boolisTransactionBlock;/* is transaction block */
8788
boolcontainsDML;/* transaction contains DML statements */
8889
XidStatusstatus;/* transaction status */
@@ -712,7 +713,7 @@ MtmXactCallback(XactEvent event, void *arg)
712713
}
713714

714715
/*
715-
* Check if this is "normal" usertrnsaction which should be distributed to other nodes
716+
* Check if this is "normal" usertransaction which should be distributed to other nodes
716717
*/
717718
staticbool
718719
MtmIsUserTransaction()
@@ -734,6 +735,7 @@ MtmResetTransaction()
734735
x->gtid.xid=InvalidTransactionId;
735736
x->isDistributed= false;
736737
x->isPrepared= false;
738+
x->isSuspended= false;
737739
x->isTwoPhase= false;
738740
x->csn=
739741
x->status=TRANSACTION_STATUS_UNKNOWN;
@@ -763,6 +765,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
763765
x->isReplicated=MtmIsLogicalReceiver;
764766
x->isDistributed=MtmIsUserTransaction();
765767
x->isPrepared= false;
768+
x->isSuspended= false;
766769
x->isTwoPhase= false;
767770
x->isTransactionBlock=IsTransactionBlock();
768771
/* Application name can be changed usnig PGAPPNAME environment variable */
@@ -1004,14 +1007,18 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10041007
}
10051008
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
10061009
if (ts->isPrepared) {
1007-
elog(ERROR,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1008-
}
1009-
if (Mtm->status!=MTM_ONLINE) {
1010-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1011-
}else {
1012-
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1010+
// GetNewTransactionId(false); /* force increment of transaction counter */
1011+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1012+
elog(WARNING,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1013+
x->isSuspended= true;
1014+
}else {
1015+
if (Mtm->status!=MTM_ONLINE) {
1016+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1017+
}else {
1018+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1019+
}
1020+
MtmAbortTransaction(ts);
10131021
}
1014-
MtmAbortTransaction(ts);
10151022
}
10161023
x->status=ts->status;
10171024
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
@@ -1078,14 +1085,18 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10781085
}
10791086
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
10801087
if (ts->isPrepared) {
1081-
elog(ERROR,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1082-
}
1083-
if (Mtm->status!=MTM_ONLINE) {
1084-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1085-
}else {
1086-
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1088+
// GetNewTransactionId(false); /* force increment of transaction counter */
1089+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1090+
elog(WARNING,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1091+
x->isSuspended= true;
1092+
}else {
1093+
if (Mtm->status!=MTM_ONLINE) {
1094+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1095+
}else {
1096+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1097+
}
1098+
MtmAbortTransaction(ts);
10871099
}
1088-
MtmAbortTransaction(ts);
10891100
}
10901101
x->status=ts->status;
10911102
x->xid=ts->xid;
@@ -1293,6 +1304,7 @@ static void MtmStartRecovery()
12931304
MtmLock(LW_EXCLUSIVE);
12941305
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
12951306
MtmSwitchClusterMode(MTM_RECOVERY);
1307+
Mtm->recoveredLSN=InvalidXLogRecPtr;
12961308
MtmUnlock();
12971309
}
12981310

@@ -1604,6 +1616,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
16041616
MTM_LOG1("%d: node %d is caugth-up without locking cluster",MyProcPid,nodeId);
16051617
/* We are lucky: caugth-up without locking cluster! */
16061618
}
1619+
Mtm->recoveredLSN=walLSN;
16071620
MtmEnableNode(nodeId);
16081621
Mtm->nConfigChanges+=1;
16091622
caughtUp= true;
@@ -2075,6 +2088,7 @@ static void MtmInitialize()
20752088
Mtm->walSenderLockerMask=0;
20762089
Mtm->nodeLockerMask=0;
20772090
Mtm->reconnectMask=0;
2091+
Mtm->recoveredLSN=InvalidXLogRecPtr;
20782092
Mtm->nLockers=0;
20792093
Mtm->nActiveTransactions=0;
20802094
Mtm->votingTransactions=NULL;
@@ -2102,13 +2116,14 @@ static void MtmInitialize()
21022116
Mtm->nodes[i].con=MtmConnections[i];
21032117
Mtm->nodes[i].flushPos=0;
21042118
Mtm->nodes[i].lastHeartbeat=0;
2105-
Mtm->nodes[i].restartLsn=0;
2119+
Mtm->nodes[i].restartLSN=InvalidXLogRecPtr;
21062120
Mtm->nodes[i].originId=InvalidRepOriginId;
21072121
Mtm->nodes[i].timeline=0;
2122+
Mtm->nodes[i].recoveredLSN=InvalidXLogRecPtr;
21082123
}
21092124
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
21102125
/* All transaction originated from the current node should be ignored during recovery */
2111-
Mtm->nodes[MtmNodeId-1].restartLsn= (XLogRecPtr)PG_UINT64_MAX;
2126+
Mtm->nodes[MtmNodeId-1].restartLSN= (XLogRecPtr)PG_UINT64_MAX;
21122127
PGSemaphoreCreate(&Mtm->sendSemaphore);
21132128
PGSemaphoreReset(&Mtm->sendSemaphore);
21142129
SpinLockInit(&Mtm->spinlock);
@@ -2850,18 +2865,21 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28502865
*/
28512866
MtmReplicationModeMtmGetReplicationMode(intnodeId,sig_atomic_tvolatile*shutdown)
28522867
{
2853-
boolrecovery=false;
2868+
MtmReplicationModemode=REPLMODE_OPEN_EXISTED;
28542869

2855-
while (Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)
2870+
while ((Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)||BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
28562871
{
28572872
if (*shutdown)
28582873
{
28592874
returnREPLMODE_EXIT;
28602875
}
2861-
MTM_LOG2("%d: receiver slot mode %s",MyProcPid,MtmNodeStatusMnem[Mtm->status]);
28622876
MtmLock(LW_EXCLUSIVE);
2877+
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
2878+
mode=REPLMODE_CREATE_NEW;
2879+
}
2880+
MTM_LOG2("%d: receiver slot mode %s",MyProcPid,MtmNodeStatusMnem[Mtm->status]);
28632881
if (Mtm->status==MTM_RECOVERY) {
2864-
recovery=true;
2882+
mode=REPLMODE_RECOVERED;
28652883
if ((Mtm->recoverySlot==0&& (Mtm->donorNodeId==MtmNodeId||Mtm->donorNodeId==nodeId))
28662884
||Mtm->recoverySlot==nodeId)
28672885
{
@@ -2879,13 +2897,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28792897
/* delay opening of other slots until recovery is completed */
28802898
MtmSleep(STATUS_POLL_DELAY);
28812899
}
2882-
if (recovery) {
2900+
if (mode==REPLMODE_RECOVERED) {
28832901
MTM_LOG1("%d: Restart replication from node %d after end of recovery",MyProcPid,nodeId);
2902+
}elseif (mode==REPLMODE_CREATE_NEW) {
2903+
MTM_LOG1("%d: Start replication from recovered node %d",MyProcPid,nodeId);
28842904
}else {
28852905
MTM_LOG1("%d: Continue replication from node %d",MyProcPid,nodeId);
28862906
}
2887-
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
2888-
returnrecovery ?REPLMODE_RECOVERED :REPLMODE_NORMAL;
2907+
returnmode;
28892908
}
28902909

28912910
staticboolMtmIsBroadcast()
@@ -2964,7 +2983,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29642983
MtmIsRecoverySession= true;
29652984
}elseif (strcmp(strVal(elem->arg),"recovered")==0) {
29662985
recoveryCompleted= true;
2967-
}elseif (strcmp(strVal(elem->arg),"normal")!=0) {
2986+
}elseif (strcmp(strVal(elem->arg),"open_existed")!=0&&strcmp(strVal(elem->arg),"create_new")!=0) {
29682987
elog(ERROR,"Illegal recovery mode %s",strVal(elem->arg));
29692988
}
29702989
}else {
@@ -2976,14 +2995,20 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29762995
}else {
29772996
elog(ERROR,"Restart position is not specified");
29782997
}
2998+
}elseif (strcmp("mtm_recovered_pos",elem->defname)==0) {
2999+
if (elem->arg!=NULL&&strVal(elem->arg)!=NULL) {
3000+
sscanf(strVal(elem->arg),"%lx",&Mtm->nodes[MtmReplicationNodeId-1].recoveredLSN);
3001+
}else {
3002+
elog(ERROR,"Recovered position is not specified");
3003+
}
29793004
}
29803005
}
29813006
MtmLock(LW_EXCLUSIVE);
29823007
if (MtmIsRecoverySession) {
29833008
MTM_LOG1("%d: Node %d start recovery of node %d at position %lx",MyProcPid,MtmNodeId,MtmReplicationNodeId,recoveryStartPos);
29843009
Assert(MyReplicationSlot!=NULL);
29853010
if (recoveryStartPos<MyReplicationSlot->data.restart_lsn) {
2986-
elog(ERROR,"Specified recovery start position %lx is beyond restart lsn %lx",recoveryStartPos,MyReplicationSlot->data.restart_lsn);
3011+
elog(WARNING,"Specified recovery start position %lx is beyond restart lsn %lx",recoveryStartPos,MyReplicationSlot->data.restart_lsn);
29873012
}
29883013
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
29893014
MtmDisableNode(MtmReplicationNodeId);
@@ -3132,17 +3157,17 @@ bool MtmFilterTransaction(char* record, int size)
31323157
default:
31333158
break;
31343159
}
3135-
duplicate=Mtm->status==MTM_RECOVERY&&origin_lsn!=InvalidXLogRecPtr&&origin_lsn <=Mtm->nodes[origin_node-1].restartLsn;
3160+
duplicate=Mtm->status==MTM_RECOVERY&&origin_lsn!=InvalidXLogRecPtr&&origin_lsn <=Mtm->nodes[origin_node-1].restartLSN;
31363161

31373162
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3138-
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,origin_node,origin_lsn,Mtm->nodes[origin_node-1].restartLsn);
3163+
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,origin_node,origin_lsn,Mtm->nodes[origin_node-1].restartLSN);
31393164
if (Mtm->status==MTM_RECOVERY) {
3140-
if (Mtm->nodes[origin_node-1].restartLsn<origin_lsn) {
3141-
Mtm->nodes[origin_node-1].restartLsn=origin_lsn;
3165+
if (Mtm->nodes[origin_node-1].restartLSN<origin_lsn) {
3166+
Mtm->nodes[origin_node-1].restartLSN=origin_lsn;
31423167
}
31433168
}else {
3144-
if (Mtm->nodes[replication_node-1].restartLsn<end_lsn) {
3145-
Mtm->nodes[replication_node-1].restartLsn=end_lsn;
3169+
if (Mtm->nodes[replication_node-1].restartLSN<end_lsn) {
3170+
Mtm->nodes[replication_node-1].restartLSN=end_lsn;
31463171
}
31473172
}
31483173
returnduplicate;
@@ -3757,12 +3782,16 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
37573782
/* ??? Should we do explicit rollback */
37583783
}else {
37593784
CommitTransactionCommand();
3760-
StartTransactionCommand();
3761-
if (MtmGetCurrentTransactionStatus()==TRANSACTION_STATUS_ABORTED) {
3762-
FinishPreparedTransaction(x->gid, false);
3763-
elog(ERROR,"Transaction %s is aborted by DTM",x->gid);
3764-
}else {
3765-
FinishPreparedTransaction(x->gid, true);
3785+
if (x->isSuspended) {
3786+
elog(WARNING,"Transaction %s is left in prepared state because coordinator onde is not online",x->gid);
3787+
}else {
3788+
StartTransactionCommand();
3789+
if (x->status==TRANSACTION_STATUS_ABORTED) {
3790+
FinishPreparedTransaction(x->gid, false);
3791+
elog(ERROR,"Transaction %s is aborted by DTM",x->gid);
3792+
}else {
3793+
FinishPreparedTransaction(x->gid, true);
3794+
}
37663795
}
37673796
}
37683797
return true;

‎contrib/mmts/multimaster.h‎

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,11 @@ typedef enum
126126

127127
typedefenum
128128
{
129-
REPLMODE_EXIT,/* receiver should exit */
130-
REPLMODE_RECOVERED,/* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
131-
REPLMODE_RECOVERY,/* perform recorvery of the node by applying all data from the slot from specified point */
132-
REPLMODE_NORMAL/* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
129+
REPLMODE_EXIT,/* receiver should exit */
130+
REPLMODE_RECOVERED,/* recovery of receiver node is completed so drop old slot and restart replication from the current position in WAL */
131+
REPLMODE_RECOVERY,/* perform recorvery of the node by applying all data from the slot from specified point */
132+
REPLMODE_CREATE_NEW,/* destination node is recovered: drop old slot and restart from roveredLsn position */
133+
REPLMODE_OPEN_EXISTED/* normal mode: use existed slot or create new one and start receiving data from it from the rememered position */
133134
}MtmReplicationMode;
134135

135136
typedefstruct
@@ -188,12 +189,13 @@ typedef struct
188189
intreceiverPid;
189190
XLogRecPtrflushPos;
190191
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
191-
XLogRecPtrrestartLsn;
192+
XLogRecPtrrestartLSN;
192193
RepOriginIdoriginId;
193194
inttimeline;
194195
void*lockGraphData;
195196
intlockGraphAllocated;
196197
intlockGraphUsed;
198+
XLogRecPtrrecoveredLSN;
197199
}MtmNodeInfo;
198200

199201
typedefstructMtmTransState
@@ -264,6 +266,7 @@ typedef struct
264266
uint64gcCount;/* Number of global transactions performed since last GC */
265267
MtmMessageQueue*sendQueue;/* Messages to be sent by arbiter sender */
266268
MtmMessageQueue*freeQueue;/* Free messages */
269+
XLogRecPtrrecoveredLSN;/* LSN at the moment of recovery completion */
267270
BgwPoolpool;/* Pool of background workers for applying logical replication patches */
268271
MtmNodeInfonodes[1];/* [Mtm->nAllNodes]: per-node data */
269272
}MtmState;

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187187
Assert(false);
188188

189189
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
190-
Assert(txn->xid<1000||MtmTransactionRecords>=2);
190+
//Assert(txn->xid < 1000 || MtmTransactionRecords!= 1);
191191
// if (MtmIsFilteredTxn) {
192192
// Assert(MtmTransactionRecords == 0);
193193
// return;

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,10 @@ feTimestampDifference(int64 start_time, int64 stop_time,
196196
staticcharconst*constMtmReplicationModeName[]=
197197
{
198198
"exit",
199-
"recovered",/* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
200-
"recovery",/* perform recorvery of the node by applying all data from theslot from specified point */
201-
"normal"/* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
199+
"recovered",/* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
200+
"recovery",/* perform recorvery of the node by applying all data from theslot from specified point */
201+
"create_new",/* destination node is recovered: drop old slot and restart from roveredLsn position */
202+
"open_existed"/* normal mode: use existed slot or create new one and start receiving data from it from the rememered position */
202203
};
203204

204205
staticvoid
@@ -275,7 +276,7 @@ pglogical_receiver_main(Datum main_arg)
275276
}
276277
timeline=Mtm->nodes[nodeId-1].timeline;
277278
count=Mtm->recoveryCount;
278-
279+
279280
/* Establish connection to remote server */
280281
conn=PQconnectdb_safe(connString);
281282
status=PQstatus(conn);
@@ -287,7 +288,9 @@ pglogical_receiver_main(Datum main_arg)
287288
}
288289

289290
query=createPQExpBuffer();
290-
if (mode==REPLMODE_NORMAL&&timeline!=Mtm->nodes[nodeId-1].timeline) {/* recreate slot */
291+
if ((mode==REPLMODE_OPEN_EXISTED&&timeline!=Mtm->nodes[nodeId-1].timeline)
292+
||mode==REPLMODE_CREATE_NEW)
293+
{/* recreate slot */
291294
appendPQExpBuffer(query,"DROP_REPLICATION_SLOT \"%s\"",slotName);
292295
res=PQexec(conn,query->data);
293296
PQclear(res);
@@ -320,7 +323,7 @@ pglogical_receiver_main(Datum main_arg)
320323

321324
/* Start logical replication at specified position */
322325
if (mode==REPLMODE_RECOVERED) {
323-
originStartPos=Mtm->nodes[nodeId-1].restartLsn;
326+
originStartPos=Mtm->nodes[nodeId-1].restartLSN;
324327
MTM_LOG1("Restart replication from node %d from position %lx",nodeId,originStartPos);
325328
}
326329
if (originStartPos==InvalidXLogRecPtr&& !newTimeline) {
@@ -339,23 +342,26 @@ pglogical_receiver_main(Datum main_arg)
339342
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
340343
}else {
341344
originStartPos=replorigin_get_progress(originId, false);
342-
if (Mtm->nodes[nodeId-1].restartLsn<originStartPos) {
343-
Mtm->nodes[nodeId-1].restartLsn=originStartPos;
345+
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
346+
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
344347
}
345348
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
346349
}
347350
Mtm->nodes[nodeId-1].originId=originId;
348351
CommitTransactionCommand();
352+
}elseif (mode==REPLMODE_CREATE_NEW) {
353+
originStartPos=Mtm->nodes[nodeId-1].recoveredLSN;
349354
}
350355

351-
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx')",
356+
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx', \"mtm_recovered_pos\" '%lx')",
352357
slotName,
353358
(uint32) (originStartPos >>32),
354359
(uint32)originStartPos,
355360
MULTIMASTER_MAX_PROTO_VERSION,
356361
MULTIMASTER_MIN_PROTO_VERSION,
357362
MtmReplicationModeName[mode],
358-
originStartPos
363+
originStartPos,
364+
Mtm->recoveredLSN
359365
);
360366
res=PQexec(conn,query->data);
361367
if (PQresultStatus(res)!=PGRES_COPY_BOTH)
@@ -511,7 +517,7 @@ pglogical_receiver_main(Datum main_arg)
511517
output_written_lsn=Max(walEnd,output_written_lsn);
512518
continue;
513519
}
514-
mode=REPLMODE_NORMAL;
520+
mode=REPLMODE_OPEN_EXISTED;
515521
}
516522
MTM_LOG3("%ld: Receive message %c from node %d",MtmGetSystemTime(),stmt[0],nodeId);
517523
if (buf.used >=MtmTransSpillThreshold*MB) {

‎src/backend/replication/logical/decode.c‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,18 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
606606
(parsed->dbId!=InvalidOid&&parsed->dbId!=ctx->slot->data.database)||
607607
FilterByOrigin(ctx,origin_id))
608608
{
609+
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder,buf->origptr)) {
610+
elog(LOG,"Skip transaction %d at %lx because it's origptr %lx is not in snapshot",
611+
xid,buf->endptr,buf->origptr);
612+
}
613+
if (parsed->dbId!=InvalidOid&&parsed->dbId!=ctx->slot->data.database) {
614+
elog(LOG,"Skip transaction %d at %lx because database id is not matched",
615+
xid,buf->endptr);
616+
}
617+
if (FilterByOrigin(ctx,origin_id)) {
618+
elog(LOG,"Skip transaction %d at %lx is filtered by origin_id %d",
619+
xid,buf->endptr,origin_id);
620+
}
609621
for (i=0;i<parsed->nsubxacts;i++)
610622
{
611623
ReorderBufferForget(ctx->reorder,parsed->subxacts[i],buf->origptr);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp