Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 558a943

Browse files
knizhnik authored and kelvich committed
Non blocking connect in arbiter
1 parent b362cad commit 558a943

File tree

2 files changed

+103
-59
lines changed

2 files changed

+103
-59
lines changed

‎arbiter.c

Lines changed: 97 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -104,9 +104,13 @@ static int* sockets;
104104
staticintgateway;
105105
staticboolsend_heartbeat;
106106
staticTimeoutIdheartbeat_timer;
107+
staticintbusy_socket;
107108

108109
staticvoidMtmTransSender(Datumarg);
109110
staticvoidMtmTransReceiver(Datumarg);
111+
staticvoidMtmSendHeartbeat(void);
112+
staticvoidMtmCheckHeartbeat(void);
113+
110114

111115

112116
staticcharconst*constmessageText[]=
@@ -218,17 +222,41 @@ static void MtmDisconnect(int node)
218222
MtmOnNodeDisconnect(node+1);
219223
}
220224

225+
staticintMtmWaitWriteSocket(intsd,time_ttimeoutMsec)
226+
{
227+
structtimevaltv;
228+
fd_setout_set;
229+
intrc;
230+
tv.tv_sec=timeoutMsec/1000;
231+
tv.tv_usec=timeoutMsec%1000*1000;
232+
FD_ZERO(&out_set);
233+
FD_SET(sd,&out_set);
234+
do {
235+
MtmCheckHeartbeat();
236+
}while ((rc=select(sd+1,NULL,&out_set,NULL,&tv))<0&&errno==EINTR);
237+
returnrc;
238+
}
239+
221240
staticboolMtmWriteSocket(intsd,voidconst*buf,intsize)
222241
{
223242
char*src= (char*)buf;
243+
busy_socket=sd;
224244
while (size!=0) {
225-
intn=send(sd,src,size,0);
226-
if (n <=0) {
245+
intrc=MtmWaitWriteSocket(sd,MtmHeartbeatSendTimeout);
246+
if (rc==1) {
247+
intn=send(sd,src,size,0);
248+
if (n<0) {
249+
busy_socket=-1;
250+
return false;
251+
}
252+
size-=n;
253+
src+=n;
254+
}elseif (rc<0) {
255+
busy_socket=-1;
227256
return false;
228-
}
229-
size-=n;
230-
src+=n;
257+
}
231258
}
259+
busy_socket=-1;
232260
return true;
233261
}
234262

@@ -311,9 +339,10 @@ static void MtmSendHeartbeat()
311339

312340
for (i=0;i<Mtm->nAllNodes;i++)
313341
{
314-
if (sockets[i] >=0&& !BIT_CHECK(Mtm->disabledNodeMask|Mtm->reconnectMask,i))
342+
if (sockets[i] >=0&&sockets[i]!=busy_socket&&!BIT_CHECK(Mtm->disabledNodeMask|Mtm->reconnectMask,i))
315343
{
316-
MtmWriteSocket(sockets[i],&msg,sizeof(msg));
344+
intrc=send(sockets[i],&msg,sizeof(msg),0);
345+
Assert(rc <=0|| (size_t)rc==sizeof(msg));
317346
}
318347
}
319348

@@ -327,13 +356,15 @@ static void MtmCheckHeartbeat()
327356
MtmSendHeartbeat();
328357
}
329358
}
330-
359+
331360

332361
staticintMtmConnectSocket(charconst*host,intport,intmax_attempts)
333362
{
334363
structsockaddr_insock_inet;
335364
unsignedaddrs[MAX_ROUTES];
336365
unsignedi,n_addrs=sizeof(addrs) /sizeof(addrs[0]);
366+
MtmHandshakeMessagereq;
367+
MtmArbiterMessageresp;
337368
intsd;
338369

339370
sock_inet.sin_family=AF_INET;
@@ -347,67 +378,80 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
347378
while (1) {
348379
intrc=-1;
349380

350-
sd=socket(AF_INET,SOCK_STREAM,0);
381+
sd=socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,0);
351382
if (sd<0) {
352383
elog(ERROR,"Arbiter failed to create socket: %d",errno);
353384
}
385+
busy_socket=sd;
354386
for (i=0;i<n_addrs;++i) {
355387
memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr);
356388
do {
357389
rc=connect(sd, (structsockaddr*)&sock_inet,sizeof(sock_inet));
358-
MtmCheckHeartbeat();
359390
}while (rc<0&&errno==EINTR);
360391

361392
if (rc >=0||errno==EINPROGRESS) {
362393
break;
363394
}
364395
}
365-
if (rc<0) {
366-
if ((errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS)||max_attempts==0) {
367-
elog(WARNING,"Arbiter failed to connect to %s:%d: error=%d",host,port,errno);
368-
return-1;
369-
}else {
370-
max_attempts-=1;
371-
elog(WARNING,"Arbiter trying to connect to %s:%d: error=%d",host,port,errno);
372-
MtmSleep(MtmConnectTimeout);
373-
}
374-
continue;
396+
if (rc==0) {
397+
break;
398+
}
399+
if (errno!=EINPROGRESS||max_attempts==0) {
400+
elog(WARNING,"Arbiter failed to connect to %s:%d: error=%d",host,port,errno);
401+
busy_socket=-1;
402+
return-1;
375403
}else {
376-
MtmHandshakeMessagereq;
377-
MtmArbiterMessageresp;
378-
MtmSetSocketOptions(sd);
379-
req.hdr.code=MSG_HANDSHAKE;
380-
req.hdr.node=MtmNodeId;
381-
req.hdr.dxid=HANDSHAKE_MAGIC;
382-
req.hdr.sxid=ShmemVariableCache->nextXid;
383-
req.hdr.csn=MtmGetCurrentTime();
384-
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
385-
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
386-
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
387-
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
388-
close(sd);
389-
gotoRetry;
390-
}
391-
if (MtmReadSocket(sd,&resp,sizeofresp)!=sizeof(resp)) {
392-
elog(WARNING,"Arbiter failed to receive response for handshake message from %s:%d: errno=%d",host,port,errno);
393-
close(sd);
394-
gotoRetry;
395-
}
396-
if (resp.code!=MSG_STATUS||resp.dxid!=HANDSHAKE_MAGIC) {
397-
elog(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d",resp.code,host,port);
398-
close(sd);
399-
gotoRetry;
400-
}
401-
402-
/* Some node considered that I am dead, so switch to recovery mode */
403-
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
404-
elog(WARNING,"Node %d thinks that I was dead",resp.node);
405-
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
406-
MtmSwitchClusterMode(MTM_RECOVERY);
404+
rc=MtmWaitWriteSocket(sd,MtmConnectTimeout);
405+
if (rc==1) {
406+
socklen_toptlen=sizeof(int);
407+
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&rc,&optlen)<0) {
408+
elog(WARNING,"Arbiter failed to getsockopt for %s:%d: error=%d",host,port,errno);
409+
busy_socket=-1;
410+
return-1;
411+
}
412+
if (rc==0) {
413+
break;
414+
}else {
415+
elog(WARNING,"Arbiter trying to connect to %s:%d: rc=%d, error=%d",host,port,rc,errno);
416+
}
417+
}else {
418+
elog(WARNING,"Arbiter waiting socket to %s:%d: rc=%d, error=%d",host,port,rc,errno);
407419
}
408-
returnsd;
420+
max_attempts-=1;
421+
MtmSleep(MSEC_TO_USEC(MtmConnectTimeout));
409422
}
410-
}
423+
}
424+
MtmSetSocketOptions(sd);
425+
req.hdr.code=MSG_HANDSHAKE;
426+
req.hdr.node=MtmNodeId;
427+
req.hdr.dxid=HANDSHAKE_MAGIC;
428+
req.hdr.sxid=ShmemVariableCache->nextXid;
429+
req.hdr.csn=MtmGetCurrentTime();
430+
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
431+
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
432+
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
433+
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
434+
close(sd);
435+
gotoRetry;
436+
}
437+
if (MtmReadSocket(sd,&resp,sizeofresp)!=sizeof(resp)) {
438+
elog(WARNING,"Arbiter failed to receive response for handshake message from %s:%d: errno=%d",host,port,errno);
439+
close(sd);
440+
gotoRetry;
441+
}
442+
if (resp.code!=MSG_STATUS||resp.dxid!=HANDSHAKE_MAGIC) {
443+
elog(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d",resp.code,host,port);
444+
close(sd);
445+
gotoRetry;
446+
}
447+
448+
/* Some node considered that I am dead, so switch to recovery mode */
449+
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
450+
elog(WARNING,"Node %d thinks that I was dead",resp.node);
451+
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
452+
MtmSwitchClusterMode(MTM_RECOVERY);
453+
}
454+
returnsd;
411455
}
412456

413457

‎multimaster.c

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1417,7 +1417,7 @@ void MtmOnNodeDisconnect(int nodeId)
14171417
BIT_SET(Mtm->reconnectMask,nodeId-1);
14181418
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
14191419

1420-
MtmSleep(MtmRaftPollDelay);
1420+
MtmSleep(MSEC_TO_USEC(MtmRaftPollDelay));
14211421

14221422
if (!MtmRefreshClusterStatus(false)) {
14231423
MtmLock(LW_EXCLUSIVE);
@@ -1800,7 +1800,7 @@ _PG_init(void)
18001800
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
18011801
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
18021802
&Mtm2PCMinTimeout,
1803-
10000,/*100 seconds */
1803+
10000,/*10 seconds */
18041804
0,
18051805
INT_MAX,
18061806
PGC_BACKEND,
@@ -2004,9 +2004,9 @@ _PG_init(void)
20042004
DefineCustomIntVariable(
20052005
"multimaster.connect_timeout",
20062006
"Multimaster nodes connect timeout",
2007-
"Interval inmicroseconds between connection attempts",
2007+
"Interval inmilliseconds between connection attempts",
20082008
&MtmConnectTimeout,
2009-
1000000,
2009+
1000,
20102010
1,
20112011
INT_MAX,
20122012
PGC_BACKEND,
@@ -2019,9 +2019,9 @@ _PG_init(void)
20192019
DefineCustomIntVariable(
20202020
"multimaster.raft_poll_delay",
20212021
"Multimaster delay of polling cluster state from Raftable after updating local node status",
2022-
"Timeout inmicroseconds before polling state of nodes",
2022+
"Timeout inmilliseconds before polling state of nodes",
20232023
&MtmRaftPollDelay,
2024-
1000000,
2024+
1000,
20252025
1,
20262026
INT_MAX,
20272027
PGC_BACKEND,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp