Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit516a531

Browse files
committed
Propages seqno in both direction
1 parentd952fd4 commit516a531

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ void MtmCheckHeartbeat()
368368
}
369369

370370

371-
staticintMtmConnectSocket(charconst*host,intport,inttimeout)
371+
staticintMtmConnectSocket(intnode,intport,inttimeout)
372372
{
373373
structsockaddr_insock_inet;
374374
unsignedaddrs[MAX_ROUTES];
@@ -377,7 +377,7 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
377377
MtmArbiterMessageresp;
378378
intsd;
379379
timestamp_tstart=MtmGetSystemTime();
380-
380+
charconst*host=Mtm->nodes[node].con.hostName;
381381

382382
sock_inet.sin_family=AF_INET;
383383
sock_inet.sin_port=htons(port);
@@ -446,6 +446,7 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
446446
req.hdr.sxid=ShmemVariableCache->nextXid;
447447
req.hdr.csn=MtmGetCurrentTime();
448448
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
449+
req.hdr.seqno=Mtm->nodes[node].recvSeqNo;
449450
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
450451
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
451452
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
@@ -464,7 +465,9 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
464465
}
465466

466467
MtmLock(LW_EXCLUSIVE);
467-
Mtm->nodes[resp.node-1].sendSeqNo=resp.seqno;
468+
if (Mtm->nodes[resp.node-1].sendSeqNo<resp.seqno) {
469+
Mtm->nodes[resp.node-1].sendSeqNo=resp.seqno;
470+
}
468471

469472
/* Some node considered that I am dead, so switch to recovery mode */
470473
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
@@ -499,7 +502,7 @@ static void MtmOpenConnections()
499502
}else {
500503
arbiterPort=MtmArbiterPort+i+1;
501504
}
502-
sockets[i]=MtmConnectSocket(Mtm->nodes[i].con.hostName,arbiterPort,MtmConnectTimeout);
505+
sockets[i]=MtmConnectSocket(i,arbiterPort,MtmConnectTimeout);
503506
if (sockets[i]<0) {
504507
MtmOnNodeDisconnect(i+1);
505508
}
@@ -531,7 +534,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
531534
close(sockets[node]);
532535
sockets[node]=-1;
533536
}
534-
sockets[node]=MtmConnectSocket(Mtm->nodes[node].con.hostName,MtmArbiterPort+node+1,MtmReconnectTimeout);
537+
sockets[node]=MtmConnectSocket(node,MtmArbiterPort+node+1,MtmReconnectTimeout);
535538
if (sockets[node]<0) {
536539
MtmOnNodeDisconnect(node+1);
537540
return false;
@@ -579,6 +582,9 @@ static void MtmAcceptOneConnection()
579582
resp.csn=MtmGetCurrentTime();
580583
resp.node=MtmNodeId;
581584
resp.seqno=Mtm->nodes[req.hdr.node-1].recvSeqNo;
585+
if (Mtm->nodes[req.hdr.node-1].sendSeqNo<req.hdr.seqno) {
586+
Mtm->nodes[req.hdr.node-1].sendSeqNo=req.hdr.seqno;
587+
}
582588
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con,req.connStr);
583589
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
584590
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",resp.node);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp