Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitaf34ad7

Browse files
committed
Refresh cluster state only in monitor process
1 parent61ca3a5 commitaf34ad7

File tree

5 files changed

+86
-93
lines changed

5 files changed

+86
-93
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ static void MtmSender(Datum arg);
9090
staticvoidMtmReceiver(Datumarg);
9191
staticvoidMtmMonitor(Datumarg);
9292
staticvoidMtmSendHeartbeat(void);
93-
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
93+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize,time_treconnectTimeout);
9494

9595
charconst*constMtmMessageKindMnem[]=
9696
{
@@ -214,7 +214,7 @@ static void MtmDisconnect(int node)
214214
MtmOnNodeDisconnect(node+1);
215215
}
216216

217-
staticintMtmWaitSocket(intsd,boolforWrite,time_ttimeoutMsec)
217+
staticintMtmWaitSocket(intsd,boolforWrite,timestamp_ttimeoutMsec)
218218
{
219219
structtimevaltv;
220220
fd_setset;
@@ -227,7 +227,7 @@ static int MtmWaitSocket(int sd, bool forWrite, time_t timeoutMsec)
227227
MtmCheckHeartbeat();
228228
now=MtmGetSystemTime();
229229
if (now>deadline) {
230-
return0;
230+
now=deadline;
231231
}
232232
tv.tv_sec= (deadline-now)/USECS_PER_SEC;
233233
tv.tv_usec= (deadline-now)%USECS_PER_SEC;
@@ -355,7 +355,7 @@ static void MtmSendHeartbeat()
355355
timestamp_tnow=MtmGetSystemTime();
356356
msg.code=MSG_HEARTBEAT;
357357
msg.disabledNodeMask=Mtm->disabledNodeMask;
358-
msg.connectivityMask=Mtm->connectivityMask;
358+
msg.connectivityMask=SELF_CONNECTIVITY_MASK;
359359
msg.oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
360360
msg.node=MtmNodeId;
361361
msg.csn=now;
@@ -373,15 +373,15 @@ static void MtmSendHeartbeat()
373373
|| !BIT_CHECK(Mtm->disabledNodeMask,i)
374374
||BIT_CHECK(Mtm->reconnectMask,i)))
375375
{
376-
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
376+
if (!MtmSendToNode(i,&msg,sizeof(msg),MtmHeartbeatSendTimeout)) {
377377
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
378378
}else {
379379
if (last_heartbeat_to_node[i]+MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2<now) {
380380
MTM_LOG1("Last heartbeat to node %d was sent %ld microseconds ago",i+1,now-last_heartbeat_to_node[i]);
381381
}
382382
last_heartbeat_to_node[i]=now;
383383
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
384-
if (BIT_CHECK(Mtm->connectivityMask,i)) {
384+
if (BIT_CHECK(SELF_CONNECTIVITY_MASK,i)) {
385385
MTM_LOG1("Force reconnect to node %d",i+1);
386386
close(sockets[i]);
387387
sockets[i]=-1;
@@ -411,7 +411,7 @@ void MtmCheckHeartbeat()
411411
}
412412

413413

414-
staticintMtmConnectSocket(intnode,intport,inttimeout)
414+
staticintMtmConnectSocket(intnode,intport,time_ttimeout)
415415
{
416416
structsockaddr_insock_inet;
417417
unsignedaddrs[MAX_ROUTES];
@@ -422,6 +422,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
422422
timestamp_tstart=MtmGetSystemTime();
423423
charconst*host=Mtm->nodes[node].con.hostName;
424424
nodemask_tsave_mask=busy_mask;
425+
timestamp_tafterWait;
426+
timestamp_tbeforeWait;
425427

426428
sock_inet.sin_family=AF_INET;
427429
sock_inet.sin_port=htons(port);
@@ -435,7 +437,6 @@ static int MtmConnectSocket(int node, int port, int timeout)
435437
Retry:
436438
while (1) {
437439
intrc=-1;
438-
439440
sd=socket(AF_INET,SOCK_STREAM,0);
440441
if (sd<0) {
441442
elog(LOG,"Arbiter failed to create socket: %d",errno);
@@ -461,7 +462,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
461462
if (rc==0) {
462463
break;
463464
}
464-
if (errno!=EINPROGRESS||start+MSEC_TO_USEC(timeout)<MtmGetSystemTime()) {
465+
beforeWait=MtmGetSystemTime();
466+
if (errno!=EINPROGRESS||start+MSEC_TO_USEC(timeout)<beforeWait ) {
465467
elog(WARNING,"Arbiter failed to connect to %s:%d: error=%d",host,port,errno);
466468
close(sd);
467469
busy_mask=save_mask;
@@ -485,8 +487,10 @@ static int MtmConnectSocket(int node, int port, int timeout)
485487
elog(WARNING,"Arbiter waiting socket to %s:%d: rc=%d, error=%d",host,port,rc,errno);
486488
}
487489
close(sd);
488-
MtmCheckHeartbeat();
489-
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
490+
afterWait=MtmGetSystemTime();
491+
if (afterWait<beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
492+
MtmSleep(beforeWait+MSEC_TO_USEC(MtmHeartbeatSendTimeout)-afterWait);
493+
}
490494
}
491495
}
492496
MtmSetSocketOptions(sd);
@@ -496,7 +500,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
496500
req.hdr.sxid=ShmemVariableCache->nextXid;
497501
req.hdr.csn=MtmGetCurrentTime();
498502
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
499-
req.hdr.connectivityMask=Mtm->connectivityMask;
503+
req.hdr.connectivityMask=SELF_CONNECTIVITY_MASK;
500504
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
501505
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
502506
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
@@ -553,7 +557,7 @@ static void MtmOpenConnections()
553557
}
554558

555559

556-
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
560+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize,time_treconnectTimeout)
557561
{
558562
boolresult= true;
559563
nodemask_tsave_mask=busy_mask;
@@ -580,7 +584,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
580584
close(sockets[node]);
581585
sockets[node]=-1;
582586
}
583-
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort,MtmReconnectTimeout);
587+
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort,reconnectTimeout);
584588
if (sockets[node]<0) {
585589
MtmOnNodeDisconnect(node+1);
586590
result= false;
@@ -634,7 +638,7 @@ static void MtmAcceptOneConnection()
634638

635639
resp.code=MSG_STATUS;
636640
resp.disabledNodeMask=Mtm->disabledNodeMask;
637-
resp.connectivityMask=Mtm->connectivityMask;
641+
resp.connectivityMask=SELF_CONNECTIVITY_MASK;
638642
resp.dxid=HANDSHAKE_MAGIC;
639643
resp.sxid=ShmemVariableCache->nextXid;
640644
resp.csn=MtmGetCurrentTime();
@@ -759,7 +763,7 @@ static void MtmSender(Datum arg)
759763

760764
for (i=0;i<Mtm->nAllNodes;i++) {
761765
if (txBuffer[i].used!=0) {
762-
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage));
766+
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage),MtmReconnectTimeout);
763767
txBuffer[i].used=0;
764768
}
765769
}
@@ -813,7 +817,7 @@ static void MtmMonitor(Datum arg)
813817
BackgroundWorkerInitializeConnection(MtmDatabaseName,NULL);
814818

815819
while (!stop) {
816-
intrc=WaitLatch(&MyProc->procLatch,WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatRecvTimeout);
820+
intrc=WaitLatch(&MyProc->procLatch,WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatSendTimeout);
817821
if (rc&WL_POSTMASTER_DEATH) {
818822
break;
819823
}
@@ -951,7 +955,7 @@ static void MtmReceiver(Datum arg)
951955
MTM_LOG1("Send response %s for transaction %s to node %d",MtmTxnStatusMnem[msg->status],msg->gid,msg->node);
952956
}
953957
msg->disabledNodeMask=Mtm->disabledNodeMask;
954-
msg->connectivityMask=Mtm->connectivityMask;
958+
msg->connectivityMask=SELF_CONNECTIVITY_MASK;
955959
msg->oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
956960
msg->code=MSG_POLL_STATUS;
957961
MtmSendMessage(msg);
@@ -1142,10 +1146,10 @@ static void MtmReceiver(Datum arg)
11421146
}
11431147
}
11441148
if (Mtm->status==MTM_ONLINE) {
1145-
/* Check for hearbeat only in case of timeout expiration: it means that we do not have unproceeded events.
1149+
now=MtmGetSystemTime();
1150+
/* Check for heartbeats only in case of timeout expiration: it means that we do not have unproceeded events.
11461151
* It helps to avoid false node failure detection because of blocking receiver.
11471152
*/
1148-
now=MtmGetSystemTime();
11491153
if (n==0) {
11501154
selectTimeout=MtmHeartbeatRecvTimeout;/* restore select timeout */
11511155
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
@@ -1158,10 +1162,6 @@ static void MtmReceiver(Datum arg)
11581162
}
11591163
lastHeartbeatCheck=now;
11601164
}
1161-
if (Mtm->disabledNodeMask!=0) {
1162-
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1163-
MtmRefreshClusterStatus();
1164-
}
11651165
}else {
11661166
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
11671167
/* Switch to non-blocking mode to proceed all pending requests before doing watchdog check */

‎contrib/mmts/multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ AS 'MODULE_PATHNAME','mtm_get_last_csn'
3636
LANGUAGE C;
3737

3838

39-
CREATETYPEmtm.node_stateAS ("id"integer,"disabled" bool,"disconnected" bool,"catchUp" bool,"slotLag"bigint,"avgTransDelay"bigint,"lastStatusChange"timestamp,"oldestSnapshot"bigint,"SenderPid"integer,"SenderStartTime"timestamp,"ReceiverPid"integer,"ReceiverStartTime"timestamp,"connStr"text);
39+
CREATETYPEmtm.node_stateAS ("id"integer,"disabled" bool,"disconnected" bool,"catchUp" bool,"slotLag"bigint,"avgTransDelay"bigint,"lastStatusChange"timestamp,"oldestSnapshot"bigint,"SenderPid"integer,"SenderStartTime"timestamp,"ReceiverPid"integer,"ReceiverStartTime"timestamp,"connStr"text,"connectivityMask"bigint);
4040

4141
CREATEFUNCTIONmtm.get_nodes_state() RETURNS SETOFmtm.node_state
4242
AS'MODULE_PATHNAME','mtm_get_nodes_state'

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp