Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5e6e9fc

Browse files
knizhnikkelvich
authored andcommitted
Fix sending hearbeats
1 parenta0fd98b commit5e6e9fc

File tree

4 files changed

+42
-21
lines changed

4 files changed

+42
-21
lines changed

‎arbiter.c

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ static int gateway;
107107
staticboolsend_heartbeat;
108108
statictimestamp_tlast_sent_heartbeat;
109109
staticTimeoutIdheartbeat_timer;
110-
staticintbusy_socket;
110+
staticnodemask_tbusy_mask;
111111

112112
staticvoidMtmTransSender(Datumarg);
113113
staticvoidMtmTransReceiver(Datumarg);
@@ -243,23 +243,19 @@ static int MtmWaitSocket(int sd, bool forWrite, time_t timeoutMsec)
243243
staticboolMtmWriteSocket(intsd,voidconst*buf,intsize)
244244
{
245245
char*src= (char*)buf;
246-
busy_socket=sd;
247246
while (size!=0) {
248247
intrc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
249248
if (rc==1) {
250249
while ((rc=send(sd,src,size,0))<0&&errno==EINTR);
251250
if (rc<0) {
252-
busy_socket=-1;
253251
return false;
254252
}
255253
size-=rc;
256254
src+=rc;
257255
}elseif (rc<0) {
258-
busy_socket=-1;
259256
return false;
260257
}
261258
}
262-
busy_socket=-1;
263259
return true;
264260
}
265261

@@ -271,8 +267,6 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
271267
rc=MtmWaitSocket(sd, false,MtmHeartbeatSendTimeout);
272268
if (rc==1) {
273269
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
274-
}else {
275-
return0;
276270
}
277271
}
278272
returnrc;
@@ -365,7 +359,7 @@ static void MtmSendHeartbeat()
365359

366360
for (i=0;i<Mtm->nAllNodes;i++)
367361
{
368-
if (i+1!=MtmNodeId&&sockets[i]!=busy_socket
362+
if (i+1!=MtmNodeId&&!BIT_CHECK(busy_mask,i)
369363
&& (Mtm->status!=MTM_ONLINE
370364
|| (sockets[i] >=0&& !BIT_CHECK(Mtm->disabledNodeMask,i)&& !BIT_CHECK(Mtm->reconnectMask,i))))
371365
{
@@ -399,6 +393,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
399393
intsd;
400394
timestamp_tstart=MtmGetSystemTime();
401395
charconst*host=Mtm->nodes[node].con.hostName;
396+
nodemask_tsave_mask=busy_mask;
402397

403398
sock_inet.sin_family=AF_INET;
404399
sock_inet.sin_port=htons(port);
@@ -407,22 +402,24 @@ static int MtmConnectSocket(int node, int port, int timeout)
407402
elog(LOG,"Arbiter failed to resolve host '%s' by name",host);
408403
return-1;
409404
}
410-
405+
BIT_SET(busy_mask,node);
406+
411407
Retry:
412408
while (1) {
413409
intrc=-1;
414410

415411
sd=socket(AF_INET,SOCK_STREAM,0);
416412
if (sd<0) {
417413
elog(LOG,"Arbiter failed to create socket: %d",errno);
414+
busy_mask=save_mask;
418415
return-1;
419416
}
420417
rc=fcntl(sd,F_SETFL,O_NONBLOCK);
421418
if (rc<0) {
422419
elog(LOG,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
420+
busy_mask=save_mask;
423421
return-1;
424422
}
425-
busy_socket=sd;
426423
for (i=0;i<n_addrs;++i) {
427424
memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr);
428425
do {
@@ -438,17 +435,17 @@ static int MtmConnectSocket(int node, int port, int timeout)
438435
}
439436
if (errno!=EINPROGRESS||start+MSEC_TO_USEC(timeout)<MtmGetSystemTime()) {
440437
elog(WARNING,"Arbiter failed to connect to %s:%d: error=%d",host,port,errno);
441-
busy_socket=-1;
442438
close(sd);
439+
busy_mask=save_mask;
443440
return-1;
444441
}else {
445442
rc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
446443
if (rc==1) {
447444
socklen_toptlen=sizeof(int);
448445
if (getsockopt(sd,SOL_SOCKET,SO_ERROR, (void*)&rc,&optlen)<0) {
449446
elog(WARNING,"Arbiter failed to getsockopt for %s:%d: error=%d",host,port,errno);
450-
busy_socket=-1;
451447
close(sd);
448+
busy_mask=save_mask;
452449
return-1;
453450
}
454451
if (rc==0) {
@@ -491,6 +488,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
491488
MtmCheckResponse(&resp);
492489
MtmUnlock();
493490

491+
busy_mask=save_mask;
492+
494493
returnsd;
495494
}
496495

@@ -533,6 +532,9 @@ static void MtmOpenConnections()
533532

534533
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
535534
{
535+
boolresult= true;
536+
nodemask_tsave_mask=busy_mask;
537+
BIT_SET(busy_mask,node);
536538
while (true) {
537539
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
538540
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
@@ -553,13 +555,17 @@ static bool MtmSendToNode(int node, void const* buf, int size)
553555
sockets[node]=MtmConnectSocket(node,MtmArbiterPort+node+1,MtmReconnectTimeout);
554556
if (sockets[node]<0) {
555557
MtmOnNodeDisconnect(node+1);
556-
return false;
558+
result= false;
559+
break;
557560
}
558561
MTM_LOG3("Arbiter restablished connection with node %d",node+1);
559562
}else {
560-
return true;
563+
result= true;
564+
break;
561565
}
562566
}
567+
busy_mask=save_mask;
568+
returnresult;
563569
}
564570

565571
staticintMtmReadFromNode(intnode,void*buf,intbuf_size)

‎multimaster.c

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,11 @@ MtmAdjustOldestXid(TransactionId xid)
534534
oldestSnapshot=Mtm->nodes[i].oldestSnapshot;
535535
}
536536
}
537-
oldestSnapshot-=MtmVacuumDelay*USECS_PER_SEC;
537+
if (oldestSnapshot>MtmVacuumDelay*USECS_PER_SEC) {
538+
oldestSnapshot-=MtmVacuumDelay*USECS_PER_SEC;
539+
}else {
540+
oldestSnapshot=0;
541+
}
538542

539543
for (ts=Mtm->transListHead;
540544
ts!=NULL
@@ -1570,7 +1574,16 @@ void MtmOnNodeDisconnect(int nodeId)
15701574
BIT_SET(Mtm->reconnectMask,nodeId-1);
15711575
MtmUnlock();
15721576

1573-
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
1577+
if (!RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false))
1578+
{
1579+
elog(WARNING,"Disable node which is in minority according to RAFT");
1580+
MtmLock(LW_EXCLUSIVE);
1581+
if (Mtm->status==MTM_ONLINE) {
1582+
MtmSwitchClusterMode(MTM_IN_MINORITY);
1583+
}
1584+
MtmUnlock();
1585+
return;
1586+
}
15741587

15751588
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
15761589

‎raftable.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
2222
}
2323

2424

25-
voidRaftableSet(charconst*key,voidconst*value,size_tsize,boolnowait)
25+
boolRaftableSet(charconst*key,voidconst*value,size_tsize,boolnowait)
2626
{
2727
if (MtmUseRaftable) {
28-
inttries=10;
28+
inttries=MtmHeartbeatRecvTimeout/MtmHeartbeatSendTimeout;
2929
timestamp_tstart,stop;
3030
start=MtmGetSystemTime();
3131
if (nowait) {
@@ -36,8 +36,8 @@ void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
3636
MtmCheckHeartbeat();
3737
if (tries-- <=0)
3838
{
39-
MTM_LOG1("RaftableSet nowait=%d, all attempts failed",nowait);
40-
break;
39+
elog(WARNING,"Failed to send data to raftable in %d msec",MtmHeartbeatRecvTimeout);
40+
return false;
4141
}
4242
}
4343
}
@@ -46,6 +46,7 @@ void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
4646
MTM_LOG1("Raftable set nowait=%d takes %ld microseconds",nowait,stop-start);
4747
}
4848
}
49+
return true;
4950
}
5051

5152
boolRaftableCAS(charconst*key,charconst*value,boolnowait)

‎raftable_wrapper.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ extern void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, b
2020
/*
2121
* Set new value for the specified key. IF value is NULL, then key should be deleted.
2222
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
23+
* Returns false if rafttable is in minority
2324
*/
24-
externvoidRaftableSet(charconst*key,voidconst*value,size_tsize,boolnowait);
25+
externboolRaftableSet(charconst*key,voidconst*value,size_tsize,boolnowait);
2526

2627
/*
2728
* If key doesn't exists or its value is not equal to the specified value then store this value and return true.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp