Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit076234f

Browse files
committed
3pc support fixes
1 parent4dda4ec commit076234f

File tree

11 files changed

+279
-182
lines changed

11 files changed

+279
-182
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ static void MtmMonitor(Datum arg);
9191
staticvoidMtmSendHeartbeat(void);
9292
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
9393

94-
staticcharconst*constmessageKindText[]=
94+
charconst*constMtmMessageKindMnem[]=
9595
{
9696
"INVALID",
9797
"HANDSHAKE",
@@ -318,7 +318,7 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
318318
&&Mtm->status!=MTM_RECOVERY
319319
&&Mtm->nodes[MtmNodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)<MtmGetSystemTime())
320320
{
321-
elog(WARNING,"Node %d thinks that I am dead, while I am %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],messageKindText[resp->code]);
321+
elog(WARNING,"Node %d thinks that I am dead, while I am %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],MtmMessageKindMnem[resp->code]);
322322
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
323323
MtmSwitchClusterMode(MTM_RECOVERY);
324324
}elseif (BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)&&sockets[resp->node-1]<0) {
@@ -372,7 +372,7 @@ static void MtmSendHeartbeat()
372372
MTM_LOG4("Send heartbeat to node %d with timestamp %ld",i+1,now);
373373
}
374374
}else {
375-
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %d",i+1, (long long)busy_mask,Mtm->status);
375+
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %s",i+1, (long long)busy_mask,MtmNodeStatusMnem[Mtm->status]);
376376
}
377377
}
378378
}
@@ -882,6 +882,7 @@ static void MtmReceiver(Datum arg)
882882
rxBuffer[i].used+=rc;
883883
nResponses=rxBuffer[i].used/sizeof(MtmArbiterMessage);
884884

885+
885886
MtmLock(LW_EXCLUSIVE);
886887

