@@ -91,21 +91,19 @@ static void MtmMonitor(Datum arg);
91
91
static void MtmSendHeartbeat (void );
92
92
static bool MtmSendToNode (int node ,void const * buf ,int size );
93
93
94
- /*
95
- static char const* const messageText[] =
94
+ static char const * const messageKindText []=
96
95
{
97
96
"INVALID" ,
98
97
"HANDSHAKE" ,
99
- "READY",
100
- "PREPARE",
101
98
"PREPARED" ,
99
+ "PRECOMMIT" ,
100
+ "PRECOMMITTED" ,
102
101
"ABORTED" ,
103
102
"STATUS" ,
104
103
"HEARTBEAT" ,
105
104
"POLL_REQUEST" ,
106
105
"POLL_STATUS"
107
106
};
108
- */
109
107
110
108
static BackgroundWorker MtmSenderWorker = {
111
109
"mtm-sender" ,
@@ -364,7 +362,7 @@ static void MtmSendHeartbeat()
364
362
MTM_LOG2 ("Send heartbeat to node %d with timestamp %ld" ,i + 1 ,now );
365
363
}
366
364
}else {
367
- MTM_LOG1 ("Do not send heartbeat to node %d, busy mask %lld, status %d" ,i + 1 , (long long )busy_mask ,Mtm -> status );
365
+ MTM_LOG2 ("Do not send heartbeat to node %d, busy mask %lld, status %d" ,i + 1 , (long long )busy_mask ,Mtm -> status );
368
366
}
369
367
}
370
368
}
@@ -898,9 +896,14 @@ static void MtmReceiver(Datum arg)
898
896
msg -> status = TRANSACTION_STATUS_ABORTED ;
899
897
}else {
900
898
msg -> status = tm -> state -> status ;
899
+ msg -> csn = tm -> state -> csn ;
901
900
MTM_LOG1 ("Send response %d for transaction %s to node %d" ,msg -> status ,msg -> gid ,msg -> node );
902
901
}
903
- msg -> code = MSG_POLL_STATUS ;
902
+ msg -> disabledNodeMask = Mtm -> disabledNodeMask ;
903
+ msg -> connectivityMask = Mtm -> connectivityMask ;
904
+ msg -> oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
905
+ msg -> code = MSG_POLL_STATUS ;
906
+ msg -> csn = ts -> csn ;
904
907
MtmSendMessage (msg );
905
908
continue ;
906
909
case MSG_POLL_STATUS :
@@ -911,41 +914,34 @@ static void MtmReceiver(Datum arg)
911
914
}else {
912
915
ts = tm -> state ;
913
916
BIT_SET (ts -> votedMask ,node - 1 );
914
- if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
915
- if (msg -> status == TRANSACTION_STATUS_UNKNOWN || msg -> status == TRANSACTION_STATUS_COMMITTED ) {
916
- elog (LOG ,"Commit transaction %s because it is in state %d at node %d" ,
917
+ if (ts -> status == TRANSACTION_STATUS_UNKNOWN ) {
918
+ if (msg -> status == TRANSACTION_STATUS_IN_PROGRESS || msg -> status == TRANSACTION_STATUS_ABORTED ) {
919
+ elog (LOG ,"Abort transaction %s because it is in state %d at node %d" ,
917
920
msg -> gid ,ts -> status ,node );
918
- Assert (!IsTransactionState ());
919
- StartTransactionCommand ();
920
- MtmSetCurrentTransactionGID (ts -> gid );
921
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
922
- FinishPreparedTransaction (ts -> gid , true);
923
- CommitTransactionCommand ();
924
- Assert (ts -> status == TRANSACTION_STATUS_COMMITTED );
925
- }else if (msg -> status == TRANSACTION_STATUS_ABORTED
926
- || ((ts -> participantsMask & ~Mtm -> disabledNodeMask )& ~ts -> votedMask )== 0 )
921
+ MtmFinishPreparedTransaction (node ,ts , false);
922
+ }
923
+ else if (msg -> status == TRANSACTION_STATUS_COMMITTED || msg -> status == TRANSACTION_STATUS_UNKNOWN )
927
924
{
928
- if (msg -> status == TRANSACTION_STATUS_ABORTED ) {
929
- elog (LOG ,"Abort transaction %s because it is aborted at node %d" ,msg -> gid ,node );
930
- }else {
931
- elog (LOG ,"Abort transaction %s because it is not prepared at any online node" ,msg -> gid );
925
+ if (msg -> csn > ts -> csn ) {
926
+ ts -> csn = msg -> csn ;
927
+ MtmSyncClock (ts -> csn );
928
+ }
929
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
930
+ elog (LOG ,"Commit transaction %s because it is prepared at all live nodes" ,msg -> gid );
931
+ MtmFinishPreparedTransaction (node ,ts , true);
932
932
}
933
- Assert (!IsTransactionState ());
934
- StartTransactionCommand ();
935
- MtmSetCurrentTransactionGID (ts -> gid );
936
- FinishPreparedTransaction (ts -> gid , false);
937
- CommitTransactionCommand ();
938
- Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
939
933
}else {
940
934
elog (LOG ,"Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx" ,
941
- msg -> status ,msg -> gid ,node , (long long )ts -> votedMask ,
942
- (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ) );
935
+ msg -> status ,msg -> gid ,node , (long long )ts -> votedMask , (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ));
943
936
continue ;
944
937
}
945
938
}else if (ts -> status == TRANSACTION_STATUS_ABORTED && msg -> status == TRANSACTION_STATUS_COMMITTED ) {
946
939
elog (WARNING ,"Transaction %s is aborted at node %d but committed at node %d" ,msg -> gid ,MtmNodeId ,node );
947
940
}else if (msg -> status == TRANSACTION_STATUS_ABORTED && ts -> status == TRANSACTION_STATUS_COMMITTED ) {
948
941
elog (WARNING ,"Transaction %s is committed at node %d but aborted at node %d" ,msg -> gid ,MtmNodeId ,node );
942
+ }else {
943
+ elog (LOG ,"Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx" ,
944
+ msg -> status ,msg -> gid ,ts -> status ,node , (long long )ts -> votedMask , (long long ) (ts -> participantsMask & ~Mtm -> disabledNodeMask ) );
949
945
}
950
946
}
951
947
continue ;
@@ -961,50 +957,49 @@ static void MtmReceiver(Datum arg)
961
957
elog (WARNING ,"Ignore response for unexisted transaction %d from node %d" ,msg -> dxid ,node );
962
958
continue ;
963
959
}
960
+ if (BIT_CHECK (ts -> votedMask ,node - 1 )) {
961
+ elog (WARNING ,"Receive deteriorated %s response for transaction %d (%s) from node %d" ,
962
+ messageKindText [msg -> code ],ts -> xid ,ts -> gid ,node );
963
+ continue ;
964
+ }
964
965
MtmCheckResponse (msg );
965
-
966
+ BIT_SET (ts -> votedMask ,node - 1 );
967
+
966
968
if (MtmIsCoordinator (ts )) {
967
969
switch (msg -> code ) {
968
- case MSG_READY :
969
- MTM_TXTRACE (ts ,"MtmTransReceiver gotMSG_READY " );
970
+ case MSG_PREPARED :
971
+ MTM_TXTRACE (ts ,"MtmTransReceiver gotMSG_PREPARED " );
970
972
if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
971
- elog (WARNING ,"ReceiveREADY response for already committed transaction %d from node %d" ,
973
+ elog (WARNING ,"ReceivePREPARED response for already committed transaction %d from node %d" ,
972
974
ts -> xid ,node );
973
975
continue ;
974
976
}
975
- if (ts -> nVotes >=Mtm -> nLiveNodes ) {
976
- elog (WARNING ,"Receive deteriorated READY response for transaction %d (%s) from node %d" ,
977
- ts -> xid ,ts -> gid ,node );
977
+ Mtm -> nodes [node - 1 ].transDelay += MtmGetCurrentTime ()- ts -> csn ;
978
+ ts -> xids [node - 1 ]= msg -> sxid ;
979
+
980
+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask )!= 0 ) {
981
+ /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
982
+ commit on smaller subset of nodes */
983
+ elog (WARNING ,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
984
+ node , (long )Mtm -> disabledNodeMask , (long )msg -> disabledNodeMask );
978
985
MtmAbortTransaction (ts );
979
- MtmWakeUpBackend (ts );
980
- }else {
981
- Mtm -> nodes [node - 1 ].transDelay += MtmGetCurrentTime ()- ts -> csn ;
982
- ts -> xids [node - 1 ]= msg -> sxid ;
983
-
984
- if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask )!= 0 ) {
985
- /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
986
- commit on smaller subset of nodes */
987
- elog (WARNING ,"Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
988
- node , (long )Mtm -> disabledNodeMask , (long )msg -> disabledNodeMask );
989
- MtmAbortTransaction (ts );
990
- }
991
-
992
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
993
- /* All nodes are finished their transactions */
994
- if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
995
- MtmWakeUpBackend (ts );
986
+ }
987
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
988
+ /* All nodes are finished their transactions */
989
+ if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
990
+ MtmWakeUpBackend (ts );
991
+ }else {
992
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
993
+ ts -> isPrepared = true;
994
+ if (ts -> isTwoPhase ) {
995
+ MtmWakeUpBackend (ts );
996
+ }else if (MtmUseDtm ) {
997
+ ts -> votedMask = 0 ;
998
+ MTM_TXTRACE (ts ,"MtmTransReceiver send MSG_PRECOMMIT" );
999
+ MtmSend2PCMessage (ts ,MSG_PRECOMMIT );
996
1000
}else {
997
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
998
- if (ts -> isTwoPhase ) {
999
- MtmWakeUpBackend (ts );
1000
- }else if (MtmUseDtm ) {
1001
- ts -> nVotes = 1 ;/* I voted myself */
1002
- MTM_TXTRACE (ts ,"MtmTransReceiver send MSG_PREPARE" );
1003
- MtmSend2PCMessage (ts ,MSG_PREPARE );
1004
- }else {
1005
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1006
- MtmWakeUpBackend (ts );
1007
- }
1001
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1002
+ MtmWakeUpBackend (ts );
1008
1003
}
1009
1004
}
1010
1005
}
@@ -1019,47 +1014,40 @@ static void MtmReceiver(Datum arg)
1019
1014
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1020
1015
MtmAbortTransaction (ts );
1021
1016
}
1022
- if (++ ts -> nVotes >= Mtm -> nLiveNodes ) {
1017
+ if (( ts -> participantsMask & ~ Mtm -> disabledNodeMask & ~ ts -> votedMask ) == 0 ) {
1023
1018
MtmWakeUpBackend (ts );
1024
1019
}
1025
1020
break ;
1026
- case MSG_PREPARED :
1027
- MTM_TXTRACE (ts ,"MtmTransReceiver got MSG_PREPARED" );
1028
- if (ts -> nVotes >=Mtm -> nLiveNodes ) {
1029
- elog (WARNING ,"Receive deteriorated PREPARED response for transaction %d (%s) from node %d" ,
1030
- ts -> xid ,ts -> gid ,node );
1031
- MtmAbortTransaction (ts );
1032
- MtmWakeUpBackend (ts );
1021
+ case MSG_PRECOMMITTED :
1022
+ MTM_TXTRACE (ts ,"MtmTransReceiver got MSG_PRECOMMITTED" );
1023
+ if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1024
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1025
+ if (msg -> csn > ts -> csn ) {
1026
+ ts -> csn = msg -> csn ;
1027
+ MtmSyncClock (ts -> csn );
1028
+ }
1029
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
1030
+ ts -> csn = MtmAssignCSN ();
1031
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1032
+ MtmWakeUpBackend (ts );
1033
+ }
1033
1034
}else {
1034
- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1035
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1036
- if (msg -> csn > ts -> csn ) {
1037
- ts -> csn = msg -> csn ;
1038
- MtmSyncClock (ts -> csn );
1039
- }
1040
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
1041
- ts -> csn = MtmAssignCSN ();
1042
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1043
- MtmWakeUpBackend (ts );
1044
- }
1045
- }else {
1046
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
1047
- MtmWakeUpBackend (ts );
1048
- }
1049
- }
1050
- }
1035
+ if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
1036
+ MtmWakeUpBackend (ts );
1037
+ }
1038
+ }
1051
1039
break ;
1052
1040
default :
1053
1041
Assert (false);
1054
1042
}
1055
1043
}else {
1056
1044
switch (msg -> code ) {
1057
- case MSG_PREPARE :
1045
+ case MSG_PRECOMMIT :
1058
1046
if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
1059
1047
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1060
1048
ts -> csn = MtmAssignCSN ();
1061
1049
MtmAdjustSubtransactions (ts );
1062
- MtmSend2PCMessage (ts ,MSG_PREPARED );
1050
+ MtmSend2PCMessage (ts ,MSG_PRECOMMITTED );
1063
1051
}else {
1064
1052
Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
1065
1053
MtmSend2PCMessage (ts ,MSG_ABORTED );