Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd30ce26

Browse files
committed
Replace XactLastRecEnd with gxact->prepare_end_lsn
1 parentb4a5fe8 commitd30ce26

File tree

7 files changed

+256
-278
lines changed

7 files changed

+256
-278
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 81 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -91,21 +91,19 @@ static void MtmMonitor(Datum arg);
9191
staticvoidMtmSendHeartbeat(void);
9292
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
9393

94-
/*
95-
static char const* const messageText[] =
94+
staticcharconst*constmessageKindText[]=
9695
{
9796
"INVALID",
9897
"HANDSHAKE",
99-
"READY",
100-
"PREPARE",
10198
"PREPARED",
99+
"PRECOMMIT",
100+
"PRECOMMITTED",
102101
"ABORTED",
103102
"STATUS",
104103
"HEARTBEAT",
105104
"POLL_REQUEST",
106105
"POLL_STATUS"
107106
};
108-
*/
109107

110108
staticBackgroundWorkerMtmSenderWorker= {
111109
"mtm-sender",
@@ -364,7 +362,7 @@ static void MtmSendHeartbeat()
364362
MTM_LOG2("Send heartbeat to node %d with timestamp %ld",i+1,now);
365363
}
366364
}else {
367-
MTM_LOG1("Do not send heartbeat to node %d, busy mask %lld, status %d",i+1, (long long)busy_mask,Mtm->status);
365+
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %d",i+1, (long long)busy_mask,Mtm->status);
368366
}
369367
}
370368
}
@@ -902,9 +900,14 @@ static void MtmReceiver(Datum arg)
902900
msg->status=TRANSACTION_STATUS_ABORTED;
903901
}else {
904902
msg->status=tm->state->status;
903+
msg->csn=tm->state->csn;
905904
MTM_LOG1("Send response %d for transaction %s to node %d",msg->status,msg->gid,msg->node);
906905
}
907-
msg->code=MSG_POLL_STATUS;
906+
msg->disabledNodeMask=Mtm->disabledNodeMask;
907+
msg->connectivityMask=Mtm->connectivityMask;
908+
msg->oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
909+
msg->code=MSG_POLL_STATUS;
910+
msg->csn=ts->csn;
908911
MtmSendMessage(msg);
909912
continue;
910913
caseMSG_POLL_STATUS:
@@ -915,41 +918,34 @@ static void MtmReceiver(Datum arg)
915918
}else {
916919
ts=tm->state;
917920
BIT_SET(ts->votedMask,node-1);
918-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
919-
if (msg->status==TRANSACTION_STATUS_UNKNOWN||msg->status==TRANSACTION_STATUS_COMMITTED) {
920-
elog(LOG,"Commit transaction %s because it is in state %d at node %d",
921+
if (ts->status==TRANSACTION_STATUS_UNKNOWN) {
922+
if (msg->status==TRANSACTION_STATUS_IN_PROGRESS||msg->status==TRANSACTION_STATUS_ABORTED) {
923+
elog(LOG,"Abort transaction %s because it is in state %d at node %d",
921924
msg->gid,ts->status,node);
922-
Assert(!IsTransactionState());
923-
StartTransactionCommand();
924-
MtmSetCurrentTransactionGID(ts->gid);
925-
ts->status=TRANSACTION_STATUS_UNKNOWN;
926-
FinishPreparedTransaction(ts->gid, true);
927-
CommitTransactionCommand();
928-
Assert(ts->status==TRANSACTION_STATUS_COMMITTED);
929-
}elseif (msg->status==TRANSACTION_STATUS_ABORTED
930-
|| ((ts->participantsMask& ~Mtm->disabledNodeMask)& ~ts->votedMask)==0)
925+
MtmFinishPreparedTransaction(node,ts, false);
926+
}
927+
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
931928
{
932-
if (msg->status==TRANSACTION_STATUS_ABORTED) {
933-
elog(LOG,"Abort transaction %s because it is aborted at node %d",msg->gid,node);
934-
}else {
935-
elog(LOG,"Abort transaction %s because it is not prepared at any online node",msg->gid);
929+
if (msg->csn>ts->csn) {
930+
ts->csn=msg->csn;
931+
MtmSyncClock(ts->csn);
932+
}
933+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
934+
elog(LOG,"Commit transaction %s because it is prepared at all live nodes",msg->gid);
935+
MtmFinishPreparedTransaction(node,ts, true);
936936
}
937-
Assert(!IsTransactionState());
938-
StartTransactionCommand();
939-
MtmSetCurrentTransactionGID(ts->gid);
940-
FinishPreparedTransaction(ts->gid, false);
941-
CommitTransactionCommand();
942-
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
943937
}else {
944938
elog(LOG,"Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",
945-
msg->status,msg->gid,node, (long long)ts->votedMask,
946-
(long long) (ts->participantsMask& ~Mtm->disabledNodeMask) );
939+
msg->status,msg->gid,node, (long long)ts->votedMask, (long long) (ts->participantsMask& ~Mtm->disabledNodeMask));
947940
continue;
948941
}
949942
}elseif (ts->status==TRANSACTION_STATUS_ABORTED&&msg->status==TRANSACTION_STATUS_COMMITTED) {
950943
elog(WARNING,"Transaction %s is aborted at node %d but committed at node %d",msg->gid,MtmNodeId,node);
951944
}elseif (msg->status==TRANSACTION_STATUS_ABORTED&&ts->status==TRANSACTION_STATUS_COMMITTED) {
952945
elog(WARNING,"Transaction %s is committed at node %d but aborted at node %d",msg->gid,MtmNodeId,node);
946+
}else {
947+
elog(LOG,"Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx",
948+
msg->status,msg->gid,ts->status,node, (long long)ts->votedMask, (long long) (ts->participantsMask& ~Mtm->disabledNodeMask) );
953949
}
954950
}
955951
continue;
@@ -965,50 +961,49 @@ static void MtmReceiver(Datum arg)
965961
elog(WARNING,"Ignore response for unexisted transaction %d from node %d",msg->dxid,node);
966962
continue;
967963
}
964+
if (BIT_CHECK(ts->votedMask,node-1)) {
965+
elog(WARNING,"Receive deteriorated %s response for transaction %d (%s) from node %d",
966+
messageKindText[msg->code],ts->xid,ts->gid,node);
967+
continue;
968+
}
968969
MtmCheckResponse(msg);
969-
970+
BIT_SET(ts->votedMask,node-1);
971+
970972
if (MtmIsCoordinator(ts)) {
971973
switch (msg->code) {
972-
caseMSG_READY:
973-
MTM_TXTRACE(ts,"MtmTransReceiver gotMSG_READY");
974+
caseMSG_PREPARED:
975+
MTM_TXTRACE(ts,"MtmTransReceiver gotMSG_PREPARED");
974976
if (ts->status==TRANSACTION_STATUS_COMMITTED) {
975-
elog(WARNING,"ReceiveREADY response for already committed transaction %d from node %d",
977+
elog(WARNING,"ReceivePREPARED response for already committed transaction %d from node %d",
976978
ts->xid,node);
977979
continue;
978980
}
979-
if (ts->nVotes >=Mtm->nLiveNodes) {
980-
elog(WARNING,"Receive deteriorated READY response for transaction %d (%s) from node %d",
981-
ts->xid,ts->gid,node);
981+
Mtm->nodes[node-1].transDelay+=MtmGetCurrentTime()-ts->csn;
982+
ts->xids[node-1]=msg->sxid;
983+
984+
if ((~msg->disabledNodeMask&Mtm->disabledNodeMask)!=0) {
985+
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
986+
commit on smaller subset of nodes */
987+
elog(WARNING,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
988+
node, (long)Mtm->disabledNodeMask, (long)msg->disabledNodeMask);
982989
MtmAbortTransaction(ts);
983-
MtmWakeUpBackend(ts);
984-
}else {
985-
Mtm->nodes[node-1].transDelay+=MtmGetCurrentTime()-ts->csn;
986-
ts->xids[node-1]=msg->sxid;
987-
988-
if ((~msg->disabledNodeMask&Mtm->disabledNodeMask)!=0) {
989-
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
990-
commit on smaller subset of nodes */
991-
elog(WARNING,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
992-
node, (long)Mtm->disabledNodeMask, (long)msg->disabledNodeMask);
993-
MtmAbortTransaction(ts);
994-
}
995-
996-
if (++ts->nVotes==Mtm->nLiveNodes) {
997-
/* All nodes are finished their transactions */
998-
if (ts->status==TRANSACTION_STATUS_ABORTED) {
999-
MtmWakeUpBackend(ts);
990+
}
991+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
992+
/* All nodes are finished their transactions */
993+
if (ts->status==TRANSACTION_STATUS_ABORTED) {
994+
MtmWakeUpBackend(ts);
995+
}else {
996+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
997+
ts->isPrepared= true;
998+
if (ts->isTwoPhase) {
999+
MtmWakeUpBackend(ts);
1000+
}elseif (MtmUseDtm) {
1001+
ts->votedMask=0;
1002+
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PRECOMMIT");
1003+
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
10001004
}else {
1001-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1002-
if (ts->isTwoPhase) {
1003-
MtmWakeUpBackend(ts);
1004-
}elseif (MtmUseDtm) {
1005-
ts->nVotes=1;/* I voted myself */
1006-
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PREPARE");
1007-
MtmSend2PCMessage(ts,MSG_PREPARE);
1008-
}else {
1009-
ts->status=TRANSACTION_STATUS_UNKNOWN;
1010-
MtmWakeUpBackend(ts);
1011-
}
1005+
ts->status=TRANSACTION_STATUS_UNKNOWN;
1006+
MtmWakeUpBackend(ts);
10121007
}
10131008
}
10141009
}
@@ -1023,47 +1018,40 @@ static void MtmReceiver(Datum arg)
10231018
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
10241019
MtmAbortTransaction(ts);
10251020
}
1026-
if (++ts->nVotes >=Mtm->nLiveNodes) {
1021+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
10271022
MtmWakeUpBackend(ts);
10281023
}
10291024
break;
1030-
caseMSG_PREPARED:
1031-
MTM_TXTRACE(ts,"MtmTransReceiver got MSG_PREPARED");
1032-
if (ts->nVotes >=Mtm->nLiveNodes) {
1033-
elog(WARNING,"Receive deteriorated PREPARED response for transaction %d (%s) from node %d",
1034-
ts->xid,ts->gid,node);
1035-
MtmAbortTransaction(ts);
1036-
MtmWakeUpBackend(ts);
1025+
caseMSG_PRECOMMITTED:
1026+
MTM_TXTRACE(ts,"MtmTransReceiver got MSG_PRECOMMITTED");
1027+
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
1028+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1029+
if (msg->csn>ts->csn) {
1030+
ts->csn=msg->csn;
1031+
MtmSyncClock(ts->csn);
1032+
}
1033+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
1034+
ts->csn=MtmAssignCSN();
1035+
ts->status=TRANSACTION_STATUS_UNKNOWN;
1036+
MtmWakeUpBackend(ts);
1037+
}
10371038
}else {
1038-
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
1039-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1040-
if (msg->csn>ts->csn) {
1041-
ts->csn=msg->csn;
1042-
MtmSyncClock(ts->csn);
1043-
}
1044-
if (++ts->nVotes==Mtm->nLiveNodes) {
1045-
ts->csn=MtmAssignCSN();
1046-
ts->status=TRANSACTION_STATUS_UNKNOWN;
1047-
MtmWakeUpBackend(ts);
1048-
}
1049-
}else {
1050-
if (++ts->nVotes==Mtm->nLiveNodes) {
1051-
MtmWakeUpBackend(ts);
1052-
}
1053-
}
1054-
}
1039+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
1040+
MtmWakeUpBackend(ts);
1041+
}
1042+
}
10551043
break;
10561044
default:
10571045
Assert(false);
10581046
}
10591047
}else {
10601048
switch (msg->code) {
1061-
caseMSG_PREPARE:
1049+
caseMSG_PRECOMMIT:
10621050
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
10631051
ts->status=TRANSACTION_STATUS_UNKNOWN;
10641052
ts->csn=MtmAssignCSN();
10651053
MtmAdjustSubtransactions(ts);
1066-
MtmSend2PCMessage(ts,MSG_PREPARED);
1054+
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
10671055
}else {
10681056
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
10691057
MtmSend2PCMessage(ts,MSG_ABORTED);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp