Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitaf5645e

Browse files
knizhnikkelvich
authored andcommitted
Restore skip local transaction mechanism in pglogical_proto
1 parentc39b549 commitaf5645e

File tree

4 files changed

+59
-55
lines changed

4 files changed

+59
-55
lines changed

‎arbiter.c

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -898,14 +898,13 @@ static void MtmReceiver(Datum arg)
898898
msg->status=TRANSACTION_STATUS_ABORTED;
899899
}else {
900900
msg->status=tm->state->status;
901-
msg->csn=tm->state->csn;
901+
msg->csn=tm->state->csn;
902902
MTM_LOG1("Send response %d for transaction %s to node %d",msg->status,msg->gid,msg->node);
903903
}
904904
msg->disabledNodeMask=Mtm->disabledNodeMask;
905905
msg->connectivityMask=Mtm->connectivityMask;
906906
msg->oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
907907
msg->code=MSG_POLL_STATUS;
908-
msg->csn=ts->csn;
909908
MtmSendMessage(msg);
910909
continue;
911910
caseMSG_POLL_STATUS:
@@ -918,11 +917,11 @@ static void MtmReceiver(Datum arg)
918917
BIT_SET(ts->votedMask,node-1);
919918
if (ts->status==TRANSACTION_STATUS_UNKNOWN) {
920919
if (msg->status==TRANSACTION_STATUS_IN_PROGRESS||msg->status==TRANSACTION_STATUS_ABORTED) {
921-
elog(LOG,"Abort transaction %s because it is in state %d at node %d",
922-
msg->gid,ts->status,node);
920+
elog(LOG,"Abortpreparedtransaction %s because it is in state %s at node %d",
921+
msg->gid,MtmNodeStatusMnem[msg->status],node);
923922
MtmFinishPreparedTransaction(ts, false);
924923
}
925-
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
924+
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
926925
{
927926
if (msg->csn>ts->csn) {
928927
ts->csn=msg->csn;
@@ -933,17 +932,17 @@ static void MtmReceiver(Datum arg)
933932
MtmFinishPreparedTransaction(ts, true);
934933
}
935934
}else {
936-
elog(LOG,"Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",
937-
msg->status,msg->gid,node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask));
935+
elog(LOG,"Receive response %s for transaction %s for node %d, votedMask%llx, participantsMask%llx",
936+
MtmNodeStatusMnem[msg->status],msg->gid,node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask));
938937
continue;
939938
}
940939
}elseif (ts->status==TRANSACTION_STATUS_ABORTED&&msg->status==TRANSACTION_STATUS_COMMITTED) {
941940
elog(WARNING,"Transaction %s is aborted at node %d but committed at node %d",msg->gid,MtmNodeId,node);
942941
}elseif (msg->status==TRANSACTION_STATUS_ABORTED&&ts->status==TRANSACTION_STATUS_COMMITTED) {
943942
elog(WARNING,"Transaction %s is committed at node %d but aborted at node %d",msg->gid,MtmNodeId,node);
944943
}else {
945-
elog(LOG,"Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx",
946-
msg->status,msg->gid,ts->status,node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask) );
944+
elog(LOG,"Receive response %s for transaction %s status %s for node %d, votedMask%llx, participantsMask%llx",
945+
MtmNodeStatusMnem[msg->status],msg->gid,MtmNodeStatusMnem[ts->status],node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask) );
947946
}
948947
}
949948
continue;
@@ -983,8 +982,8 @@ static void MtmReceiver(Datum arg)
983982
if ((~msg->disabledNodeMask&Mtm->disabledNodeMask)!=0) {
984983
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
985984
commit on smaller subset of nodes */
986-
elog(WARNING,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
987-
node, (long)Mtm->disabledNodeMask, (long)msg->disabledNodeMask);
985+
elog(WARNING,"Coordinator of distributed transaction see less nodes than node %d: %llx instead of %llx",
986+
node, (long long)Mtm->disabledNodeMask, (longlong)msg->disabledNodeMask);
988987
MtmAbortTransaction(ts);
989988
}
990989
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
@@ -993,7 +992,7 @@ static void MtmReceiver(Datum arg)
993992
MtmWakeUpBackend(ts);
994993
}else {
995994
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
996-
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
995+
MTM_LOG2("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
997996
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
998997
ts->isPrepared= true;
999998
if (ts->isTwoPhase) {

‎multimaster.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,17 +1154,18 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11541154
if (x->isPrepared) {
11551155
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
11561156
Assert(ts!=NULL);
1157+
Assert(strcmp(x->gid,ts->gid)==0);
11571158
}elseif (x->gid[0]) {
11581159
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_FIND,NULL);
11591160
if (tm!=NULL) {
11601161
ts=tm->state;
11611162
}else {
1162-
MTM_LOG3("%d: GID %s not found",MyProcPid,x->gid);
1163+
MTM_LOG1("%d: GID %s not found",MyProcPid,x->gid);
11631164
}
11641165
}
11651166
if (ts!=NULL) {
11661167
if (*ts->gid)
1167-
MTM_LOG1("TRANSLOG: %s transactiongit=%s xid=%d node=%d dxid=%d status %d",
1168+
MTM_LOG2("TRANSLOG: %s transactiongid=%s xid=%d node=%d dxid=%d status %d",
11681169
(commit ?"commit" :"rollback"),ts->gid,ts->xid,ts->gtid.node,ts->gtid.xid,ts->status);
11691170
if (commit) {
11701171
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
@@ -1177,6 +1178,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11771178
ts->xid,ts->gid);
11781179
}
11791180
if (x->csn>ts->csn||Mtm->status==MTM_RECOVERY) {
1181+
Assert(x->csn!=INVALID_CSN);
11801182
ts->csn=x->csn;
11811183
MtmSyncClock(ts->csn);
11821184
}
@@ -2844,7 +2846,7 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28442846
XidStatusstatus=MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED);
28452847
MTM_LOG1("Abort prepared transaction %s status %d",gid,status);
28462848
if (status==TRANSACTION_STATUS_UNKNOWN) {
2847-
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
2849+
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
28482850
MtmResetTransaction();
28492851
StartTransactionCommand();
28502852
MtmBeginSession(nodeId);
@@ -3193,9 +3195,7 @@ bool MtmFilterTransaction(char* record, int size)
31933195
duplicate= true;
31943196
}
31953197

3196-
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3197-
3198-
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3198+
MTM_LOG2("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
31993199
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
32003200
returnduplicate;
32013201
}

‎pglogical_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ process_remote_commit(StringInfo in)
641641
AbortCurrentTransaction();
642642
}else {
643643
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
644-
MTM_LOG1("PGLOGICAL_PREPARE commit: gid=%s",gid);
644+
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s",gid);
645645
BeginTransactionBlock();
646646
CommitTransactionCommand();
647647
StartTransactionCommand();
@@ -668,7 +668,7 @@ process_remote_commit(StringInfo in)
668668
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
669669
csn=pq_getmsgint64(in);
670670
gid=pq_getmsgstring(in);
671-
MTM_LOG1("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%lx",csn,gid,end_lsn);
671+
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%lx",csn,gid,end_lsn);
672672
MtmResetTransaction();
673673
StartTransactionCommand();
674674
MtmBeginSession(origin_node);

‎pglogical_proto.c

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@
3939
#include"pglogical_relid_map.h"
4040

4141
staticintMtmTransactionRecords;
42+
staticboolMtmIsFilteredTxn;
4243
staticTransactionIdMtmCurrentXid;
43-
staticboolDDLInProress= false;
44+
staticboolDDLInProgress= false;
4445

4546
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4647

@@ -76,13 +77,13 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7677
uint8relnamelen;
7778
Oidrelid;
7879

79-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN) {
80+
if (MtmIsFilteredTxn) {
8081
MTM_LOG2("%d: pglogical_write_message filtered",MyProcPid);
8182
return;
8283
}
8384

84-
if (DDLInProress) {
85-
MTM_LOG2("%d: pglogical_write_message filteredDDLInProress",MyProcPid);
85+
if (DDLInProgress) {
86+
MTM_LOG2("%d: pglogical_write_message filteredDDLInProgress",MyProcPid);
8687
return;
8788
}
8889

@@ -116,17 +117,22 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
116117
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
117118
csn_tcsn=MtmTransactionSnapshot(txn->xid);
118119

119-
MtmCurrentXid=txn->xid;
120-
121-
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
122-
MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
123-
124-
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
125-
pq_sendbyte(out,'B');/* BEGIN */
126-
pq_sendint(out,MtmNodeId,4);
127-
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
128-
pq_sendint64(out,csn);
129-
MtmTransactionRecords=0;
120+
if (!isRecovery&&csn==INVALID_CSN) {
121+
MtmIsFilteredTxn= true;
122+
MTM_LOG3("%d: pglogical_write_begin XID=%d filtered",MyProcPid,txn->xid);
123+
}else {
124+
MtmCurrentXid=txn->xid;
125+
MtmIsFilteredTxn= false;
126+
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
127+
MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
128+
129+
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
130+
pq_sendbyte(out,'B');/* BEGIN */
131+
pq_sendint(out,MtmNodeId,4);
132+
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
133+
pq_sendint64(out,csn);
134+
MtmTransactionRecords=0;
135+
}
130136
}
131137

132138
staticvoid
@@ -142,14 +148,14 @@ pglogical_write_message(StringInfo out,
142148
}
143149
break;
144150
case'D':
145-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
151+
if (MtmIsFilteredTxn){
146152
MTM_LOG2("%d: pglogical_write_message filtered",MyProcPid);
147153
return;
148154
}
149-
DDLInProress= true;
155+
DDLInProgress= true;
150156
break;
151157
case'E':
152-
DDLInProress= false;
158+
DDLInProgress= false;
153159
/*
154160
* we use End message only as indicator of DDL transaction finish,
155161
* so no need to send that to replicas.
@@ -187,11 +193,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187193
Assert(false);
188194

189195
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
190-
//Assert(txn->xid < 1000 || MtmTransactionRecords != 1);
191-
// if (MtmIsFilteredTxn) {
192-
// Assert(MtmTransactionRecords == 0);
193-
// return;
194-
// }
196+
if (MtmIsFilteredTxn) {
197+
Assert(MtmTransactionRecords==0);
198+
return;
199+
}
195200
}else {
196201
csn_tcsn=MtmTransactionSnapshot(txn->xid);
197202
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
@@ -261,13 +266,13 @@ static void
261266
pglogical_write_insert(StringInfoout,PGLogicalOutputData*data,
262267
Relationrel,HeapTuplenewtuple)
263268
{
264-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
269+
if (MtmIsFilteredTxn){
265270
MTM_LOG2("%d: pglogical_write_insert filtered",MyProcPid);
266271
return;
267272
}
268273

269-
if (DDLInProress) {
270-
MTM_LOG2("%d: pglogical_write_insert filteredDDLInProress",MyProcPid);
274+
if (DDLInProgress) {
275+
MTM_LOG2("%d: pglogical_write_insert filteredDDLInProgress",MyProcPid);
271276
return;
272277
}
273278

@@ -284,13 +289,13 @@ static void
284289
pglogical_write_update(StringInfoout,PGLogicalOutputData*data,
285290
Relationrel,HeapTupleoldtuple,HeapTuplenewtuple)
286291
{
287-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
292+
if (MtmIsFilteredTxn){
288293
MTM_LOG2("%d: pglogical_write_update filtered",MyProcPid);
289294
return;
290295
}
291296

292-
if (DDLInProress) {
293-
MTM_LOG2("%d: pglogical_write_update filteredDDLInProress",MyProcPid);
297+
if (DDLInProgress) {
298+
MTM_LOG2("%d: pglogical_write_update filteredDDLInProgress",MyProcPid);
294299
return;
295300
}
296301

@@ -317,13 +322,13 @@ static void
317322
pglogical_write_delete(StringInfoout,PGLogicalOutputData*data,
318323
Relationrel,HeapTupleoldtuple)
319324
{
320-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
325+
if (MtmIsFilteredTxn){
321326
MTM_LOG2("%d: pglogical_write_delete filtered",MyProcPid);
322327
return;
323328
}
324329

325-
if (DDLInProress) {
326-
MTM_LOG2("%d: pglogical_write_delete filteredDDLInProress",MyProcPid);
330+
if (DDLInProgress) {
331+
MTM_LOG2("%d: pglogical_write_delete filteredDDLInProgress",MyProcPid);
327332
return;
328333
}
329334

@@ -354,13 +359,13 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
354359
inti;
355360
uint16nliveatts=0;
356361

357-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
362+
if (MtmIsFilteredTxn){
358363
MTM_LOG2("%d: pglogical_write_tuple filtered",MyProcPid);
359364
return;
360365
}
361366

362-
if (DDLInProress) {
363-
MTM_LOG2("%d: pglogical_write_tuple filteredDDLInProress",MyProcPid);
367+
if (DDLInProgress) {
368+
MTM_LOG2("%d: pglogical_write_tuple filteredDDLInProgress",MyProcPid);
364369
return;
365370
}
366371

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp