5656#include "replication/slot.h"
5757#include "port/atomics.h"
5858#include "tcop/utility.h"
59+ #include "libpq/ip.h"
5960
6061#ifndef USE_EPOLL
6162#ifdef __linux__
@@ -139,31 +140,6 @@ void MtmArbiterInitialize(void)
139140RegisterBackgroundWorker (& MtmMonitorWorker );
140141}
141142
142- static int
143- MtmResolveHostByName (const char * hostname ,unsigned * addrs ,unsigned * n_addrs )
144- {
145- struct sockaddr_in sin ;
146- struct hostent * hp ;
147- unsigned i ;
148-
149- sin .sin_addr .s_addr = inet_addr (hostname );
150- if (sin .sin_addr .s_addr != INADDR_NONE ) {
151- memcpy (& addrs [0 ],& sin .sin_addr .s_addr ,sizeof (sin .sin_addr .s_addr ));
152- * n_addrs = 1 ;
153- return 1 ;
154- }
155-
156- hp = gethostbyname (hostname );
157- if (hp == NULL || hp -> h_addrtype != AF_INET ) {
158- return 0 ;
159- }
160- for (i = 0 ;hp -> h_addr_list [i ]!= NULL && i < * n_addrs ;i ++ ) {
161- memcpy (& addrs [i ],hp -> h_addr_list [i ],sizeof (addrs [i ]));
162- }
163- * n_addrs = i ;
164- return 1 ;
165- }
166-
167143static int stop = 0 ;
168144static void SetStop (int sig )
169145{
@@ -352,7 +328,6 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
352328
353329static void MtmScheduleHeartbeat ()
354330{
355- //Assert(!last_sent_heartbeat || last_sent_heartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout) >= MtmGetSystemTime());
356331if (!stop ) {
357332enable_timeout_after (heartbeat_timer ,MtmHeartbeatSendTimeout );
358333send_heartbeat = true;
@@ -399,7 +374,6 @@ static void MtmSendHeartbeat()
399374close (sockets [i ]);
400375sockets [i ]= -1 ;
401376MtmReconnectNode (i + 1 );/* set reconnect mask to force node reconnent */
402- //MtmOnNodeConnect(i+1);
403377}
404378MTM_LOG4 ("Send heartbeat to node %d with timestamp %lld" ,i + 1 ,now );
405379}
@@ -426,23 +400,31 @@ void MtmCheckHeartbeat()
426400
427401static int MtmConnectSocket (int node ,int port ,time_t timeout )
428402{
429- struct sockaddr_in sock_inet ;
430- unsigned addrs [MAX_ROUTES ];
431- unsigned i ,n_addrs = sizeof (addrs ) /sizeof (addrs [0 ]);
403+ struct addrinfo * addrs = NULL ;
404+ struct addrinfo * addr ;
405+ struct addrinfo hint ;
406+ char portstr [MAXPGPATH ];
432407MtmHandshakeMessage req ;
433408MtmArbiterMessage resp ;
434409int sd ;
410+ int ret ;
435411timestamp_t start = MtmGetSystemTime ();
436412char const * host = Mtm -> nodes [node ].con .hostName ;
437413nodemask_t save_mask = busy_mask ;
438414timestamp_t afterWait ;
439415timestamp_t beforeWait ;
440416
441- sock_inet .sin_family = AF_INET ;
442- sock_inet .sin_port = htons (port );
417+ /* Initialize hint structure */
418+ MemSet (& hint ,0 ,sizeof (hint ));
419+ hint .ai_socktype = SOCK_STREAM ;
420+ hint .ai_family = AF_UNSPEC ;
421+
422+ snprintf (portstr ,sizeof (portstr ),"%d" ,port );
443423
444- if (!MtmResolveHostByName (host ,addrs ,& n_addrs )) {
445- MTM_ELOG (LOG ,"Arbiter failed to resolve host '%s' by name" ,host );
424+ ret = pg_getaddrinfo_all (host ,portstr ,& hint ,& addrs );
425+ if (ret != 0 )
426+ {
427+ MTM_ELOG (LOG ,"Arbiter failed to resolve host '%s' by name: %s" ,host ,gai_strerror (ret ));
446428return -1 ;
447429}
448430BIT_SET (busy_mask ,node );
@@ -459,13 +441,14 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
459441rc = fcntl (sd ,F_SETFL ,O_NONBLOCK );
460442if (rc < 0 ) {
461443MTM_ELOG (LOG ,"Arbiter failed to switch socket to non-blocking mode: %d" ,errno );
444+ close (sd );
462445busy_mask = save_mask ;
463446return -1 ;
464447}
465- for (i = 0 ; i < n_addrs ; ++ i ) {
466- memcpy ( & sock_inet . sin_addr , & addrs [ i ], sizeof sock_inet . sin_addr );
448+ for (addr = addrs ; addr != NULL ; addr = addr -> ai_next )
449+ {
467450do {
468- rc = connect (sd ,( struct sockaddr * ) & sock_inet , sizeof ( sock_inet ) );
451+ rc = connect (sd ,addr -> ai_addr , addr -> ai_addrlen );
469452}while (rc < 0 && errno == EINTR );
470453
471454if (rc >=0 || errno == EINPROGRESS ) {
@@ -638,6 +621,7 @@ static void MtmAcceptOneConnection()
638621rc = MtmReadSocket (fd ,& req ,sizeof req );
639622if (rc < sizeof (req )) {
640623MTM_ELOG (WARNING ,"Arbiter failed to handshake socket: %d, errno=%d" ,rc ,errno );
624+ close (fd );
641625}else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
642626MTM_ELOG (WARNING ,"Arbiter get unexpected handshake message %d" ,req .hdr .code );
643627close (fd );
@@ -693,7 +677,9 @@ static void MtmAcceptIncomingConnections()
693677if (gateway < 0 ) {
694678MTM_ELOG (ERROR ,"Arbiter failed to create socket: %d" ,errno );
695679}
696- setsockopt (gateway ,SOL_SOCKET ,SO_REUSEADDR , (char * )& on ,sizeof on );
680+ if (setsockopt (gateway ,SOL_SOCKET ,SO_REUSEADDR , (char * )& on ,sizeof on )< 0 ) {
681+ MTM_ELOG (ERROR ,"Arbiter failed to set options for socket: %d" ,errno );
682+ }
697683
698684if (bind (gateway , (struct sockaddr * )& sock_inet ,sizeof (sock_inet ))< 0 ) {
699685MTM_ELOG (ERROR ,"Arbiter failed to bind socket: %d" ,errno );
@@ -726,7 +712,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, MtmArbiterMessage* msg)
726712
727713static void MtmSender (Datum arg )
728714{
729- sigset_t sset ;
730715int nNodes = MtmMaxNodes ;
731716int i ;
732717
@@ -737,8 +722,6 @@ static void MtmSender(Datum arg)
737722signal (SIGINT ,SetStop );
738723signal (SIGQUIT ,SetStop );
739724signal (SIGTERM ,SetStop );
740- sigfillset (& sset );
741- sigprocmask (SIG_UNBLOCK ,& sset ,NULL );
742725
743726/* We're now ready to receive signals */
744727BackgroundWorkerUnblockSignals ();
@@ -815,13 +798,9 @@ static bool MtmRecovery()
815798
816799static void MtmMonitor (Datum arg )
817800{
818- sigset_t sset ;
819-
820801signal (SIGINT ,SetStop );
821802signal (SIGQUIT ,SetStop );
822803signal (SIGTERM ,SetStop );
823- sigfillset (& sset );
824- sigprocmask (SIG_UNBLOCK ,& sset ,NULL );
825804
826805/* We're now ready to receive signals */
827806BackgroundWorkerUnblockSignals ();
@@ -840,7 +819,6 @@ static void MtmMonitor(Datum arg)
840819
841820static void MtmReceiver (Datum arg )
842821{
843- sigset_t sset ;
844822int nNodes = MtmMaxNodes ;
845823int nResponses ;
846824int i ,j ,n ,rc ;
@@ -860,8 +838,6 @@ static void MtmReceiver(Datum arg)
860838signal (SIGINT ,SetStop );
861839signal (SIGQUIT ,SetStop );
862840signal (SIGTERM ,SetStop );
863- sigfillset (& sset );
864- sigprocmask (SIG_UNBLOCK ,& sset ,NULL );
865841
866842/* We're now ready to receive signals */
867843BackgroundWorkerUnblockSignals ();
@@ -1078,7 +1054,6 @@ static void MtmReceiver(Datum arg)
10781054}else if (MtmUseDtm ) {
10791055ts -> votedMask = 0 ;
10801056MTM_TXTRACE (ts ,"MtmTransReceiver send MSG_PRECOMMIT" );
1081- //MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10821057Assert (replorigin_session_origin == InvalidRepOriginId );
10831058MTM_LOG2 ("SetPreparedTransactionState for %s" ,ts -> gid );
10841059MtmUnlock ();
@@ -1130,7 +1105,7 @@ static void MtmReceiver(Datum arg)
11301105}else {
11311106Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
11321107MTM_ELOG (WARNING ,"Receive PRECOMMITTED response for aborted transaction %s (%llu) from node %d" ,
1133- ts -> gid , (long64 )ts -> xid ,node );// How it can happen? Should we use assert here?
1108+ ts -> gid , (long64 )ts -> xid ,node );
11341109if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
11351110MtmWakeUpBackend (ts );
11361111}
@@ -1140,23 +1115,7 @@ static void MtmReceiver(Datum arg)
11401115Assert (false);
11411116}
11421117}else {
1143- switch (msg -> code ) {
1144- case MSG_PRECOMMIT :
1145- Assert (false);// Now sent through pglogical
1146- if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
1147- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1148- ts -> csn = MtmAssignCSN ();
1149- MtmAdjustSubtransactions (ts );
1150- MtmSend2PCMessage (ts ,MSG_PRECOMMITTED );
1151- }else if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
1152- MtmSend2PCMessage (ts ,MSG_ABORTED );
1153- }else {
1154- MTM_ELOG (WARNING ,"Transaction %s is already %s" ,ts -> gid ,MtmTxnStatusMnem [ts -> status ]);
1155- }
1156- break ;
1157- default :
1158- Assert (false);
1159- }
1118+ Assert (false);/* All broadcasts are now sent through pglogical */
11601119}
11611120}
11621121MtmUnlock ();