Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0eaaa1c

Browse files
knizhnikkelvich
authored andcommitted
Replace XactLastRecEnd with gxact->prepare_end_lsn
1 parent3db5770 commit0eaaa1c

File tree

6 files changed

+254
-275
lines changed

6 files changed

+254
-275
lines changed

‎arbiter.c

Lines changed: 81 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -91,21 +91,19 @@ static void MtmMonitor(Datum arg);
9191
staticvoidMtmSendHeartbeat(void);
9292
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
9393

94-
/*
95-
static char const* const messageText[] =
94+
staticcharconst*constmessageKindText[]=
9695
{
9796
"INVALID",
9897
"HANDSHAKE",
99-
"READY",
100-
"PREPARE",
10198
"PREPARED",
99+
"PRECOMMIT",
100+
"PRECOMMITTED",
102101
"ABORTED",
103102
"STATUS",
104103
"HEARTBEAT",
105104
"POLL_REQUEST",
106105
"POLL_STATUS"
107106
};
108-
*/
109107

110108
staticBackgroundWorkerMtmSenderWorker= {
111109
"mtm-sender",
@@ -364,7 +362,7 @@ static void MtmSendHeartbeat()
364362
MTM_LOG2("Send heartbeat to node %d with timestamp %ld",i+1,now);
365363
}
366364
}else {
367-
MTM_LOG1("Do not send heartbeat to node %d, busy mask %lld, status %d",i+1, (long long)busy_mask,Mtm->status);
365+
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %d",i+1, (long long)busy_mask,Mtm->status);
368366
}
369367
}
370368
}
@@ -898,9 +896,14 @@ static void MtmReceiver(Datum arg)
898896
msg->status=TRANSACTION_STATUS_ABORTED;
899897
}else {
900898
msg->status=tm->state->status;
899+
msg->csn=tm->state->csn;
901900
MTM_LOG1("Send response %d for transaction %s to node %d",msg->status,msg->gid,msg->node);
902901
}
903-
msg->code=MSG_POLL_STATUS;
902+
msg->disabledNodeMask=Mtm->disabledNodeMask;
903+
msg->connectivityMask=Mtm->connectivityMask;
904+
msg->oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
905+
msg->code=MSG_POLL_STATUS;
906+
msg->csn=ts->csn;
904907
MtmSendMessage(msg);
905908
continue;
906909
caseMSG_POLL_STATUS:
@@ -911,41 +914,34 @@ static void MtmReceiver(Datum arg)
911914
}else {
912915
ts=tm->state;
913916
BIT_SET(ts->votedMask,node-1);
914-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
915-
if (msg->status==TRANSACTION_STATUS_UNKNOWN||msg->status==TRANSACTION_STATUS_COMMITTED) {
916-
elog(LOG,"Commit transaction %s because it is in state %d at node %d",
917+
if (ts->status==TRANSACTION_STATUS_UNKNOWN) {
918+
if (msg->status==TRANSACTION_STATUS_IN_PROGRESS||msg->status==TRANSACTION_STATUS_ABORTED) {
919+
elog(LOG,"Abort transaction %s because it is in state %d at node %d",
917920
msg->gid,ts->status,node);
918-
Assert(!IsTransactionState());
919-
StartTransactionCommand();
920-
MtmSetCurrentTransactionGID(ts->gid);
921-
ts->status=TRANSACTION_STATUS_UNKNOWN;
922-
FinishPreparedTransaction(ts->gid, true);
923-
CommitTransactionCommand();
924-
Assert(ts->status==TRANSACTION_STATUS_COMMITTED);
925-
}elseif (msg->status==TRANSACTION_STATUS_ABORTED
926-
|| ((ts->participantsMask& ~Mtm->disabledNodeMask)& ~ts->votedMask)==0)
921+
MtmFinishPreparedTransaction(node,ts, false);
922+
}
923+
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
927924
{
928-
if (msg->status==TRANSACTION_STATUS_ABORTED) {
929-
elog(LOG,"Abort transaction %s because it is aborted at node %d",msg->gid,node);
930-
}else {
931-
elog(LOG,"Abort transaction %s because it is not prepared at any online node",msg->gid);
925+
if (msg->csn>ts->csn) {
926+
ts->csn=msg->csn;
927+
MtmSyncClock(ts->csn);
928+
}
929+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
930+
elog(LOG,"Commit transaction %s because it is prepared at all live nodes",msg->gid);
931+
MtmFinishPreparedTransaction(node,ts, true);
932932
}
933-
Assert(!IsTransactionState());
934-
StartTransactionCommand();
935-
MtmSetCurrentTransactionGID(ts->gid);
936-
FinishPreparedTransaction(ts->gid, false);
937-
CommitTransactionCommand();
938-
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
939933
}else {
940934
elog(LOG,"Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",
941-
msg->status,msg->gid,node, (long long)ts->votedMask,
942-
(long long) (ts->participantsMask& ~Mtm->disabledNodeMask) );
935+
msg->status,msg->gid,node, (long long)ts->votedMask, (long long) (ts->participantsMask& ~Mtm->disabledNodeMask));
943936
continue;
944937
}
945938
}elseif (ts->status==TRANSACTION_STATUS_ABORTED&&msg->status==TRANSACTION_STATUS_COMMITTED) {
946939
elog(WARNING,"Transaction %s is aborted at node %d but committed at node %d",msg->gid,MtmNodeId,node);
947940
}elseif (msg->status==TRANSACTION_STATUS_ABORTED&&ts->status==TRANSACTION_STATUS_COMMITTED) {
948941
elog(WARNING,"Transaction %s is committed at node %d but aborted at node %d",msg->gid,MtmNodeId,node);
942+
}else {
943+
elog(LOG,"Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx",
944+
msg->status,msg->gid,ts->status,node, (long long)ts->votedMask, (long long) (ts->participantsMask& ~Mtm->disabledNodeMask) );
949945
}
950946
}
951947
continue;
@@ -961,50 +957,49 @@ static void MtmReceiver(Datum arg)
961957
elog(WARNING,"Ignore response for unexisted transaction %d from node %d",msg->dxid,node);
962958
continue;
963959
}
960+
if (BIT_CHECK(ts->votedMask,node-1)) {
961+
elog(WARNING,"Receive deteriorated %s response for transaction %d (%s) from node %d",
962+
messageKindText[msg->code],ts->xid,ts->gid,node);
963+
continue;
964+
}
964965
MtmCheckResponse(msg);
965-
966+
BIT_SET(ts->votedMask,node-1);
967+
966968
if (MtmIsCoordinator(ts)) {
967969
switch (msg->code) {
968-
caseMSG_READY:
969-
MTM_TXTRACE(ts,"MtmTransReceiver gotMSG_READY");
970+
caseMSG_PREPARED:
971+
MTM_TXTRACE(ts,"MtmTransReceiver gotMSG_PREPARED");
970972
if (ts->status==TRANSACTION_STATUS_COMMITTED) {
971-
elog(WARNING,"ReceiveREADY response for already committed transaction %d from node %d",
973+
elog(WARNING,"ReceivePREPARED response for already committed transaction %d from node %d",
972974
ts->xid,node);
973975
continue;
974976
}
975-
if (ts->nVotes >=Mtm->nLiveNodes) {
976-
elog(WARNING,"Receive deteriorated READY response for transaction %d (%s) from node %d",
977-
ts->xid,ts->gid,node);
977+
Mtm->nodes[node-1].transDelay+=MtmGetCurrentTime()-ts->csn;
978+
ts->xids[node-1]=msg->sxid;
979+
980+
if ((~msg->disabledNodeMask&Mtm->disabledNodeMask)!=0) {
981+
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
982+
commit on smaller subset of nodes */
983+
elog(WARNING,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
984+
node, (long)Mtm->disabledNodeMask, (long)msg->disabledNodeMask);
978985
MtmAbortTransaction(ts);
979-
MtmWakeUpBackend(ts);
980-
}else {
981-
Mtm->nodes[node-1].transDelay+=MtmGetCurrentTime()-ts->csn;
982-
ts->xids[node-1]=msg->sxid;
983-
984-
if ((~msg->disabledNodeMask&Mtm->disabledNodeMask)!=0) {
985-
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
986-
commit on smaller subset of nodes */
987-
elog(WARNING,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
988-
node, (long)Mtm->disabledNodeMask, (long)msg->disabledNodeMask);
989-
MtmAbortTransaction(ts);
990-
}
991-
992-
if (++ts->nVotes==Mtm->nLiveNodes) {
993-
/* All nodes are finished their transactions */
994-
if (ts->status==TRANSACTION_STATUS_ABORTED) {
995-
MtmWakeUpBackend(ts);
986+
}
987+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
988+
/* All nodes are finished their transactions */
989+
if (ts->status==TRANSACTION_STATUS_ABORTED) {
990+
MtmWakeUpBackend(ts);
991+
}else {
992+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
993+
ts->isPrepared= true;
994+
if (ts->isTwoPhase) {
995+
MtmWakeUpBackend(ts);
996+
}elseif (MtmUseDtm) {
997+
ts->votedMask=0;
998+
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PRECOMMIT");
999+
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
9961000
}else {
997-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
998-
if (ts->isTwoPhase) {
999-
MtmWakeUpBackend(ts);
1000-
}elseif (MtmUseDtm) {
1001-
ts->nVotes=1;/* I voted myself */
1002-
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PREPARE");
1003-
MtmSend2PCMessage(ts,MSG_PREPARE);
1004-
}else {
1005-
ts->status=TRANSACTION_STATUS_UNKNOWN;
1006-
MtmWakeUpBackend(ts);
1007-
}
1001+
ts->status=TRANSACTION_STATUS_UNKNOWN;
1002+
MtmWakeUpBackend(ts);
10081003
}
10091004
}
10101005
}
@@ -1019,47 +1014,40 @@ static void MtmReceiver(Datum arg)
10191014
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
10201015
MtmAbortTransaction(ts);
10211016
}
1022-
if (++ts->nVotes >=Mtm->nLiveNodes) {
1017+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
10231018
MtmWakeUpBackend(ts);
10241019
}
10251020
break;
1026-
caseMSG_PREPARED:
1027-
MTM_TXTRACE(ts,"MtmTransReceiver got MSG_PREPARED");
1028-
if (ts->nVotes >=Mtm->nLiveNodes) {
1029-
elog(WARNING,"Receive deteriorated PREPARED response for transaction %d (%s) from node %d",
1030-
ts->xid,ts->gid,node);
1031-
MtmAbortTransaction(ts);
1032-
MtmWakeUpBackend(ts);
1021+
caseMSG_PRECOMMITTED:
1022+
MTM_TXTRACE(ts,"MtmTransReceiver got MSG_PRECOMMITTED");
1023+
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
1024+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1025+
if (msg->csn>ts->csn) {
1026+
ts->csn=msg->csn;
1027+
MtmSyncClock(ts->csn);
1028+
}
1029+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
1030+
ts->csn=MtmAssignCSN();
1031+
ts->status=TRANSACTION_STATUS_UNKNOWN;
1032+
MtmWakeUpBackend(ts);
1033+
}
10331034
}else {
1034-
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
1035-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1036-
if (msg->csn>ts->csn) {
1037-
ts->csn=msg->csn;
1038-
MtmSyncClock(ts->csn);
1039-
}
1040-
if (++ts->nVotes==Mtm->nLiveNodes) {
1041-
ts->csn=MtmAssignCSN();
1042-
ts->status=TRANSACTION_STATUS_UNKNOWN;
1043-
MtmWakeUpBackend(ts);
1044-
}
1045-
}else {
1046-
if (++ts->nVotes==Mtm->nLiveNodes) {
1047-
MtmWakeUpBackend(ts);
1048-
}
1049-
}
1050-
}
1035+
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
1036+
MtmWakeUpBackend(ts);
1037+
}
1038+
}
10511039
break;
10521040
default:
10531041
Assert(false);
10541042
}
10551043
}else {
10561044
switch (msg->code) {
1057-
caseMSG_PREPARE:
1045+
caseMSG_PRECOMMIT:
10581046
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
10591047
ts->status=TRANSACTION_STATUS_UNKNOWN;
10601048
ts->csn=MtmAssignCSN();
10611049
MtmAdjustSubtransactions(ts);
1062-
MtmSend2PCMessage(ts,MSG_PREPARED);
1050+
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
10631051
}else {
10641052
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
10651053
MtmSend2PCMessage(ts,MSG_ABORTED);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp