@@ -458,7 +458,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
458458MTM_TRACE ("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n" ,
459459messageText [ts -> cmd ],ts -> csn ,node + 1 ,MtmNodeId ,ts -> gtid .xid ,ts -> xid );
460460Assert (ts -> cmd != MSG_INVALID );
461- buf -> data [buf -> used ].code = ts -> cmd ;
461+ buf -> data [buf -> used ].code = ts -> status == TRANSACTION_STATUS_ABORTED ? MSG_ABORTED : MSG_PREPARED ;
462462buf -> data [buf -> used ].dxid = xid ;
463463buf -> data [buf -> used ].sxid = ts -> xid ;
464464buf -> data [buf -> used ].csn = ts -> csn ;
@@ -467,21 +467,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
467467buf -> used += 1 ;
468468}
469469
470- static void MtmBroadcastMessage (MtmBuffer * txBuffer ,MtmTransState * ts )
471- {
472- int i ;
473- int n = 1 ;
474- for (i = 0 ;i < MtmNodes ;i ++ )
475- {
476- if (TransactionIdIsValid (ts -> xids [i ])) {
477- Assert (i + 1 != MtmNodeId );
478- MtmAppendBuffer (txBuffer ,ts -> xids [i ],i ,ts );
479- n += 1 ;
480- }
481- }
482- Assert (n == ds -> nNodes );
483- }
484-
485470static void MtmTransSender (Datum arg )
486471{
487472int nNodes = MtmNodes ;
@@ -508,12 +493,7 @@ static void MtmTransSender(Datum arg)
508493MtmLock (LW_SHARED );
509494
510495for (ts = ds -> votingTransactions ;ts != NULL ;ts = ts -> nextVoting ) {
511- if (MtmIsCoordinator (ts )) {
512- MtmBroadcastMessage (txBuffer ,ts );
513- }else {
514- MtmAppendBuffer (txBuffer ,ts -> gtid .xid ,ts -> gtid .node - 1 ,ts );
515- }
516- ts -> cmd = MSG_INVALID ;
496+ MtmAppendBuffer (txBuffer ,ts -> gtid .xid ,ts -> gtid .node - 1 ,ts );
517497}
518498ds -> votingTransactions = NULL ;
519499MtmUnlock ();
@@ -634,109 +614,31 @@ static void MtmTransReceiver(Datum arg)
634614MtmArbiterMessage * msg = & rxBuffer [i ].data [j ];
635615MtmTransState * ts = (MtmTransState * )hash_search (xid2state ,& msg -> dxid ,HASH_FIND ,NULL );
636616Assert (ts != NULL );
637- Assert (ts -> cmd == MSG_INVALID );
638617Assert (msg -> node > 0 && msg -> node <=nNodes && msg -> node != MtmNodeId );
639- ts -> xids [msg -> node - 1 ]= msg -> sxid ;
640-
641- if (MtmIsCoordinator (ts )) {
642- switch (msg -> code ) {
643- case MSG_READY :
644- Assert (ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
645- Assert (ts -> nVotes < ds -> nNodes );
646- ds -> nodeTransDelay [msg -> node - 1 ]+= MtmGetCurrentTime ()- ts -> csn ;
647- if (++ ts -> nVotes == ds -> nNodes ) {
648- /* All nodes are finished their transactions */
649- if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
650- ts -> nVotes = 1 ;/* I voted myself */
651- ts -> cmd = MSG_PREPARE ;
652- }else {
653- ts -> status = TRANSACTION_STATUS_ABORTED ;
654- ts -> cmd = MSG_ABORT ;
655- MtmAdjustSubtransactions (ts );
656- MtmWakeUpBackend (ts );
657- }
658- MtmSendNotificationMessage (ts );
659- }
660- break ;
618+ Assert (MtmIsCoordinator (ts ));
619+ switch (msg -> code ) {
661620case MSG_PREPARED :
662- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
663- Assert (ts -> nVotes < ds -> nNodes );
664- if (msg -> csn > ts -> csn ) {
665- ts -> csn = msg -> csn ;
666- MtmSyncClock (ts -> csn );
667- }
668- if (++ ts -> nVotes == ds -> nNodes ) {
669- /* ts->csn is maximum of CSNs at all nodes */
670- ts -> nVotes = 1 ;/* I voted myself */
671- ts -> cmd = MSG_COMMIT ;
672- ts -> csn = MtmAssignCSN ();
673- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
674- MtmAdjustSubtransactions (ts );
675- MtmSendNotificationMessage (ts );
676- }
677- break ;
678- case MSG_COMMITTED :
679- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
680- Assert (ts -> nVotes < ds -> nNodes );
681- if (++ ts -> nVotes == ds -> nNodes ) {
682- /* All nodes have the same CSN */
683- MtmWakeUpBackend (ts );
621+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
622+ if (msg -> csn > ts -> csn ) {
623+ ts -> csn = msg -> csn ;
624+ MtmSyncClock (ts -> csn );
625+ }
626+ if (++ ts -> nVotes == ds -> nNodes ) {
627+ MtmWakeUpBackend (ts );
628+ }
684629}
685630break ;
686631case MSG_ABORTED :
687- Assert (ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
688- Assert (ts -> nVotes < ds -> nNodes );
689- ts -> status = TRANSACTION_STATUS_ABORTED ;
690- if (++ ts -> nVotes == ds -> nNodes ) {
691- ts -> cmd = MSG_ABORT ;
692- MtmAdjustSubtransactions (ts );
693- MtmSendNotificationMessage (ts );
694- MtmWakeUpBackend (ts );
695- }
696- break ;
697- default :
698- Assert (false);
699- }
700- }else {/* replica */
701- switch (msg -> code ) {
702- case MSG_PREPARE :
703- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
704- if ((msg -> disabledNodeMask & ~ds -> disabledNodeMask )!= 0 ) {
705- /* Coordinator's disabled mask is wider than my: so reject such transaction to avoid
706- commit on smaller subset of nodes */
632+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
707633ts -> status = TRANSACTION_STATUS_ABORTED ;
708- ts -> cmd = MSG_ABORT ;
709- MtmAdjustSubtransactions (ts );
710- MtmWakeUpBackend (ts );
711- }else {
712- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
713- ts -> csn = MtmAssignCSN ();
714- ts -> cmd = MSG_PREPARED ;
715- }
716- MtmSendNotificationMessage (ts );
717- break ;
718- case MSG_COMMIT :
719- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
720- Assert (ts -> csn < msg -> csn );
721- ts -> csn = msg -> csn ;
722- MtmSyncClock (ts -> csn );
723- ts -> cmd = MSG_COMMITTED ;
724- MtmAdjustSubtransactions (ts );
725- MtmSendNotificationMessage (ts );
726- MtmWakeUpBackend (ts );
727- break ;
728- case MSG_ABORT :
729- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
730- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
731- ts -> status = TRANSACTION_STATUS_ABORTED ;
732634MtmAdjustSubtransactions (ts );
733635MtmWakeUpBackend (ts );
734636}
735637break ;
736638default :
737639Assert (false);
738640}
739- }
641+ }
740642}
741643MtmUnlock ();
742644