887888
for (j=0;j<nResponses;j++) {
@@ -897,6 +898,7 @@ static void MtmReceiver(Datum arg)
897898
Mtm->nodes[node-1].lastHeartbeat=MtmGetSystemTime();
898899

899900
MtmCheckResponse(msg);
901+
MTM_LOG2("Receive response %s for transaction %s from node %d",MtmMessageKindMnem[msg->code],msg->gid,msg->node);
900902

901903
switch (msg->code) {
902904
caseMSG_HEARTBEAT:
@@ -912,7 +914,7 @@ static void MtmReceiver(Datum arg)
912914
}else {
913915
msg->status=tm->state->status;
914916
msg->csn=tm->state->csn;
915-
MTM_LOG1("Send response %d for transaction %s to node %d",msg->status,msg->gid,msg->node);
917+
MTM_LOG1("Send response %s for transaction %s to node %d",MtmTxnStatusMnem[msg->status],msg->gid,msg->node);
916918
}
917919
msg->disabledNodeMask=Mtm->disabledNodeMask;
918920
msg->connectivityMask=Mtm->connectivityMask;
@@ -931,7 +933,7 @@ static void MtmReceiver(Datum arg)
931933
if (ts->status==TRANSACTION_STATUS_UNKNOWN) {
932934
if (msg->status==TRANSACTION_STATUS_IN_PROGRESS||msg->status==TRANSACTION_STATUS_ABORTED) {
933935
elog(LOG,"Abort prepared transaction %s because it is in state %s at node %d",
934-
msg->gid,MtmNodeStatusMnem[msg->status],node);
936+
msg->gid,MtmTxnStatusMnem[msg->status],node);
935937
MtmFinishPreparedTransaction(ts, false);
936938
}
937939
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
@@ -944,11 +946,12 @@ static void MtmReceiver(Datum arg)
944946
elog(LOG,"Commit transaction %s because it is prepared at all live nodes",msg->gid);
945947
MtmFinishPreparedTransaction(ts, true);
946948
}else {
947-
MTM_LOG1("Receive response for transaction %s -> %d, participants=%llx, voted=%llx",msg->gid,msg->status, (long long)ts->participantsMask, (long long)ts->votedMask);
949+
MTM_LOG1("Receive response for transaction %s -> %s, participants=%llx, voted=%llx",
950+
msg->gid,MtmTxnStatusMnem[msg->status], (long long)ts->participantsMask, (long long)ts->votedMask);
948951
}
949952
}else {
950953
elog(LOG,"Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
951-
MtmNodeStatusMnem[msg->status],msg->gid,node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask));
954+
MtmTxnStatusMnem[msg->status],msg->gid,node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask));
952955
continue;
953956
}
954957
}elseif (ts->status==TRANSACTION_STATUS_ABORTED&&msg->status==TRANSACTION_STATUS_COMMITTED) {
@@ -957,7 +960,7 @@ static void MtmReceiver(Datum arg)
957960
elog(WARNING,"Transaction %s is committed at node %d but aborted at node %d",msg->gid,MtmNodeId,node);
958961
}else {
959962
elog(LOG,"Receive response %s for transaction %s status %s for node %d, votedMask %llx, participantsMask %llx",
960-
MtmNodeStatusMnem[msg->status],msg->gid,MtmNodeStatusMnem[ts->status],node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask) );
963+
MtmTxnStatusMnem[msg->status],msg->gid,MtmTxnStatusMnem[ts->status],node, (long long)ts->votedMask, (long long)(ts->participantsMask& ~Mtm->disabledNodeMask) );
961964
}
962965
}
963966
continue;
@@ -976,7 +979,7 @@ static void MtmReceiver(Datum arg)
976979
Assert(msg->code==MSG_ABORTED||strcmp(msg->gid,ts->gid)==0);
977980
if (BIT_CHECK(ts->votedMask,node-1)) {
978981
elog(WARNING,"Receive deteriorated %s response for transaction %d (%s) from node %d",
979-
messageKindText[msg->code],ts->xid,ts->gid,node);
982+
MtmMessageKindMnem[msg->code],ts->xid,ts->gid,node);
980983
continue;
981984
}
982985
BIT_SET(ts->votedMask,node-1);
@@ -1006,16 +1009,20 @@ static void MtmReceiver(Datum arg)
10061009
MtmWakeUpBackend(ts);
10071010
}else {
10081011
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1009-
MTM_LOG2("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
1010-
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
1012+
MTM_LOG2("Transaction %s is prepared (status=%s participants=%lx disabled=%lx, voted=%lx)",
1013+
ts->gid,MtmTxnStatusMnem[ts->status],ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
10111014
ts->isPrepared= true;
10121015
if (ts->isTwoPhase) {
10131016
MtmWakeUpBackend(ts);
10141017
}elseif (MtmUseDtm) {
10151018
ts->votedMask=0;
10161019
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PRECOMMIT");
10171020
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1018-
SetPrepareTransactionState(ts->gid,"precommitted");
1021+
Assert(replorigin_session_origin==InvalidRepOriginId);
1022+
MTM_LOG2("SetPreparedTransactionState for %s",ts->gid);
1023+
MtmUnlock();
1024+
SetPreparedTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
1025+
MtmLock(LW_EXCLUSIVE);
10191026
}else {
10201027
ts->status=TRANSACTION_STATUS_UNKNOWN;
10211028
MtmWakeUpBackend(ts);
@@ -1072,8 +1079,7 @@ static void MtmReceiver(Datum arg)
10721079
}elseif (ts->status==TRANSACTION_STATUS_ABORTED) {
10731080
MtmSend2PCMessage(ts,MSG_ABORTED);
10741081
}else {
1075-
elog(WARNING,"Transaction %s is already %s",
1076-
ts->gid,ts->status==TRANSACTION_STATUS_COMMITTED ?"committed" :"prepared");
1082+
elog(WARNING,"Transaction %s is already %s",ts->gid,MtmTxnStatusMnem[ts->status]);
10771083
}
10781084
break;
10791085
default:

‎contrib/mmts/multimaster.c

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,14 @@ char const* const MtmNodeStatusMnem[] =
204204
"OutOfService"
205205
};
206206

207+
charconst*constMtmTxnStatusMnem[]=
208+
{
209+
"InProgress",
210+
"Committed",
211+
"Aborted",
212+
"Unknown"
213+
};
214+
207215
boolMtmDoReplication;
208216
char*MtmDatabaseName;
209217
char*MtmDatabaseUser;
@@ -595,17 +603,17 @@ MtmAdjustOldestXid(TransactionId xid)
595603
if (MtmUseDtm&& !MtmVolksWagenMode)
596604
{
597605
if (prev!=NULL) {
598-
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%d, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
599-
MyProcPid,xid,prev->xid,prev->status,prev->snapshot, (ts ?ts->xid :0), (ts ?ts->status :-1), (ts ?ts->snapshot :-1),oldestSnapshot);
606+
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%s, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
607+
MyProcPid,xid,prev->xid,MtmTxnStatusMnem[prev->status],prev->snapshot, (ts ?ts->xid :0), (ts ?ts->status :-1), (ts ?ts->snapshot :-1),oldestSnapshot);
600608
Mtm->transListHead=prev;
601609
Mtm->oldestXid=xid=prev->xid;
602610
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)) {
603611
xid=Mtm->oldestXid;
604612
}
605613
}else {
606614
if (prev!=NULL) {
607-
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%d, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
608-
MyProcPid,xid,prev->xid,prev->status,prev->snapshot, (ts ?ts->xid :0), (ts ?ts->status :-1), (ts ?ts->snapshot :-1),oldestSnapshot);
615+
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%s, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
616+
MyProcPid,xid,prev->xid,MtmTxnStatusMnem[prev->status],prev->snapshot, (ts ?ts->xid :0), (ts ?ts->status :-1), (ts ?ts->snapshot :-1),oldestSnapshot);
609617
Mtm->transListHead=prev;
610618
}
611619
}
@@ -933,7 +941,8 @@ void MtmPrecommitTransaction(char const* gid)
933941
{
934942
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_FIND,NULL);
935943
if (tm==NULL) {
936-
elog(WARNING,"MtmPrecommitTransaction: transaciton '%s' is not found",gid);
944+
MtmUnlock();
945+
elog(WARNING,"MtmPrecommitTransaction: transaction '%s' is not found",gid);
937946
}else {
938947
MtmTransState*ts=tm->state;
939948
Assert(ts!=NULL);
@@ -942,10 +951,11 @@ void MtmPrecommitTransaction(char const* gid)
942951
ts->csn=MtmAssignCSN();
943952
MtmAdjustSubtransactions(ts);
944953
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
945-
SetPrepareTransactionState(ts->gid,"precommitted");
954+
Assert(replorigin_session_origin!=InvalidRepOriginId);
955+
MtmUnlock();
956+
SetPreparedTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
946957
}
947958
}
948-
MtmUnlock();
949959
}
950960

