Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit35aec67

Browse files
committed
Restore skip local transaction mechanism in pglogical_proto
1 parentbbefe45 commit35aec67

File tree

4 files changed

+59
-55
lines changed

4 files changed

+59
-55
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -902,14 +902,13 @@ static void MtmReceiver(Datum arg)
902902
msg->status=TRANSACTION_STATUS_ABORTED;
903903
}else {
904904
msg->status=tm->state->status;
905-
msg->csn=tm->state->csn;
905+
msg->csn=tm->state->csn;
906906
MTM_LOG1("Send response %d for transaction %s to node %d",msg->status,msg->gid,msg->node);
907907
}
908908
msg->disabledNodeMask=Mtm->disabledNodeMask;
909909
msg->connectivityMask=Mtm->connectivityMask;
910910
msg->oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
911911
msg->code=MSG_POLL_STATUS;
912-
msg->csn=ts->csn;
913912
MtmSendMessage(msg);
914913
continue;
915914
caseMSG_POLL_STATUS:
@@ -922,11 +921,11 @@ static void MtmReceiver(Datum arg)
922921
BIT_SET(ts->votedMask,node-1);
923922
if (ts->status==TRANSACTION_STATUS_UNKNOWN) {
924923
if (msg->status==TRANSACTION_STATUS_IN_PROGRESS||msg->status==TRANSACTION_STATUS_ABORTED) {
925-
elog(LOG,"Abort transaction %s because it is in state %d at node %d",
926-
msg->gid,ts->status,node);
924+
elog(LOG,"Abortpreparedtransaction %s because it is in state %s at node %d",
925+
msg->gid,MtmNodeStatusMnem[msg->status],node);
927926
MtmFinishPreparedTransaction(ts, false);
928927
}
929-
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
928+
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
930929
{
931930
if (msg->csn>ts->csn) {
932931
ts->csn=msg->csn;
@@ -937,17 +936,17 @@ static void MtmReceiver(Datum arg)
937936
MtmFinishPreparedTransaction(ts, true);
938937
}
939938
}else {
940-
elog(LOG,"Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",
941-
msg->status,msg->gid,node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask));
939+
elog(LOG,"Receive response %s for transaction %s for node %d, votedMask%llx, participantsMask%llx",
940+
MtmNodeStatusMnem[msg->status],msg->gid,node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask));
942941
continue;
943942
}
944943
}elseif (ts->status==TRANSACTION_STATUS_ABORTED&&msg->status==TRANSACTION_STATUS_COMMITTED) {
945944
elog(WARNING,"Transaction %s is aborted at node %d but committed at node %d",msg->gid,MtmNodeId,node);
946945
}elseif (msg->status==TRANSACTION_STATUS_ABORTED&&ts->status==TRANSACTION_STATUS_COMMITTED) {
947946
elog(WARNING,"Transaction %s is committed at node %d but aborted at node %d",msg->gid,MtmNodeId,node);
948947
}else {
949-
elog(LOG,"Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx",
950-
msg->status,msg->gid,ts->status,node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask) );
948+
elog(LOG,"Receive response %s for transaction %s status %s for node %d, votedMask%llx, participantsMask%llx",
949+
MtmNodeStatusMnem[msg->status],msg->gid,MtmNodeStatusMnem[ts->status],node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask) );
951950
}
952951
}
953952
continue;
@@ -987,8 +986,8 @@ static void MtmReceiver(Datum arg)
987986
if ((~msg->disabledNodeMask&Mtm->disabledNodeMask)!=0) {
988987
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
989988
commit on smaller subset of nodes */
990-
elog(WARNING,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
991-
node, (long)Mtm->disabledNodeMask, (long)msg->disabledNodeMask);
989+
elog(WARNING,"Coordinator of distributed transaction see less nodes than node %d: %llx instead of %llx",
990+
node, (long long)Mtm->disabledNodeMask, (longlong)msg->disabledNodeMask);
992991
MtmAbortTransaction(ts);
993992
}
994993
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
@@ -997,7 +996,7 @@ static void MtmReceiver(Datum arg)
997996
MtmWakeUpBackend(ts);
998997
}else {
999998
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1000-
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
999+
MTM_LOG2("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
10011000
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
10021001
ts->isPrepared= true;
10031002
if (ts->isTwoPhase) {

‎contrib/mmts/multimaster.c‎

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,17 +1154,18 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11541154
if (x->isPrepared) {
11551155
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
11561156
Assert(ts!=NULL);
1157+
Assert(strcmp(x->gid,ts->gid)==0);
11571158
}elseif (x->gid[0]) {
11581159
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_FIND,NULL);
11591160
if (tm!=NULL) {
11601161
ts=tm->state;
11611162
}else {
1162-
MTM_LOG3("%d: GID %s not found",MyProcPid,x->gid);
1163+
MTM_LOG1("%d: GID %s not found",MyProcPid,x->gid);
11631164
}
11641165
}
11651166
if (ts!=NULL) {
11661167
if (*ts->gid)
1167-
MTM_LOG1("TRANSLOG: %s transactiongit=%s xid=%d node=%d dxid=%d status %d",
1168+
MTM_LOG2("TRANSLOG: %s transactiongid=%s xid=%d node=%d dxid=%d status %d",
11681169
(commit ?"commit" :"rollback"),ts->gid,ts->xid,ts->gtid.node,ts->gtid.xid,ts->status);
11691170
if (commit) {
11701171
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
@@ -1177,6 +1178,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11771178
ts->xid,ts->gid);
11781179
}
11791180
if (x->csn>ts->csn||Mtm->status==MTM_RECOVERY) {
1181+
Assert(x->csn!=INVALID_CSN);
11801182
ts->csn=x->csn;
11811183
MtmSyncClock(ts->csn);
11821184
}
@@ -2842,7 +2844,7 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28422844
XidStatusstatus=MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED);
28432845
MTM_LOG1("Abort prepared transaction %s status %d",gid,status);
28442846
if (status==TRANSACTION_STATUS_UNKNOWN) {
2845-
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
2847+
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
28462848
MtmResetTransaction();
28472849
StartTransactionCommand();
28482850
MtmBeginSession(nodeId);
@@ -3191,9 +3193,7 @@ bool MtmFilterTransaction(char* record, int size)
31913193
duplicate= true;
31923194
}
31933195

3194-
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3195-
3196-
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3196+
MTM_LOG2("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
31973197
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
31983198
returnduplicate;
31993199
}

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ process_remote_commit(StringInfo in)
641641
AbortCurrentTransaction();
642642
}else {
643643
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
644-
MTM_LOG1("PGLOGICAL_PREPARE commit: gid=%s",gid);
644+
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s",gid);
645645
BeginTransactionBlock();
646646
CommitTransactionCommand();
647647
StartTransactionCommand();
@@ -668,7 +668,7 @@ process_remote_commit(StringInfo in)
668668
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
669669
csn=pq_getmsgint64(in);
670670
gid=pq_getmsgstring(in);
671-
MTM_LOG1("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%lx",csn,gid,end_lsn);
671+
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%lx",csn,gid,end_lsn);
672672
MtmResetTransaction();
673673
StartTransactionCommand();
674674
MtmBeginSession(origin_node);

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@
3939
#include"pglogical_relid_map.h"
4040

4141
staticintMtmTransactionRecords;
42+
staticboolMtmIsFilteredTxn;
4243
staticTransactionIdMtmCurrentXid;
43-
staticboolDDLInProress= false;
44+
staticboolDDLInProgress= false;
4445

4546
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4647

@@ -76,13 +77,13 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7677
uint8relnamelen;
7778
Oidrelid;
7879

79-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN) {
80+
if (MtmIsFilteredTxn) {
8081
MTM_LOG2("%d: pglogical_write_message filtered",MyProcPid);
8182
return;
8283
}
8384

84-
if (DDLInProress) {
85-
MTM_LOG2("%d: pglogical_write_message filteredDDLInProress",MyProcPid);
85+
if (DDLInProgress) {
86+
MTM_LOG2("%d: pglogical_write_message filteredDDLInProgress",MyProcPid);
8687
return;
8788
}
8889

@@ -116,17 +117,22 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
116117
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
117118
csn_tcsn=MtmTransactionSnapshot(txn->xid);
118119

119-
MtmCurrentXid=txn->xid;
120-
121-
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
122-
MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
123-
124-
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
125-
pq_sendbyte(out,'B');/* BEGIN */
126-
pq_sendint(out,MtmNodeId,4);
127-
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
128-
pq_sendint64(out,csn);
129-
MtmTransactionRecords=0;
120+
if (!isRecovery&&csn==INVALID_CSN) {
121+
MtmIsFilteredTxn= true;
122+
MTM_LOG3("%d: pglogical_write_begin XID=%d filtered",MyProcPid,txn->xid);
123+
}else {
124+
MtmCurrentXid=txn->xid;
125+
MtmIsFilteredTxn= false;
126+
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
127+
MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
128+
129+
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
130+
pq_sendbyte(out,'B');/* BEGIN */
131+
pq_sendint(out,MtmNodeId,4);
132+
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
133+
pq_sendint64(out,csn);
134+
MtmTransactionRecords=0;
135+
}
130136
}
131137

132138
staticvoid
@@ -142,14 +148,14 @@ pglogical_write_message(StringInfo out,
142148
}
143149
break;
144150
case'D':
145-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
151+
if (MtmIsFilteredTxn){
146152
MTM_LOG2("%d: pglogical_write_message filtered",MyProcPid);
147153
return;
148154
}
149-
DDLInProress= true;
155+
DDLInProgress= true;
150156
break;
151157
case'E':
152-
DDLInProress= false;
158+
DDLInProgress= false;
153159
/*
154160
* we use End message only as indicator of DDL transaction finish,
155161
* so no need to send that to replicas.
@@ -187,11 +193,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187193
Assert(false);
188194

189195
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
190-
//Assert(txn->xid < 1000 || MtmTransactionRecords != 1);
191-
// if (MtmIsFilteredTxn) {
192-
// Assert(MtmTransactionRecords == 0);
193-
// return;
194-
// }
196+
if (MtmIsFilteredTxn) {
197+
Assert(MtmTransactionRecords==0);
198+
return;
199+
}
195200
}else {
196201
csn_tcsn=MtmTransactionSnapshot(txn->xid);
197202
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
@@ -261,13 +266,13 @@ static void
261266
pglogical_write_insert(StringInfoout,PGLogicalOutputData*data,
262267
Relationrel,HeapTuplenewtuple)
263268
{
264-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
269+
if (MtmIsFilteredTxn){
265270
MTM_LOG2("%d: pglogical_write_insert filtered",MyProcPid);
266271
return;
267272
}
268273

269-
if (DDLInProress) {
270-
MTM_LOG2("%d: pglogical_write_insert filteredDDLInProress",MyProcPid);
274+
if (DDLInProgress) {
275+
MTM_LOG2("%d: pglogical_write_insert filteredDDLInProgress",MyProcPid);
271276
return;
272277
}
273278

@@ -284,13 +289,13 @@ static void
284289
pglogical_write_update(StringInfoout,PGLogicalOutputData*data,
285290
Relationrel,HeapTupleoldtuple,HeapTuplenewtuple)
286291
{
287-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
292+
if (MtmIsFilteredTxn){
288293
MTM_LOG2("%d: pglogical_write_update filtered",MyProcPid);
289294
return;
290295
}
291296

292-
if (DDLInProress) {
293-
MTM_LOG2("%d: pglogical_write_update filteredDDLInProress",MyProcPid);
297+
if (DDLInProgress) {
298+
MTM_LOG2("%d: pglogical_write_update filteredDDLInProgress",MyProcPid);
294299
return;
295300
}
296301

@@ -317,13 +322,13 @@ static void
317322
pglogical_write_delete(StringInfoout,PGLogicalOutputData*data,
318323
Relationrel,HeapTupleoldtuple)
319324
{
320-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
325+
if (MtmIsFilteredTxn){
321326
MTM_LOG2("%d: pglogical_write_delete filtered",MyProcPid);
322327
return;
323328
}
324329

325-
if (DDLInProress) {
326-
MTM_LOG2("%d: pglogical_write_delete filteredDDLInProress",MyProcPid);
330+
if (DDLInProgress) {
331+
MTM_LOG2("%d: pglogical_write_delete filteredDDLInProgress",MyProcPid);
327332
return;
328333
}
329334

@@ -354,13 +359,13 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
354359
inti;
355360
uint16nliveatts=0;
356361

357-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
362+
if (MtmIsFilteredTxn){
358363
MTM_LOG2("%d: pglogical_write_tuple filtered",MyProcPid);
359364
return;
360365
}
361366

362-
if (DDLInProress) {
363-
MTM_LOG2("%d: pglogical_write_tuple filteredDDLInProress",MyProcPid);
367+
if (DDLInProgress) {
368+
MTM_LOG2("%d: pglogical_write_tuple filteredDDLInProgress",MyProcPid);
364369
return;
365370
}
366371

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp