@@ -82,20 +82,25 @@ typedef struct
8282int node ;/* Sender node ID */
8383TransactionId dxid ;/* Transaction ID at destination node */
8484TransactionId sxid ;/* Transaction ID at sender node */
85- csn_t csn ;/* local CSN in case of sending data from replica to master, global CSN master->replica */
86- nodemask_t disabledNodeMask ;/* bitmask of disabled nodes at the sender of message */
85+ csn_t csn ;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
86+ nodemask_t disabledNodeMask ;/* Bitmask of disabled nodes at the sender of message */
87+ csn_t oldestSnapshot ;/* Oldest snapshot used by active transactions at this node */
8788}MtmArbiterMessage ;
8889
90+ typedef struct
91+ {
92+ MtmArbiterMessage hdr ;
93+ char connStr [MULTIMASTER_MAX_CONN_STR_SIZE ];
94+ }MtmHandshakeMessage ;
95+
8996typedef struct
9097{
9198int used ;
9299MtmArbiterMessage data [BUFFER_SIZE ];
93100}MtmBuffer ;
94101
95102static int * sockets ;
96- static char * * hosts ;
97103static int gateway ;
98- static MtmState * ds ;
99104
100105static void MtmTransSender (Datum arg );
101106static void MtmTransReceiver (Datum arg );
@@ -266,39 +271,41 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
266271continue ;
267272}else {
268273int optval = 1 ;
269- MtmArbiterMessage msg ;
274+ MtmHandshakeMessage req ;
275+ MtmArbiterMessage resp ;
270276setsockopt (sd ,IPPROTO_TCP ,TCP_NODELAY , (char const * )& optval ,sizeof (optval ));
271277setsockopt (sd ,SOL_SOCKET ,SO_KEEPALIVE , (char const * )& optval ,sizeof (optval ));
272278
273- msg .code = MSG_HANDSHAKE ;
274- msg .node = MtmNodeId ;
275- msg .dxid = HANDSHAKE_MAGIC ;
276- msg .sxid = ShmemVariableCache -> nextXid ;
277- msg .csn = MtmGetCurrentTime ();
278- msg .disabledNodeMask = ds -> disabledNodeMask ;
279- if (!MtmWriteSocket (sd ,& msg ,sizeof msg )) {
279+ req .hdr .code = MSG_HANDSHAKE ;
280+ req .hdr .node = MtmNodeId ;
281+ req .hdr .dxid = HANDSHAKE_MAGIC ;
282+ req .hdr .sxid = ShmemVariableCache -> nextXid ;
283+ req .hdr .csn = MtmGetCurrentTime ();
284+ req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
285+ strcpy (req .connStr ,Mtm -> nodes [MtmNodeId - 1 ].connStr );
286+ if (!MtmWriteSocket (sd ,& req ,sizeof req )) {
280287elog (WARNING ,"Arbiter failed to send handshake message to %s:%d: %d" ,host ,port ,errno );
281288close (sd );
282289gotoRetry ;
283290}
284- if (MtmReadSocket (sd ,& msg ,sizeof msg )!= sizeof (msg )) {
291+ if (MtmReadSocket (sd ,& resp ,sizeof resp )!= sizeof (resp )) {
285292elog (WARNING ,"Arbiter failed to receive response for handshake message from %s:%d: errno=%d" ,host ,port ,errno );
286293close (sd );
287294gotoRetry ;
288295}
289- if (msg .code != MSG_STATUS || msg .dxid != HANDSHAKE_MAGIC ) {
290- elog (WARNING ,"Arbiter get unexpected response %d for handshake message from %s:%d" ,msg .code ,host ,port );
296+ if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
297+ elog (WARNING ,"Arbiter get unexpected response %d for handshake message from %s:%d" ,resp .code ,host ,port );
291298close (sd );
292299gotoRetry ;
293300}
294301
295302/* Some node considered that I am dead, so switch to recovery mode */
296- if (BIT_CHECK (msg .disabledNodeMask ,MtmNodeId - 1 )) {
297- elog (WARNING ,"Node %d think that I am dead" ,msg .node );
303+ if (BIT_CHECK (resp .disabledNodeMask ,MtmNodeId - 1 )) {
304+ elog (WARNING ,"Node %d think that I am dead" ,resp .node );
298305MtmSwitchClusterMode (MTM_RECOVERY );
299306}
300307/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
301- ds -> disabledNodeMask |=msg .disabledNodeMask ;
308+ Mtm -> disabledNodeMask |=resp .disabledNodeMask ;
302309return sd ;
303310}
304311 }
@@ -309,39 +316,23 @@ static void MtmOpenConnections()
309316{
310317int nNodes = MtmNodes ;
311318int i ;
312- char * connStr = pstrdup (MtmConnStrs );
313319
314320sockets = (int * )palloc (sizeof (int )* nNodes );
315- hosts = (char * * )palloc (sizeof (char * )* nNodes );
316321
317322for (i = 0 ;i < nNodes ;i ++ ) {
318- char * host = strstr (connStr ,"host=" );
319- char * end ;
320- if (host == NULL ) {
321- elog (ERROR ,"Invalid connection string: '%s'" ,MtmConnStrs );
322- }
323- host += 5 ;
324- for (end = host ;* end != ' ' && * end != ',' && * end != '\0' ;end ++ );
325- if (* end != '\0' ) {
326- * end = '\0' ;
327- connStr = end + 1 ;
328- }else {
329- connStr = end ;
330- }
331- hosts [i ]= host ;
332323if (i + 1 != MtmNodeId ) {
333- sockets [i ]= MtmConnectSocket (host ,MtmArbiterPort + i + 1 ,MtmConnectAttempts );
324+ sockets [i ]= MtmConnectSocket (Mtm -> nodes [ i ]. hostName ,MtmArbiterPort + i + 1 ,MtmConnectAttempts );
334325if (sockets [i ]< 0 ) {
335326MtmOnNodeDisconnect (i + 1 );
336327}
337328}else {
338329sockets [i ]= -1 ;
339330}
340331}
341- if (ds -> nNodes < MtmNodes /2 + 1 ) {/* no quorum */
342- elog (WARNING ,"Node is out of quorum: only %d nodes from %d are accssible" ,ds -> nNodes ,MtmNodes );
343- ds -> status = MTM_OFFLINE ;
344- }else if (ds -> status == MTM_INITIALIZATION ) {
332+ if (Mtm -> nNodes < MtmNodes /2 + 1 ) {/* no quorum */
333+ elog (WARNING ,"Node is out of quorum: only %d nodes from %d are accssible" ,Mtm -> nNodes ,MtmNodes );
334+ Mtm -> status = MTM_OFFLINE ;
335+ }else if (Mtm -> status == MTM_INITIALIZATION ) {
345336MtmSwitchClusterMode (MTM_CONNECTED );
346337}
347338}
@@ -354,7 +345,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
354345if (sockets [node ] >=0 ) {
355346close (sockets [node ]);
356347}
357- sockets [node ]= MtmConnectSocket (hosts [node ],MtmArbiterPort + node + 1 ,MtmReconnectAttempts );
348+ sockets [node ]= MtmConnectSocket (Mtm -> nodes [node ]. hostName ,MtmArbiterPort + node + 1 ,MtmReconnectAttempts );
358349if (sockets [node ]< 0 ) {
359350MtmOnNodeDisconnect (node + 1 );
360351return false;
@@ -379,29 +370,31 @@ static void MtmAcceptOneConnection()
379370if (fd < 0 ) {
380371elog (WARNING ,"Arbiter failed to accept socket: %d" ,errno );
381372}else {
382- MtmArbiterMessage msg ;
383- int rc = MtmReadSocket (fd ,& msg ,sizeof msg );
384- if (rc < sizeof (msg )) {
373+ MtmHandshakeMessage req ;
374+ MtmArbiterMessage resp ;
375+ int rc = MtmReadSocket (fd ,& req ,sizeof req );
376+ if (rc < sizeof (req )) {
385377elog (WARNING ,"Arbiter failed to handshake socket: %d, errno=%d" ,rc ,errno );
386- }else if (msg . code != MSG_HANDSHAKE && msg .dxid != HANDSHAKE_MAGIC ) {
387- elog (WARNING ,"Arbiter get unexpected handshake message %d" ,msg .code );
378+ }else if (req . hdr . code != MSG_HANDSHAKE && req . hdr .dxid != HANDSHAKE_MAGIC ) {
379+ elog (WARNING ,"Arbiter get unexpected handshake message %d" ,req . hdr .code );
388380close (fd );
389381}else {
390- Assert (msg .node > 0 && msg .node <=MtmNodes && msg .node != MtmNodeId );
391- msg .code = MSG_STATUS ;
392- msg .disabledNodeMask = ds -> disabledNodeMask ;
393- msg .dxid = HANDSHAKE_MAGIC ;
394- msg .sxid = ShmemVariableCache -> nextXid ;
395- msg .csn = MtmGetCurrentTime ();
396- if (!MtmWriteSocket (fd ,& msg ,sizeof msg )) {
397- elog (WARNING ,"Arbiter failed to write response for handshake message to node %d" ,msg .node );
382+ Assert (req .hdr .node > 0 && req .hdr .node <=MtmNodes && req .hdr .node != MtmNodeId );
383+ resp .code = MSG_STATUS ;
384+ resp .disabledNodeMask = Mtm -> disabledNodeMask ;
385+ resp .dxid = HANDSHAKE_MAGIC ;
386+ resp .sxid = ShmemVariableCache -> nextXid ;
387+ resp .csn = MtmGetCurrentTime ();
388+ MtmUpdateNodeConnStr (req .hdr .node ,req .connStr );
389+ if (!MtmWriteSocket (fd ,& resp ,sizeof resp )) {
390+ elog (WARNING ,"Arbiter failed to write response for handshake message to node %d" ,resp .node );
398391close (fd );
399392}else {
400- elog (NOTICE ,"Arbiter established connection with node %d" ,msg .node );
401- BIT_CLEAR (ds -> connectivityMask ,msg .node - 1 );
402- MtmRegisterSocket (fd ,msg .node - 1 );
403- sockets [msg .node - 1 ]= fd ;
404- MtmOnNodeConnect (msg .node );
393+ elog (NOTICE ,"Arbiter established connection with node %d" ,req . hdr .node );
394+ BIT_CLEAR (Mtm -> connectivityMask ,req . hdr .node - 1 );
395+ MtmRegisterSocket (fd ,req . hdr .node - 1 );
396+ sockets [req . hdr .node - 1 ]= fd ;
397+ MtmOnNodeConnect (req . hdr .node );
405398}
406399}
407400}
@@ -415,7 +408,9 @@ static void MtmAcceptIncomingConnections()
415408int i ;
416409
417410sockets = (int * )palloc (sizeof (int )* MtmNodes );
418-
411+ for (i = 0 ;i < MtmNodes ;i ++ ) {
412+ sockets [i ]= -1 ;
413+ }
419414sock_inet .sin_family = AF_INET ;
420415sock_inet .sin_addr .s_addr = htonl (INADDR_ANY );
421416sock_inet .sin_port = htons (MtmArbiterPort + MtmNodeId );
@@ -461,7 +456,8 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
461456buf -> data [buf -> used ].sxid = ts -> xid ;
462457buf -> data [buf -> used ].csn = ts -> csn ;
463458buf -> data [buf -> used ].node = MtmNodeId ;
464- buf -> data [buf -> used ].disabledNodeMask = ds -> disabledNodeMask ;
459+ buf -> data [buf -> used ].disabledNodeMask = Mtm -> disabledNodeMask ;
460+ buf -> data [buf -> used ].oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
465461buf -> used += 1 ;
466462}
467463
@@ -477,7 +473,7 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
477473n += 1 ;
478474}
479475}
480- Assert (n == ds -> nNodes );
476+ Assert (n == Mtm -> nNodes );
481477}
482478
483479
@@ -487,8 +483,6 @@ static void MtmTransSender(Datum arg)
487483int i ;
488484MtmBuffer * txBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
489485
490- ds = MtmGetState ();
491-
492486MtmOpenConnections ();
493487
494488for (i = 0 ;i < nNodes ;i ++ ) {
@@ -497,7 +491,7 @@ static void MtmTransSender(Datum arg)
497491
498492while (true) {
499493MtmTransState * ts ;
500- PGSemaphoreLock (& ds -> votingSemaphore );
494+ PGSemaphoreLock (& Mtm -> votingSemaphore );
501495CHECK_FOR_INTERRUPTS ();
502496
503497/*
@@ -506,14 +500,14 @@ static void MtmTransSender(Datum arg)
506500 */
507501MtmLock (LW_SHARED );
508502
509- for (ts = ds -> votingTransactions ;ts != NULL ;ts = ts -> nextVoting ) {
503+ for (ts = Mtm -> votingTransactions ;ts != NULL ;ts = ts -> nextVoting ) {
510504if (MtmIsCoordinator (ts )) {
511505MtmBroadcastMessage (txBuffer ,ts );
512506}else {
513507MtmAppendBuffer (txBuffer ,ts -> gtid .xid ,ts -> gtid .node - 1 ,ts );
514508}
515509}
516- ds -> votingTransactions = NULL ;
510+ Mtm -> votingTransactions = NULL ;
517511
518512MtmUnlock ();
519513
@@ -573,8 +567,6 @@ static void MtmTransReceiver(Datum arg)
573567max_fd = 0 ;
574568#endif
575569
576- ds = MtmGetState ();
577-
578570MtmAcceptIncomingConnections ();
579571
580572for (i = 0 ;i < nNodes ;i ++ ) {
@@ -613,7 +605,7 @@ static void MtmTransReceiver(Datum arg)
613605elog (ERROR ,"Arbiter failed to select sockets: %d" ,errno );
614606}
615607for (i = 0 ;i < nNodes ;i ++ ) {
616- if (FD_ISSET (sockets [i ],& events ))
608+ if (sockets [ i ] >= 0 && FD_ISSET (sockets [i ],& events ))
617609#endif
618610{
619611if (i + 1 == MtmNodeId ) {
@@ -637,13 +629,24 @@ static void MtmTransReceiver(Datum arg)
637629MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State ,& msg -> dxid ,HASH_FIND ,NULL );
638630Assert (ts != NULL );
639631Assert (msg -> node > 0 && msg -> node <=nNodes && msg -> node != MtmNodeId );
632+
633+ Mtm -> nodes [msg -> node - 1 ].oldestSnapshot = msg -> oldestSnapshot ;
634+
640635if (MtmIsCoordinator (ts )) {
641636switch (msg -> code ) {
642637case MSG_READY :
643- Assert (ts -> nVotes < ds -> nNodes );
644- ds -> nodeTransDelay [msg -> node - 1 ]+= MtmGetCurrentTime ()- ts -> csn ;
638+ Assert (ts -> nVotes < Mtm -> nNodes );
639+ Mtm -> nodes [msg -> node - 1 ]. transDelay += MtmGetCurrentTime ()- ts -> csn ;
645640ts -> xids [msg -> node - 1 ]= msg -> sxid ;
646- if (++ ts -> nVotes == ds -> nNodes ) {
641+
642+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask )!= 0 ) {
643+ /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
644+ commit on smaller subset of nodes */
645+ ts -> status = TRANSACTION_STATUS_ABORTED ;
646+ MtmAdjustSubtransactions (ts );
647+ }
648+
649+ if (++ ts -> nVotes == Mtm -> nNodes ) {
647650/* All nodes are finished their transactions */
648651if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
649652ts -> nVotes = 1 ;/* I voted myself */
@@ -655,24 +658,24 @@ static void MtmTransReceiver(Datum arg)
655658}
656659break ;
657660case MSG_ABORTED :
658- Assert (ts -> nVotes < ds -> nNodes );
661+ Assert (ts -> nVotes < Mtm -> nNodes );
659662if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
660663Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
661664ts -> status = TRANSACTION_STATUS_ABORTED ;
662665MtmAdjustSubtransactions (ts );
663666}
664- if (++ ts -> nVotes == ds -> nNodes ) {
667+ if (++ ts -> nVotes == Mtm -> nNodes ) {
665668MtmWakeUpBackend (ts );
666669}
667670break ;
668671case MSG_PREPARED :
669672Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
670- Assert (ts -> nVotes < ds -> nNodes );
673+ Assert (ts -> nVotes < Mtm -> nNodes );
671674if (msg -> csn > ts -> csn ) {
672675ts -> csn = msg -> csn ;
673676MtmSyncClock (ts -> csn );
674677}
675- if (++ ts -> nVotes == ds -> nNodes ) {
678+ if (++ ts -> nVotes == Mtm -> nNodes ) {
676679ts -> csn = MtmAssignCSN ();
677680ts -> status = TRANSACTION_STATUS_UNKNOWN ;
678681MtmWakeUpBackend (ts );
@@ -703,7 +706,7 @@ static void MtmTransReceiver(Datum arg)
703706}
704707}
705708}
706- if (n == 0 && ds -> disabledNodeMask != 0 ) {
709+ if (n == 0 && Mtm -> disabledNodeMask != 0 ) {
707710/* If timeout is expired and there are didabled nodes, then recheck cluster's state */
708711MtmRefreshClusterStatus (false);
709712}