Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit37a9a6e

Browse files
knizhnikkelvich
authored andcommitted
Send hearbeat during reconnect
1 parent79cc60b commit37a9a6e

File tree

2 files changed

+55
-33
lines changed

2 files changed

+55
-33
lines changed

‎arbiter.c

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,43 @@ static void MtmSetSocketOptions(int sd)
292292
#endif
293293
}
294294

295+
296+
297+
staticvoidMtmScheduleHeartbeat()
298+
{
299+
send_heartbeat= true;
300+
PGSemaphoreUnlock(&Mtm->votingSemaphore);
301+
}
302+
303+
staticvoidMtmSendHeartbeat()
304+
{
305+
inti;
306+
MtmArbiterMessagemsg;
307+
msg.code=MSG_HEARTBEAT;
308+
msg.disabledNodeMask=Mtm->disabledNodeMask;
309+
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
310+
msg.node=MtmNodeId;
311+
312+
for (i=0;i<Mtm->nAllNodes;i++)
313+
{
314+
if (sockets[i] >=0&& !BIT_CHECK(Mtm->disabledNodeMask|Mtm->reconnectMask,i))
315+
{
316+
MtmWriteSocket(sockets[i],&msg,sizeof(msg));
317+
}
318+
}
319+
320+
}
321+
322+
staticvoidMtmCheckHeartbeat()
323+
{
324+
if (send_heartbeat) {
325+
send_heartbeat= false;
326+
enable_timeout_after(heartbeat_timer,MtmHeartbeatSendTimeout);
327+
MtmSendHeartbeat();
328+
}
329+
}
330+
331+
295332
staticintMtmConnectSocket(charconst*host,intport,intmax_attempts)
296333
{
297334
structsockaddr_insock_inet;
@@ -318,6 +355,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
318355
memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr);
319356
do {
320357
rc=connect(sd, (structsockaddr*)&sock_inet,sizeof(sock_inet));
358+
MtmCheckHeartbeat();
321359
}while (rc<0&&errno==EINTR);
322360

323361
if (rc >=0||errno==EINPROGRESS) {
@@ -331,7 +369,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
331369
}else {
332370
max_attempts-=1;
333371
elog(WARNING,"Arbiter trying to connect to %s:%d: error=%d",host,port,errno);
334-
MtmSleep(5*MtmConnectTimeout);
372+
MtmSleep(MtmConnectTimeout);
335373
}
336374
continue;
337375
}else {
@@ -380,14 +418,15 @@ static void MtmOpenConnections()
380418

381419
sockets= (int*)palloc(sizeof(int)*nNodes);
382420

421+
for (i=0;i<nNodes;i++) {
422+
sockets[i]=-1;
423+
}
383424
for (i=0;i<nNodes;i++) {
384425
if (i+1!=MtmNodeId&&i<Mtm->nAllNodes) {
385426
sockets[i]=MtmConnectSocket(Mtm->nodes[i].con.hostName,MtmArbiterPort+i+1,MtmConnectAttempts);
386427
if (sockets[i]<0) {
387428
MtmOnNodeDisconnect(i+1);
388429
}
389-
}else {
390-
sockets[i]=-1;
391430
}
392431
}
393432
if (Mtm->nLiveNodes<Mtm->nAllNodes/2+1) {/* no quorum */
@@ -412,6 +451,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
412451
if (sockets[node] >=0) {
413452
elog(WARNING,"Arbiter failed to write to node %d: %d",node+1,errno);
414453
close(sockets[node]);
454+
sockets[node]=-1;
415455
}
416456
sockets[node]=MtmConnectSocket(Mtm->nodes[node].con.hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
417457
if (sockets[node]<0) {
@@ -518,17 +558,12 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
518558
}
519559
buf->data[buf->used].dxid=xid;
520560

521-
if (ts!=NULL) {
522-
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
523-
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
524-
Assert(ts->cmd!=MSG_INVALID);
525-
buf->data[buf->used].code=ts->cmd;
526-
buf->data[buf->used].sxid=ts->xid;
527-
buf->data[buf->used].csn=ts->csn;
528-
}else {
529-
buf->data[buf->used].code=MSG_HEARTBEAT;
530-
MTM_LOG3("Send HEARTBEAT to node %d from node %d at %ld\n",node+1,MtmNodeId,USEC_TO_MSEC(MtmGetSystemTime()));
531-
}
561+
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
562+
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
563+
Assert(ts->cmd!=MSG_INVALID);
564+
buf->data[buf->used].code=ts->cmd;
565+
buf->data[buf->used].sxid=ts->xid;
566+
buf->data[buf->used].csn=ts->csn;
532567
buf->data[buf->used].node=MtmNodeId;
533568
buf->data[buf->used].disabledNodeMask=Mtm->disabledNodeMask;
534569
buf->data[buf->used].oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
@@ -541,24 +576,15 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
541576
intn=1;
542577
for (i=0;i<Mtm->nAllNodes;i++)
543578
{
544-
if (i+1!=MtmNodeId&& !BIT_CHECK(Mtm->disabledNodeMask,i)
545-
&& (ts==NULL||TransactionIdIsValid(ts->xids[i])))
579+
if (i+1!=MtmNodeId&& !BIT_CHECK(Mtm->disabledNodeMask,i)&&TransactionIdIsValid(ts->xids[i]))
546580
{
547-
MtmAppendBuffer(txBuffer,ts ?ts->xids[i] :InvalidTransactionId,i,ts);
581+
MtmAppendBuffer(txBuffer,ts->xids[i],i,ts);
548582
n+=1;
549583
}
550584
}
551585
Assert(n==Mtm->nLiveNodes);
552586
}
553587

554-
staticvoidMtmSendHeartbeat()
555-
{
556-
send_heartbeat= true;
557-
PGSemaphoreUnlock(&Mtm->votingSemaphore);
558-
//enable_timeout_after(heartbeat_timer, MtmHeartbeatSendTimeout);
559-
}
560-
561-
562588
staticvoidMtmTransSender(Datumarg)
563589
{
564590
sigset_tsset;
@@ -575,7 +601,7 @@ static void MtmTransSender(Datum arg)
575601
sigfillset(&sset);
576602
sigprocmask(SIG_UNBLOCK,&sset,NULL);
577603

578-
heartbeat_timer=RegisterTimeout(USER_TIMEOUT,MtmSendHeartbeat);
604+
heartbeat_timer=RegisterTimeout(USER_TIMEOUT,MtmScheduleHeartbeat);
579605
enable_timeout_after(heartbeat_timer,MtmHeartbeatSendTimeout);
580606

581607
MtmOpenConnections();
@@ -589,11 +615,7 @@ static void MtmTransSender(Datum arg)
589615
PGSemaphoreLock(&Mtm->votingSemaphore);
590616
CHECK_FOR_INTERRUPTS();
591617

592-
if (send_heartbeat) {
593-
send_heartbeat= false;
594-
enable_timeout_after(heartbeat_timer,MtmHeartbeatSendTimeout);
595-
MtmBroadcastMessage(txBuffer,NULL);
596-
}
618+
MtmCheckHeartbeat();
597619
/*
598620
* Use shared lock to improve locality,
599621
* because all other process modifying this list are using exclusive lock
@@ -676,7 +698,7 @@ static void MtmTransReceiver(Datum arg)
676698

677699
while (!stop) {
678700
#ifUSE_EPOLL
679-
n=epoll_wait(epollfd,events,nNodes,MtmKeepaliveTimeout/1000);
701+
n=epoll_wait(epollfd,events,nNodes,USEC_TO_MSEC(MtmKeepaliveTimeout));
680702
if (n<0) {
681703
if (errno==EINTR) {
682704
continue;

‎multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1693,7 +1693,7 @@ _PG_init(void)
16931693
"Timeout in milliseconds of receiving heartbeat messages",
16941694
"If no heartbeat message is received from node within this period, it assumed to be dead",
16951695
&MtmHeartbeatRecvTimeout,
1696-
100000,
1696+
10000,
16971697
1,
16981698
INT_MAX,
16991699
PGC_BACKEND,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp