Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit b042dae

Browse files
committed
Stop trying to reconnect in MtmConnectSocket() as heartbeat will try to reconnect anyway; Improve arbiter logging in case of network problems;
1 parent ac81341, commit b042dae

File tree

3 files changed

+66
-110
lines changed

3 files changed

+66
-110
lines changed

‎arbiter.c

Lines changed: 65 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,7 @@ static void MtmSender(Datum arg);
9797
staticvoidMtmReceiver(Datumarg);
9898
staticvoidMtmMonitor(Datumarg);
9999
staticvoidMtmSendHeartbeat(void);
100-
staticboolMtmSendToNode(intnode,voidconst*buf,intsize,time_treconnectTimeout);
100+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
101101

102102
charconst*constMtmMessageKindMnem[]=
103103
{
@@ -166,7 +166,7 @@ static void MtmRegisterSocket(int fd, int node)
166166
ev.events=EPOLLIN;
167167
ev.data.u32=node;
168168
if (epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
169-
MTM_ELOG(LOG,"Arbiter failed to add socket to epoll set: %d",errno);
169+
MTM_ELOG(LOG,"Arbiter failed to add socket to epoll set: %s",strerror(errno));
170170
}
171171
#else
172172
FD_SET(fd,&inset);
@@ -180,7 +180,7 @@ static void MtmUnregisterSocket(int fd)
180180
{
181181
#ifUSE_EPOLL
182182
if (epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,NULL)<0) {
183-
MTM_ELOG(LOG,"Arbiter failed to unregister socket from epoll set: %d",errno);
183+
MTM_ELOG(LOG,"Arbiter failed to unregister socket from epoll set: %s",strerror(errno));
184184
}
185185
#else
186186
FD_CLR(fd,&inset);
@@ -371,7 +371,7 @@ static void MtmSendHeartbeat()
371371
|| !BIT_CHECK(Mtm->disabledNodeMask,i)
372372
||BIT_CHECK(Mtm->reconnectMask,i)))
373373
{
374-
if (!MtmSendToNode(i,&msg,sizeof(msg),MtmHeartbeatRecvTimeout)) {
374+
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
375375
MTM_ELOG(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
376376
}else {
377377
if (last_heartbeat_to_node[i]+MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2<now) {
@@ -408,7 +408,7 @@ void MtmCheckHeartbeat()
408408
}
409409

410410

411-
staticintMtmConnectSocket(intnode,intport,time_ttimeout)
411+
staticintMtmConnectSocket(intnode,intport)
412412
{
413413
structaddrinfo*addrs=NULL;
414414
structaddrinfo*addr;
@@ -417,12 +417,9 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
417417
MtmHandshakeMessagereq;
418418
MtmArbiterMessageresp;
419419
intsd=-1;
420-
intret;
421-
timestamp_tstart=MtmGetSystemTime();
420+
intrc;
422421
charconst*host=Mtm->nodes[node].con.hostName;
423422
nodemask_tsave_mask=busy_mask;
424-
timestamp_tafterWait;
425-
timestamp_tbeforeWait;
426423

427424
/* Initialize hint structure */
428425
MemSet(&hint,0,sizeof(hint));
@@ -431,67 +428,60 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
431428

432429
snprintf(portstr,sizeof(portstr),"%d",port);
433430

434-
ret=pg_getaddrinfo_all(host,portstr,&hint,&addrs);
435-
if (ret!=0)
431+
rc=pg_getaddrinfo_all(host,portstr,&hint,&addrs);
432+
if (rc!=0)
436433
{
437-
MTM_ELOG(LOG,"Arbiter failed to resolve host '%s' by name:(%d) %s",host,ret,gai_strerror(ret));
434+
MTM_ELOG(LOG,"Arbiter failed to resolve host '%s' by name:%s",host,gai_strerror(rc));
438435
return-1;
439436
}
440437
BIT_SET(busy_mask,node);
441438

442-
Retry:
443-
while (1) {
444-
intrc=-1;
445-
sd=pg_socket(AF_INET,SOCK_STREAM,0,MtmUseRDMA);
446-
if (sd<0) {
447-
MTM_ELOG(LOG,"Arbiter failed to create socket: %d",errno);
448-
gotoError;
449-
}
450-
rc=pg_fcntl(sd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
451-
if (rc<0) {
452-
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
453-
gotoError;
454-
}
455-
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
456-
{
457-
do {
458-
rc=pg_connect(sd,addr->ai_addr,addr->ai_addrlen,MtmUseRDMA);
459-
}while (rc<0&&errno==EINTR);
439+
Retry:
460440

461-
if (rc >=0||errno==EINPROGRESS) {
462-
break;
463-
}
464-
}
465-
if (rc==0) {
441+
sd=socket(AF_INET,SOCK_STREAM,0);
442+
if (sd<0) {
443+
MTM_ELOG(LOG,"Arbiter failed to create socket: %s",strerror(errno));
444+
gotoError;
445+
}
446+
rc=fcntl(sd,F_SETFL,O_NONBLOCK);
447+
if (rc<0) {
448+
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %s",strerror(errno));
449+
gotoError;
450+
}
451+
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
452+
{
453+
do {
454+
rc=connect(sd,addr->ai_addr,addr->ai_addrlen);
455+
}while (rc<0&&errno==EINTR);
456+
457+
if (rc >=0||errno==EINPROGRESS) {
466458
break;
467459
}
468-
beforeWait=MtmGetSystemTime();
469-
if (errno!=EINPROGRESS||start+MSEC_TO_USEC(timeout)<beforeWait ) {
470-
MTM_ELOG(WARNING,"Arbiter failed to connect to %s:%d: error=%d",host,port,errno);
471-
gotoError;
472-
}else {
473-
rc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
474-
if (rc==1) {
475-
socklen_toptlen=sizeof(int);
476-
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&rc,&optlen)<0) {
477-
MTM_ELOG(WARNING,"Arbiter failed to getsockopt for %s:%d: error=%d",host,port,errno);
478-
gotoError;
479-
}
480-
if (rc==0) {
481-
break;
482-
}else {
483-
MTM_ELOG(WARNING,"Arbiter trying to connect to %s:%d: rc=%d, error=%d",host,port,rc,errno);
484-
}
485-
}else {
486-
MTM_ELOG(WARNING,"Arbiter waiting socket to %s:%d: rc=%d, error=%d",host,port,rc,errno);
460+
}
461+
462+
if (rc!=0&&errno==EINPROGRESS) {
463+
rc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
464+
if (rc==1) {
465+
socklen_toptlen=sizeof(int);
466+
interrcode;
467+
468+
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&errcode,&optlen)<0) {
469+
MTM_ELOG(WARNING,"Arbiter failed to getsockopt for %s:%d: %s",host,port,strerror(errcode));
470+
gotoError;
487471
}
488-
pg_closesocket(sd,MtmUseRDMA);
489-
afterWait=MtmGetSystemTime();
490-
if (afterWait<beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
491-
MtmSleep(beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)-afterWait);
472+
if (errcode!=0) {
473+
MTM_ELOG(WARNING,"Arbiter trying to connect to %s:%d: %s",host,port,strerror(errcode));
474+
gotoError;
492475
}
476+
}else {
477+
MTM_ELOG(WARNING,"Arbiter waiting socket to %s:%d: %s",host,port,strerror(errno));
493478
}
494479
}
480+
elseif (rc!=0) {
481+
MTM_ELOG(WARNING,"Arbiter failed to connect to %s:%d: (%d) %s",host,port,rc,strerror(errno));
482+
gotoError;
483+
}
484+
495485
MtmSetSocketOptions(sd);
496486
MtmInitMessage(&req.hdr,MSG_HANDSHAKE);
497487
req.hdr.node=MtmNodeId;
@@ -500,13 +490,13 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
500490
req.hdr.csn=MtmGetCurrentTime();
501491
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
502492
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
503-
MTM_ELOG(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
504-
pg_closesocket(sd,MtmUseRDMA);
493+
MTM_ELOG(WARNING,"Arbiter failed to send handshake message to %s:%d: %s",host,port,strerror(errno));
494+
close(sd);
505495
gotoRetry;
506496
}
507497
if (MtmReadSocket(sd,&resp,sizeofresp)!=sizeof(resp)) {
508-
MTM_ELOG(WARNING,"Arbiter failed to receive response for handshake message from %s:%d:errno=%d",host,port,errno);
509-
pg_closesocket(sd,MtmUseRDMA);
498+
MTM_ELOG(WARNING,"Arbiter failed to receive response for handshake message from %s:%d:%s",host,port,strerror(errno));
499+
close(sd);
510500
gotoRetry;
511501
}
512502
if (resp.code!=MSG_STATUS||resp.dxid!=HANDSHAKE_MAGIC) {
@@ -527,7 +517,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
527517

528518
returnsd;
529519

530-
Error:
520+
Error:
531521
busy_mask=save_mask;
532522
if (sd >=0) {
533523
pg_closesocket(sd,MtmUseRDMA);
@@ -551,7 +541,7 @@ static void MtmOpenConnections()
551541
}
552542
for (i=0;i<nNodes;i++) {
553543
if (i+1!=MtmNodeId&&i<Mtm->nAllNodes) {
554-
sockets[i]=MtmConnectSocket(i,Mtm->nodes[i].con.arbiterPort,MtmConnectTimeout);
544+
sockets[i]=MtmConnectSocket(i,Mtm->nodes[i].con.arbiterPort);
555545
if (sockets[i]<0) {
556546
MtmOnNodeDisconnect(i+1);
557547
}
@@ -566,7 +556,7 @@ static void MtmOpenConnections()
566556
}
567557

568558

569-
staticboolMtmSendToNode(intnode,voidconst*buf,intsize,time_treconnectTimeout)
559+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
570560
{
571561
boolresult= true;
572562
nodemask_tsave_mask=busy_mask;
@@ -589,11 +579,11 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
589579
}
590580
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
591581
if (sockets[node] >=0) {
592-
MTM_ELOG(WARNING,"Arbiter fail to write to node %d: %d",node+1,errno);
593-
pg_closesocket(sockets[node],MtmUseRDMA);
582+
MTM_ELOG(WARNING,"Arbiter fail to write to node %d: %s",node+1,strerror(errno));
583+
close(sockets[node]);
594584
sockets[node]=-1;
595585
}
596-
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort,reconnectTimeout);
586+
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort);
597587
if (sockets[node]<0) {
598588
MtmOnNodeDisconnect(node+1);
599589
result= false;
@@ -613,7 +603,7 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
613603
{
614604
intrc=MtmReadSocket(sockets[node],buf,buf_size);
615605
if (rc <=0) {
616-
MTM_ELOG(WARNING,"Arbiter failed to read from node=%d, rc=%d, errno=%d",node+1,rc,errno);
606+
MTM_ELOG(WARNING,"Arbiter failed to read from node=%d: %s",node+1,strerror(errno));
617607
MtmDisconnect(node);
618608
}
619609
returnrc;
@@ -623,17 +613,17 @@ static void MtmAcceptOneConnection()
623613
{
624614
intfd=pg_accept(gateway,NULL,NULL,MtmUseRDMA);
625615
if (fd<0) {
626-
MTM_ELOG(WARNING,"Arbiter failed to accept socket: %d",errno);
616+
MTM_ELOG(WARNING,"Arbiter failed to accept socket: %s",strerror(errno));
627617
}else {
628618
MtmHandshakeMessagereq;
629619
MtmArbiterMessageresp;
630620
intrc=pg_fcntl(fd,F_SETFL,O_NONBLOCK,MtmUseRDMA);
631621
if (rc<0) {
632-
MTM_ELOG(ERROR,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
622+
MTM_ELOG(ERROR,"Arbiter failed to switch socket to non-blocking mode: %s",strerror(errno));
633623
}
634624
rc=MtmReadSocket(fd,&req,sizeofreq);
635625
if (rc<sizeof(req)) {
636-
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %d, errno=%d",rc,errno);
626+
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %s",strerror(errno));
637627
pg_closesocket(fd,MtmUseRDMA);
638628
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
639629
MTM_ELOG(WARNING,"Arbiter get unexpected handshake message %d",req.hdr.code);
@@ -770,7 +760,7 @@ static void MtmSender(Datum arg)
770760

771761
for (i=0;i<Mtm->nAllNodes;i++) {
772762
if (txBuffer[i].used!=0) {
773-
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage),MtmReconnectTimeout);
763+
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage));
774764
txBuffer[i].used=0;
775765
}
776766
}
@@ -870,7 +860,7 @@ static void MtmReceiver(Datum arg)
870860
if (errno==EINTR) {
871861
continue;
872862
}
873-
MTM_ELOG(ERROR,"Arbiter failed to poll sockets: %d",errno);
863+
MTM_ELOG(ERROR,"Arbiter failed to poll sockets: %s",strerror(errno));
874864
}
875865
for (j=0;j<n;j++) {
876866
i=events[j].data.u32;
@@ -894,7 +884,7 @@ static void MtmReceiver(Datum arg)
894884
}while (n<0&&MtmRecovery());
895885

896886
if (n<0) {
897-
MTM_ELOG(ERROR,"Arbiter failed to select sockets: %d",errno);
887+
MTM_ELOG(ERROR,"Arbiter failed to select sockets: %s",strerror(errno));
898888
}
899889
for (i=0;i<nNodes;i++) {
900890
if (sockets[i] >=0&&FD_ISSET(sockets[i],&events))

‎multimaster.c

Lines changed: 1 addition & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -204,7 +204,7 @@ char const* const MtmNodeStatusMnem[] =
204204
{
205205
"Initialization",
206206
"Offline",
207-
"Connected",
207+
"Connecting",
208208
"Online",
209209
"Recovery",
210210
"Recovered",
@@ -229,8 +229,6 @@ int MtmNodes;
229229
intMtmNodeId;
230230
intMtmReplicationNodeId;
231231
intMtmArbiterPort;
232-
intMtmConnectTimeout;
233-
intMtmReconnectTimeout;
234232
intMtmNodeDisableDelay;
235233
intMtmTransSpillThreshold;
236234
intMtmMaxNodes;
@@ -3292,36 +3290,6 @@ _PG_init(void)
32923290
NULL
32933291
);
32943292

3295-
DefineCustomIntVariable(
3296-
"multimaster.connect_timeout",
3297-
"Multimaster nodes connect timeout",
3298-
"Interval in milliseconds for establishing connection with cluster node",
3299-
&MtmConnectTimeout,
3300-
10000,/* 10 seconds */
3301-
1,
3302-
INT_MAX,
3303-
PGC_BACKEND,
3304-
0,
3305-
NULL,
3306-
NULL,
3307-
NULL
3308-
);
3309-
3310-
DefineCustomIntVariable(
3311-
"multimaster.reconnect_timeout",
3312-
"Multimaster nodes reconnect timeout",
3313-
"Interval in milliseconds for reestablishing connection with cluster node",
3314-
&MtmReconnectTimeout,
3315-
5000,/* 5 seconds */
3316-
1,
3317-
INT_MAX,
3318-
PGC_BACKEND,
3319-
0,
3320-
NULL,
3321-
NULL,
3322-
NULL
3323-
);
3324-
33253293
if (!ConfigIsSane()) {
33263294
MTM_ELOG(ERROR,"Multimaster config is insane, refusing to work");
33273295
}

‎multimaster.h

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -343,8 +343,6 @@ extern int MtmNodes;
343343
externintMtmArbiterPort;
344344
externchar*MtmDatabaseName;
345345
externchar*MtmDatabaseUser;
346-
externintMtmConnectTimeout;
347-
externintMtmReconnectTimeout;
348346
externintMtmNodeDisableDelay;
349347
externintMtmTransSpillThreshold;
350348
externintMtmHeartbeatSendTimeout;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp