@@ -91,21 +91,19 @@ static void MtmMonitor(Datum arg);
9191static void MtmSendHeartbeat (void );
9292static bool MtmSendToNode (int node ,void const * buf ,int size );
9393
94- /*
95- static char const* const messageText[] =
94+ static char const * const messageKindText []=
9695{
9796"INVALID" ,
9897"HANDSHAKE" ,
99- "READY",
100- "PREPARE",
10198"PREPARED" ,
99+ "PRECOMMIT" ,
100+ "PRECOMMITTED" ,
102101"ABORTED" ,
103102"STATUS" ,
104103"HEARTBEAT" ,
105104"POLL_REQUEST" ,
106105"POLL_STATUS"
107106};
108- */
109107
110108static BackgroundWorker MtmSenderWorker = {
111109"mtm-sender" ,
@@ -364,7 +362,7 @@ static void MtmSendHeartbeat()
364362MTM_LOG2 ("Send heartbeat to node %d with timestamp %ld" ,i + 1 ,now );
365363}
366364}else {
367- MTM_LOG1 ("Do not send heartbeat to node %d, busy mask %lld, status %d" ,i + 1 , (long long )busy_mask ,Mtm -> status );
365+ MTM_LOG2 ("Do not send heartbeat to node %d, busy mask %lld, status %d" ,i + 1 , (long long )busy_mask ,Mtm -> status );
368366}
369367}
370368}
@@ -898,9 +896,14 @@ static void MtmReceiver(Datum arg)
898896msg -> status = TRANSACTION_STATUS_ABORTED ;
899897}else {
900898msg -> status = tm -> state -> status ;
899+ msg -> csn = tm -> state -> csn ;
901900MTM_LOG1 ("Send response %d for transaction %s to node %d" ,msg -> status ,msg -> gid ,msg -> node );
902901}
903- msg -> code = MSG_POLL_STATUS ;
902+ msg -> disabledNodeMask = Mtm -> disabledNodeMask ;
903+ msg -> connectivityMask = Mtm -> connectivityMask ;
904+ msg -> oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
905+ msg -> code = MSG_POLL_STATUS ;
906+ msg -> csn = ts -> csn ;
904907MtmSendMessage (msg );
905908continue ;
906909case MSG_POLL_STATUS :
@@ -911,41 +914,34 @@ static void MtmReceiver(Datum arg)
911914}else {
912915ts = tm -> state ;
913916BIT_SET (ts -> votedMask ,node - 1 );
914- if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
915- if (msg -> status == TRANSACTION_STATUS_UNKNOWN || msg -> status == TRANSACTION_STATUS_COMMITTED ) {
916- elog (LOG ,"Commit transaction %s because it is in state %d at node %d" ,
917+ if (ts -> status == TRANSACTION_STATUS_UNKNOWN ) {
918+ if (msg -> status == TRANSACTION_STATUS_IN_PROGRESS || msg -> status == TRANSACTION_STATUS_ABORTED ) {
919+ elog (LOG ,"Abort transaction %s because it is in state %d at node %d" ,
917920msg -> gid ,ts -> status ,node );
918- Assert (!IsTransactionState ());
919- StartTransactionCommand ();
920- MtmSetCurrentTransactionGID (ts -> gid );
921- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
922- FinishPreparedTransaction (ts -> gid , true);
923- CommitTransactionCommand ();
924- Assert (ts -> status == TRANSACTION_STATUS_COMMITTED );
925- }else if (msg -> status == TRANSACTION_STATUS_ABORTED
926- || ((ts -> participantsMask & ~Mtm -> disabledNodeMask )& ~ts -> votedMask )== 0 )
921+ MtmFinishPreparedTransaction (node ,ts , false);
922+ }
923+ else if (msg -> status == TRANSACTION_STATUS_COMMITTED || msg -> status == TRANSACTION_STATUS_UNKNOWN )
927924{
928- if (msg -> status == TRANSACTION_STATUS_ABORTED ) {
929- elog (LOG ,"Abort transaction %s because it is aborted at node %d" ,msg -> gid ,node );
930- }else {
931- elog (LOG ,"Abort transaction %s because it is not prepared at any online node" ,msg -> gid );
925+ if (msg -> csn > ts -> csn ) {
926+ ts -> csn = msg -> csn ;
927+ MtmSyncClock (ts -> csn );
928+ }
929+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
930+ elog (LOG ,"Commit transaction %s because it is prepared at all live nodes" ,msg -> gid );
931+ MtmFinishPreparedTransaction (node ,ts , true);
932932}
933- Assert (!IsTransactionState ());
934- StartTransactionCommand ();
935- MtmSetCurrentTransactionGID (ts -> gid );
936- FinishPreparedTransaction (ts -> gid , false);
937- CommitTransactionCommand ();
938- Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
939933}else {
940934elog (LOG ,"Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx" ,
941- msg -> status ,msg -> gid ,node , (long long )ts -> votedMask ,
942- (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ) );
935+ msg -> status ,msg -> gid ,node , (long long )ts -> votedMask , (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ));
943936continue ;
944937}
945938}else if (ts -> status == TRANSACTION_STATUS_ABORTED && msg -> status == TRANSACTION_STATUS_COMMITTED ) {
946939elog (WARNING ,"Transaction %s is aborted at node %d but committed at node %d" ,msg -> gid ,MtmNodeId ,node );
947940}else if (msg -> status == TRANSACTION_STATUS_ABORTED && ts -> status == TRANSACTION_STATUS_COMMITTED ) {
948941elog (WARNING ,"Transaction %s is committed at node %d but aborted at node %d" ,msg -> gid ,MtmNodeId ,node );
942+ }else {
943+ elog (LOG ,"Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx" ,
944+ msg -> status ,msg -> gid ,ts -> status ,node , (long long )ts -> votedMask , (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ) );
949945}
950946}
951947continue ;
@@ -961,50 +957,49 @@ static void MtmReceiver(Datum arg)
961957elog (WARNING ,"Ignore response for unexisted transaction %d from node %d" ,msg -> dxid ,node );
962958continue ;
963959}
960+ if (BIT_CHECK (ts -> votedMask ,node - 1 )) {
961+ elog (WARNING ,"Receive deteriorated %s response for transaction %d (%s) from node %d" ,
962+ messageKindText [msg -> code ],ts -> xid ,ts -> gid ,node );
963+ continue ;
964+ }
964965MtmCheckResponse (msg );
965-
966+ BIT_SET (ts -> votedMask ,node - 1 );
967+
966968if (MtmIsCoordinator (ts )) {
967969switch (msg -> code ) {
968- case MSG_READY :
969- MTM_TXTRACE (ts ,"MtmTransReceiver gotMSG_READY " );
970+ case MSG_PREPARED :
971+ MTM_TXTRACE (ts ,"MtmTransReceiver gotMSG_PREPARED " );
970972if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
971- elog (WARNING ,"ReceiveREADY response for already committed transaction %d from node %d" ,
973+ elog (WARNING ,"ReceivePREPARED response for already committed transaction %d from node %d" ,
972974ts -> xid ,node );
973975continue ;
974976}
975- if (ts -> nVotes >=Mtm -> nLiveNodes ) {
976- elog (WARNING ,"Receive deteriorated READY response for transaction %d (%s) from node %d" ,
977- ts -> xid ,ts -> gid ,node );
977+ Mtm -> nodes [node - 1 ].transDelay += MtmGetCurrentTime ()- ts -> csn ;
978+ ts -> xids [node - 1 ]= msg -> sxid ;
979+
980+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask )!= 0 ) {
981+ /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
982+ commit on smaller subset of nodes */
983+ elog (WARNING ,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
984+ node , (long )Mtm -> disabledNodeMask , (long )msg -> disabledNodeMask );
978985MtmAbortTransaction (ts );
979- MtmWakeUpBackend (ts );
980- }else {
981- Mtm -> nodes [node - 1 ].transDelay += MtmGetCurrentTime ()- ts -> csn ;
982- ts -> xids [node - 1 ]= msg -> sxid ;
983-
984- if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask )!= 0 ) {
985- /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
986- commit on smaller subset of nodes */
987- elog (WARNING ,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
988- node , (long )Mtm -> disabledNodeMask , (long )msg -> disabledNodeMask );
989- MtmAbortTransaction (ts );
990- }
991-
992- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
993- /* All nodes are finished their transactions */
994- if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
995- MtmWakeUpBackend (ts );
986+ }
987+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
988+ /* All nodes are finished their transactions */
989+ if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
990+ MtmWakeUpBackend (ts );
991+ }else {
992+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
993+ ts -> isPrepared = true;
994+ if (ts -> isTwoPhase ) {
995+ MtmWakeUpBackend (ts );
996+ }else if (MtmUseDtm ) {
997+ ts -> votedMask = 0 ;
998+ MTM_TXTRACE (ts ,"MtmTransReceiver send MSG_PRECOMMIT" );
999+ MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
9961000}else {
997- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
998- if (ts -> isTwoPhase ) {
999- MtmWakeUpBackend (ts );
1000- }else if (MtmUseDtm ) {
1001- ts -> nVotes = 1 ;/* I voted myself */
1002- MTM_TXTRACE (ts ,"MtmTransReceiver send MSG_PREPARE" );
1003- MtmSend2PCMessage (ts ,MSG_PREPARE );
1004- }else {
1005- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1006- MtmWakeUpBackend (ts );
1007- }
1001+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1002+ MtmWakeUpBackend (ts );
10081003}
10091004}
10101005}
@@ -1019,47 +1014,40 @@ static void MtmReceiver(Datum arg)
10191014Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
10201015MtmAbortTransaction (ts );
10211016}
1022- if (++ ts -> nVotes >= Mtm -> nLiveNodes ) {
1017+ if (( ts -> participantsMask & ~ Mtm -> disabledNodeMask & ~ ts -> votedMask ) == 0 ) {
10231018MtmWakeUpBackend (ts );
10241019}
10251020break ;
1026- case MSG_PREPARED :
1027- MTM_TXTRACE (ts ,"MtmTransReceiver got MSG_PREPARED" );
1028- if (ts -> nVotes >=Mtm -> nLiveNodes ) {
1029- elog (WARNING ,"Receive deteriorated PREPARED response for transaction %d (%s) from node %d" ,
1030- ts -> xid ,ts -> gid ,node );
1031- MtmAbortTransaction (ts );
1032- MtmWakeUpBackend (ts );
1021+ case MSG_PRECOMMITTED :
1022+ MTM_TXTRACE (ts ,"MtmTransReceiver got MSG_PRECOMMITTED" );
1023+ if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1024+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1025+ if (msg -> csn > ts -> csn ) {
1026+ ts -> csn = msg -> csn ;
1027+ MtmSyncClock (ts -> csn );
1028+ }
1029+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
1030+ ts -> csn = MtmAssignCSN ();
1031+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1032+ MtmWakeUpBackend (ts );
1033+ }
10331034}else {
1034- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1035- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1036- if (msg -> csn > ts -> csn ) {
1037- ts -> csn = msg -> csn ;
1038- MtmSyncClock (ts -> csn );
1039- }
1040- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
1041- ts -> csn = MtmAssignCSN ();
1042- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1043- MtmWakeUpBackend (ts );
1044- }
1045- }else {
1046- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
1047- MtmWakeUpBackend (ts );
1048- }
1049- }
1050- }
1035+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
1036+ MtmWakeUpBackend (ts );
1037+ }
1038+ }
10511039break ;
10521040default :
10531041Assert (false);
10541042}
10551043}else {
10561044switch (msg -> code ) {
1057- case MSG_PREPARE :
1045+ case MSG_PRECOMMIT :
10581046if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
10591047ts -> status = TRANSACTION_STATUS_UNKNOWN ;
10601048ts -> csn = MtmAssignCSN ();
10611049MtmAdjustSubtransactions (ts );
1062- MtmSend2PCMessage (ts ,MSG_PREPARED );
1050+ MtmSend2PCMessage (ts ,MSG_PRECOMMITTED );
10631051}else {
10641052Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
10651053MtmSend2PCMessage (ts ,MSG_ABORTED );