951961

@@ -967,15 +977,18 @@ MtmVotingCompleted(MtmTransState* ts)
967977
ts->status=TRANSACTION_STATUS_UNKNOWN;
968978
return true;
969979
}else {
970-
MTM_LOG1("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
971-
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
980+
MTM_LOG1("Transaction %s is considered as prepared (status=%s participants=%lx disabled=%lx, voted=%lx)",
981+
ts->gid,MtmTxnStatusMnem[ts->status],ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
972982
ts->isPrepared= true;
973983
if (ts->isTwoPhase) {
974984
ts->votingCompleted= true;
975985
return true;
976986
}elseif (MtmUseDtm) {
977987
ts->votedMask=0;
978-
SetPrepareTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
988+
Assert(replorigin_session_origin==InvalidRepOriginId);
989+
MtmUnlock();
990+
SetPreparedTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
991+
MtmLock(LW_EXCLUSIVE);
979992
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
980993
return false;
981994
}else {
@@ -1023,7 +1036,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10231036
if (ts->isPrepared) {
10241037
/* resend precommit message */
10251038
// MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1026-
elog(LOG,"Distributes transaction is not committed in %ld msec",USEC_TO_MSEC(now-start));
1039+
elog(LOG,"Distributed transaction is not committed in %ld msec",USEC_TO_MSEC(now-start));
10271040
}else {
10281041
elog(WARNING,"Commit of distributed transaction is canceled because of %ld msec timeout expiration",USEC_TO_MSEC(timeout));
10291042
MtmAbortTransaction(ts);
@@ -1047,7 +1060,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10471060
}
10481061
}
10491062
x->status=ts->status;
1050-
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
1063+
MTM_LOG3("%d: Result of vote: %d",MyProcPid,MtmTxnStatusMnem[ts->status]);
10511064
}
10521065

10531066
staticvoid
@@ -1071,6 +1084,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10711084
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10721085
if (!MtmIsCoordinator(ts)||Mtm->status==MTM_RECOVERY) {
10731086
MTM_TXTRACE(x,"recovery?");
1087+
MTM_LOG3("Preparing transaction %d (%s) at %ld",x->xid,x->gid,MtmGetCurrentTime());
10741088
Assert(x->gid[0]);
10751089
ts->votingCompleted= true;
10761090
MTM_TXTRACE(x,"recovery? 1");
@@ -1130,9 +1144,12 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
11301144
ts->votingCompleted= false;
11311145
ts->votedMask=0;
11321146
ts->procno=MyProc->pgprocno;
1133-
MTM_TXTRACE(ts,"Coordinator sends MSG_PRECOMMIT");
1134-
SetPrepareTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
1147+
MTM_LOG2("Coordinator of transaction %s sends MSG_PRECOMMIT",ts->gid);
1148+
Assert(replorigin_session_origin==InvalidRepOriginId);
1149+
MtmUnlock();
1150+
SetPreparedTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
11351151
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1152+
MtmLock(LW_EXCLUSIVE);
11361153

11371154
Mtm2PCVoting(x,ts);
11381155

@@ -1202,17 +1219,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021219
}
12031220
if (ts!=NULL) {
12041221
if (*ts->gid)
1205-
MTM_LOG2("TRANSLOG: %s transaction gid=%s xid=%d node=%d dxid=%d status %d",
1206-
(commit ?"commit" :"rollback"),ts->gid,ts->xid,ts->gtid.node,ts->gtid.xid,ts->status);
1222+
MTM_LOG2("TRANSLOG: %s transaction gid=%s xid=%d node=%d dxid=%d status %s",
1223+
(commit ?"commit" :"rollback"),ts->gid,ts->xid,ts->gtid.node,ts->gtid.xid,MtmTxnStatusMnem[ts->status]);
12071224
if (commit) {
12081225
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
12091226
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY)))
1210-
{
1211-
Assert(false);
1227+
{Assert(false);
12121228
elog(ERROR,"Attempt to commit %s transaction %d (%s)",
1213-
ts->status==TRANSACTION_STATUS_ABORTED ?"aborted"
1214-
:ts->status==TRANSACTION_STATUS_COMMITTED ?"committed" :"not prepared",
1215-
ts->xid,ts->gid);
1229+
MtmTxnStatusMnem[ts->status],ts->xid,ts->gid);
12161230
}
12171231
if (x->csn>ts->csn||Mtm->status==MTM_RECOVERY) {
12181232
Assert(x->csn!=INVALID_CSN);
@@ -1324,6 +1338,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
13241338
}
13251339
}
13261340
}elseif (!BIT_CHECK(Mtm->disabledNodeMask,ts->gtid.node-1)) {
1341+
MTM_LOG2("Send %s message to node %d xid=%d gid=%s",MtmMessageKindMnem[cmd],ts->gtid.node,ts->gtid.xid,ts->gid);
13271342
msg.node=ts->gtid.node;
13281343
msg.dxid=ts->gtid.xid;
13291344
MtmSendMessage(&msg);
@@ -1522,7 +1537,7 @@ void MtmAbortTransaction(MtmTransState* ts)
15221537
if (ts->status==TRANSACTION_STATUS_COMMITTED) {
15231538
elog(LOG,"Attempt to rollback already committed transaction %d (%s)",ts->xid,ts->gid);
15241539
}else {
1525-
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %d",ts->gtid.node,ts->gtid.xid,ts->xid,ts->status);
1540+
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %s",ts->gtid.node,ts->gtid.xid,ts->xid,MtmTxnStatusMnem[ts->status]);
15261541
ts->status=TRANSACTION_STATUS_ABORTED;
15271542
MtmAdjustSubtransactions(ts);
15281543
if (ts->isActive) {
@@ -1590,8 +1605,8 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
15901605
MtmBroadcastPollMessage(ts);
15911606
}
15921607
}else {
1593-
MTM_LOG1("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx",
1594-
ts->xid,ts->gid,ts->status,ts->gtid.node,ts->gtid.xid,ts->votedMask);
1608+
MTM_LOG1("Skip transaction %d (%s) with status %s gtid.node=%d gtid.xid=%d votedMask=%lx",
1609+
ts->xid,ts->gid,MtmTxnStatusMnem[ts->status],ts->gtid.node,ts->gtid.xid,ts->votedMask);
15951610
}
15961611
}
15971612
}
@@ -1926,8 +1941,8 @@ bool MtmRefreshClusterStatus(bool nowait)
19261941
MtmTransState*ts;
19271942
/* Interrupt voting for active transaction and abort them */
19281943
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
1929-
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1930-
ts->gid,ts->gtid.node,ts->xid,ts->status,ts->gtid.xid);
1944+
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%s, gtid.xid=%d",
1945+
ts->gid,ts->gtid.node,ts->xid,MtmTxnStatusMnen[ts->status],ts->gtid.xid);
19311946
if (MtmIsCoordinator(ts)&& !ts->votingCompleted&&ts->status!=TRANSACTION_STATUS_ABORTED) {
19321947
MtmAbortTransaction(ts);
19331948
MtmWakeUpBackend(ts);
@@ -2898,7 +2913,7 @@ void MtmReleaseRecoverySlot(int nodeId)
28982913
voidMtmRollbackPreparedTransaction(intnodeId,charconst*gid)
28992914
{
29002915
XidStatusstatus=MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED);
2901-
MTM_LOG1("Abort prepared transaction %s status %d",gid,status);
2916+
MTM_LOG1("Abort prepared transaction %s status %s",gid,MtmTxnStatusMnem[status]);
29022917
if (status==TRANSACTION_STATUS_UNKNOWN) {
29032918
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
29042919
MtmResetTransaction();
@@ -3176,6 +3191,9 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
31763191
boolres=Mtm->status!=MTM_RECOVERY
31773192
&& (args->origin_id==InvalidRepOriginId
31783193
||MtmIsRecoveredNode(MtmReplicationNodeId));
3194+
if (!res) {
3195+
MTM_LOG2("Filter transaction with origin_id=%d",args->origin_id);
3196+
}
31793197
returnres;
31803198
}
31813199

@@ -3211,7 +3229,7 @@ bool MtmFilterTransaction(char* record, int size)
32113229
intreplication_node;
32123230
intorigin_node;
32133231
charconst*gid="";
3214-
charmsgtype;
3232+
charmsgtypePG_USED_FOR_ASSERTS_ONLY;
32153233
boolduplicate= false;
32163234

32173235
s.data=record;

‎contrib/mmts/multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
#defineMTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4141
#endif
4242

43-
#ifMTM_TRACE
43+
#ifMTM_TRACE==0
4444
#defineMTM_TXTRACE(tx,event)
4545
#else
4646
#defineMTM_TXTRACE(tx,event) \
@@ -309,6 +309,8 @@ typedef struct MtmFlushPosition
309309
#defineMtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
310310

311311
externcharconst*constMtmNodeStatusMnem[];
312+
externcharconst*constMtmTxnStatusMnem[];
313+
externcharconst*constMtmMessageKindMnem[];
312314

313315
externMtmState*Mtm;
314316

‎contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,7 @@ void MtmExecutor(void* work, size_t size)
10281028
{
10291029
while (true) {
10301030
charaction=pq_getmsgbyte(&s);
1031-
MTM_LOG3("%d: REMOTE process action %c",MyProcPid,action);
1031+
MTM_LOG2("%d: REMOTE process action %c",MyProcPid,action);
10321032
#if0
10331033
if (Mtm->status==MTM_RECOVERY) {
10341034
MTM_LOG1("Replay action %c[%x]",action,s.data[s.cursor]);

‎contrib/mmts/pglogical_proto.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
184184
if (txn->xact_action==XLOG_XACT_COMMIT)
185185
flags=PGLOGICAL_COMMIT;
186186
elseif (txn->xact_action==XLOG_XACT_PREPARE)
187-
flags=strcmp(txn->state_3pc,"precommitted")==0 ?PGLOGICAL_PRECOMMIT_PREPARED :PGLOGICAL_PREPARE;
187+
flags=*txn->state_3pc ?PGLOGICAL_PRECOMMIT_PREPARED :PGLOGICAL_PREPARE;
188188
elseif (txn->xact_action==XLOG_XACT_COMMIT_PREPARED)
189189
flags=PGLOGICAL_COMMIT_PREPARED;
190190
elseif (txn->xact_action==XLOG_XACT_ABORT_PREPARED)
@@ -214,6 +214,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
214214
MTM_LOG1("Send ABORT_PREPARED for transaction %d (%s) end_lsn=%lx to node %d, isRecovery=%d, txn->origin_id=%d, csn=%ld",
215215
txn->xid,txn->gid,txn->end_lsn,MtmReplicationNodeId,isRecovery,txn->origin_id,csn);
216216
}
217+
if (flags==PGLOGICAL_PRECOMMIT_PREPARED) {
218+
MTM_LOG2("Send PGLOGICAL_PRECOMMIT_PREPARED for transaction %d (%s) end_lsn=%lx to node %d, isRecovery=%d, txn->origin_id=%d, csn=%ld",
219+
txn->xid,txn->gid,txn->end_lsn,MtmReplicationNodeId,isRecovery,txn->origin_id,csn);
220+
}
217221
if (MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn)) {
218222
MTM_LOG1("wal-sender complete recovery of node %d at LSN(commit %lx, end %lx, log %lx) in transaction %s event %d",MtmReplicationNodeId,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr(),txn->gid,flags);
219223
flags |=PGLOGICAL_CAUGHT_UP;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp