@@ -106,10 +106,7 @@ static char const* const messageText[] =
106106"HANDSHAKE" ,
107107"READY" ,
108108"PREPARE" ,
109- "COMMIT" ,
110- "ABORT" ,
111109"PREPARED" ,
112- "COMMITTED" ,
113110"ABORTED" ,
114111"STATUS"
115112};
@@ -456,8 +453,10 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
456453buf -> used = 0 ;
457454}
458455MTM_TRACE ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n" ,
459- ts -> status == TRANSACTION_STATUS_ABORTED ?"abort" :"commit" ,ts -> csn ,node + 1 ,MtmNodeId ,ts -> gtid .xid ,ts -> xid );
460- buf -> data [buf -> used ].code = ts -> status == TRANSACTION_STATUS_ABORTED ?MSG_ABORTED :MSG_PREPARED ;
456+ messageText [ts -> cmd ],ts -> csn ,node + 1 ,MtmNodeId ,ts -> gtid .xid ,ts -> xid );
457+
458+ Assert (ts -> cmd != MSG_INVALID );
459+ buf -> data [buf -> used ].code = ts -> cmd ;
461460buf -> data [buf -> used ].dxid = xid ;
462461buf -> data [buf -> used ].sxid = ts -> xid ;
463462buf -> data [buf -> used ].csn = ts -> csn ;
@@ -466,6 +465,22 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
466465buf -> used += 1 ;
467466}
468467
468+ static void MtmBroadcastMessage (MtmBuffer * txBuffer ,MtmTransState * ts )
469+ {
470+ int i ;
471+ int n = 1 ;
472+ for (i = 0 ;i < MtmNodes ;i ++ )
473+ {
474+ if (TransactionIdIsValid (ts -> xids [i ])) {
475+ Assert (i + 1 != MtmNodeId );
476+ MtmAppendBuffer (txBuffer ,ts -> xids [i ],i ,ts );
477+ n += 1 ;
478+ }
479+ }
480+ Assert (n == ds -> nNodes );
481+ }
482+
483+
469484static void MtmTransSender (Datum arg )
470485{
471486int nNodes = MtmNodes ;
@@ -492,7 +507,11 @@ static void MtmTransSender(Datum arg)
492507MtmLock (LW_SHARED );
493508
494509for (ts = ds -> votingTransactions ;ts != NULL ;ts = ts -> nextVoting ) {
495- MtmAppendBuffer (txBuffer ,ts -> gtid .xid ,ts -> gtid .node - 1 ,ts );
510+ if (MtmIsCoordinator (ts )) {
511+ MtmBroadcastMessage (txBuffer ,ts );
512+ }else {
513+ MtmAppendBuffer (txBuffer ,ts -> gtid .xid ,ts -> gtid .node - 1 ,ts );
514+ }
496515}
497516ds -> votingTransactions = NULL ;
498517
@@ -510,6 +529,7 @@ static void MtmTransSender(Datum arg)
510529static void MtmWakeUpBackend (MtmTransState * ts )
511530{
512531MTM_TRACE ("Wakeup backed procno=%d, pid=%d\n" ,ts -> procno ,ProcGlobal -> allProcs [ts -> procno ].pid );
532+ ts -> votingCompleted = true;
513533SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
514534}
515535
@@ -565,6 +585,9 @@ static void MtmTransReceiver(Datum arg)
565585#if USE_EPOLL
566586n = epoll_wait (epollfd ,events ,nNodes ,MtmKeepaliveTimeout /1000 );
567587if (n < 0 ) {
588+ if (errno == EINTR ) {
589+ continue ;
590+ }
568591elog (ERROR ,"Arbiter failed to poll sockets: %d" ,errno );
569592}
570593for (j = 0 ;j < n ;j ++ ) {
@@ -581,7 +604,9 @@ static void MtmTransReceiver(Datum arg)
581604events = inset ;
582605tv .tv_sec = MtmKeepaliveTimeout /USEC ;
583606tv .tv_usec = MtmKeepaliveTimeout %USEC ;
584- n = select (max_fd + 1 ,& events ,NULL ,NULL ,& tv );
607+ do {
608+ n = select (max_fd + 1 ,& events ,NULL ,NULL ,& tv );
609+ }while (n < 0 && errno == ENINTR );
585610}while (n < 0 && MtmRecovery ());
586611
587612if (rc < 0 ) {
@@ -612,31 +637,62 @@ static void MtmTransReceiver(Datum arg)
612637MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State ,& msg -> dxid ,HASH_FIND ,NULL );
613638Assert (ts != NULL );
614639Assert (msg -> node > 0 && msg -> node <=nNodes && msg -> node != MtmNodeId );
615- Assert (MtmIsCoordinator (ts ));
616- switch (msg -> code ) {
617- case MSG_PREPARED :
618- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
619- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS || ts -> status == TRANSACTION_STATUS_UNKNOWN );
620- if (msg -> csn > ts -> csn ) {
621- ts -> csn = msg -> csn ;
622- MtmSyncClock (ts -> csn );
623- }
624- if (++ ts -> nVotes == ds -> nNodes ) {
625- MtmWakeUpBackend (ts );
640+ if (MtmIsCoordinator (ts )) {
641+ switch (msg -> code ) {
642+ case MSG_READY :
643+ Assert (ts -> nVotes < ds -> nNodes );
644+ ds -> nodeTransDelay [msg -> node - 1 ]+= MtmGetCurrentTime ()- ts -> csn ;
645+ ts -> xids [msg -> node - 1 ]= msg -> sxid ;
646+ if (++ ts -> nVotes == ds -> nNodes ) {
647+ /* All nodes are finished their transactions */
648+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
649+ ts -> nVotes = 1 ;/* I voted myself */
650+ MtmSendNotificationMessage (ts ,MSG_PREPARE );
651+ }else {
652+ Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
653+ MtmWakeUpBackend (ts );
626654}
627655}
628- break ;
629- case MSG_ABORTED :
656+ break ;
657+ case MSG_ABORTED :
658+ Assert (ts -> nVotes < ds -> nNodes );
630659if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
631- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS || ts -> status == TRANSACTION_STATUS_UNKNOWN );
660+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
632661ts -> status = TRANSACTION_STATUS_ABORTED ;
633662MtmAdjustSubtransactions (ts );
663+ }
664+ if (++ ts -> nVotes == ds -> nNodes ) {
634665MtmWakeUpBackend (ts );
635666}
636667break ;
637- default :
668+ case MSG_PREPARED :
669+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
670+ Assert (ts -> nVotes < ds -> nNodes );
671+ if (msg -> csn > ts -> csn ) {
672+ ts -> csn = msg -> csn ;
673+ MtmSyncClock (ts -> csn );
674+ }
675+ if (++ ts -> nVotes == ds -> nNodes ) {
676+ ts -> csn = MtmAssignCSN ();
677+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
678+ MtmWakeUpBackend (ts );
679+ }
680+ default :
681+ Assert (false);
682+ }
683+ }else {
684+ switch (msg -> code ) {
685+ case MSG_PREPARE :
686+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
687+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
688+ ts -> csn = MtmAssignCSN ();
689+ MtmAdjustSubtransactions (ts );
690+ MtmSendNotificationMessage (ts ,MSG_PREPARED );
691+ break ;
692+ default :
638693Assert (false);
639- }
694+ }
695+ }
640696}
641697MtmUnlock ();
642698