Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit055c6a6

Browse files
knizhnikkelvich
authored andcommitted
Remove vacuum delay
1 parent5aafae3 commit055c6a6

File tree

4 files changed

+365
-333
lines changed

4 files changed

+365
-333
lines changed

‎arbiter.c

Lines changed: 80 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -82,20 +82,25 @@ typedef struct
8282
intnode;/* Sender node ID */
8383
TransactionIddxid;/* Transaction ID at destination node */
8484
TransactionIdsxid;/* Transaction ID at sender node */
85-
csn_tcsn;/* local CSN in case of sending data from replica to master, global CSN master->replica */
86-
nodemask_tdisabledNodeMask;/* bitmask of disabled nodes at the sender of message */
85+
csn_tcsn;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
86+
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes at the sender of message */
87+
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
8788
}MtmArbiterMessage;
8889

90+
typedefstruct
91+
{
92+
MtmArbiterMessagehdr;
93+
charconnStr[MULTIMASTER_MAX_CONN_STR_SIZE];
94+
}MtmHandshakeMessage;
95+
8996
typedefstruct
9097
{
9198
intused;
9299
MtmArbiterMessagedata[BUFFER_SIZE];
93100
}MtmBuffer;
94101

95102
staticint*sockets;
96-
staticchar**hosts;
97103
staticintgateway;
98-
staticMtmState*ds;
99104

100105
staticvoidMtmTransSender(Datumarg);
101106
staticvoidMtmTransReceiver(Datumarg);
@@ -266,39 +271,41 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
266271
continue;
267272
}else {
268273
intoptval=1;
269-
MtmArbiterMessagemsg;
274+
MtmHandshakeMessagereq;
275+
MtmArbiterMessageresp;
270276
setsockopt(sd,IPPROTO_TCP,TCP_NODELAY, (charconst*)&optval,sizeof(optval));
271277
setsockopt(sd,SOL_SOCKET,SO_KEEPALIVE, (charconst*)&optval,sizeof(optval));
272278

273-
msg.code=MSG_HANDSHAKE;
274-
msg.node=MtmNodeId;
275-
msg.dxid=HANDSHAKE_MAGIC;
276-
msg.sxid=ShmemVariableCache->nextXid;
277-
msg.csn=MtmGetCurrentTime();
278-
msg.disabledNodeMask=ds->disabledNodeMask;
279-
if (!MtmWriteSocket(sd,&msg,sizeofmsg)) {
279+
req.hdr.code=MSG_HANDSHAKE;
280+
req.hdr.node=MtmNodeId;
281+
req.hdr.dxid=HANDSHAKE_MAGIC;
282+
req.hdr.sxid=ShmemVariableCache->nextXid;
283+
req.hdr.csn=MtmGetCurrentTime();
284+
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
285+
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].connStr);
286+
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
280287
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
281288
close(sd);
282289
gotoRetry;
283290
}
284-
if (MtmReadSocket(sd,&msg,sizeofmsg)!=sizeof(msg)) {
291+
if (MtmReadSocket(sd,&resp,sizeofresp)!=sizeof(resp)) {
285292
elog(WARNING,"Arbiter failed to receive response for handshake message from %s:%d: errno=%d",host,port,errno);
286293
close(sd);
287294
gotoRetry;
288295
}
289-
if (msg.code!=MSG_STATUS||msg.dxid!=HANDSHAKE_MAGIC) {
290-
elog(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d",msg.code,host,port);
296+
if (resp.code!=MSG_STATUS||resp.dxid!=HANDSHAKE_MAGIC) {
297+
elog(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d",resp.code,host,port);
291298
close(sd);
292299
gotoRetry;
293300
}
294301

295302
/* Some node considered that I am dead, so switch to recovery mode */
296-
if (BIT_CHECK(msg.disabledNodeMask,MtmNodeId-1)) {
297-
elog(WARNING,"Node %d think that I am dead",msg.node);
303+
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
304+
elog(WARNING,"Node %d think that I am dead",resp.node);
298305
MtmSwitchClusterMode(MTM_RECOVERY);
299306
}
300307
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
301-
ds->disabledNodeMask |=msg.disabledNodeMask;
308+
Mtm->disabledNodeMask |=resp.disabledNodeMask;
302309
returnsd;
303310
}
304311
}
@@ -309,39 +316,23 @@ static void MtmOpenConnections()
309316
{
310317
intnNodes=MtmNodes;
311318
inti;
312-
char*connStr=pstrdup(MtmConnStrs);
313319

314320
sockets= (int*)palloc(sizeof(int)*nNodes);
315-
hosts= (char**)palloc(sizeof(char*)*nNodes);
316321

317322
for (i=0;i<nNodes;i++) {
318-
char*host=strstr(connStr,"host=");
319-
char*end;
320-
if (host==NULL) {
321-
elog(ERROR,"Invalid connection string: '%s'",MtmConnStrs);
322-
}
323-
host+=5;
324-
for (end=host;*end!=' '&&*end!=','&&*end!='\0';end++);
325-
if (*end!='\0') {
326-
*end='\0';
327-
connStr=end+1;
328-
}else {
329-
connStr=end;
330-
}
331-
hosts[i]=host;
332323
if (i+1!=MtmNodeId) {
333-
sockets[i]=MtmConnectSocket(host,MtmArbiterPort+i+1,MtmConnectAttempts);
324+
sockets[i]=MtmConnectSocket(Mtm->nodes[i].hostName,MtmArbiterPort+i+1,MtmConnectAttempts);
334325
if (sockets[i]<0) {
335326
MtmOnNodeDisconnect(i+1);
336327
}
337328
}else {
338329
sockets[i]=-1;
339330
}
340331
}
341-
if (ds->nNodes<MtmNodes/2+1) {/* no quorum */
342-
elog(WARNING,"Node is out of quorum: only %d nodes from %d are accssible",ds->nNodes,MtmNodes);
343-
ds->status=MTM_OFFLINE;
344-
}elseif (ds->status==MTM_INITIALIZATION) {
332+
if (Mtm->nNodes<MtmNodes/2+1) {/* no quorum */
333+
elog(WARNING,"Node is out of quorum: only %d nodes from %d are accssible",Mtm->nNodes,MtmNodes);
334+
Mtm->status=MTM_OFFLINE;
335+
}elseif (Mtm->status==MTM_INITIALIZATION) {
345336
MtmSwitchClusterMode(MTM_CONNECTED);
346337
}
347338
}
@@ -354,7 +345,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
354345
if (sockets[node] >=0) {
355346
close(sockets[node]);
356347
}
357-
sockets[node]=MtmConnectSocket(hosts[node],MtmArbiterPort+node+1,MtmReconnectAttempts);
348+
sockets[node]=MtmConnectSocket(Mtm->nodes[node].hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
358349
if (sockets[node]<0) {
359350
MtmOnNodeDisconnect(node+1);
360351
return false;
@@ -379,29 +370,31 @@ static void MtmAcceptOneConnection()
379370
if (fd<0) {
380371
elog(WARNING,"Arbiter failed to accept socket: %d",errno);
381372
}else {
382-
MtmArbiterMessagemsg;
383-
intrc=MtmReadSocket(fd,&msg,sizeofmsg);
384-
if (rc<sizeof(msg)) {
373+
MtmHandshakeMessagereq;
374+
MtmArbiterMessageresp;
375+
intrc=MtmReadSocket(fd,&req,sizeofreq);
376+
if (rc<sizeof(req)) {
385377
elog(WARNING,"Arbiter failed to handshake socket: %d, errno=%d",rc,errno);
386-
}elseif (msg.code!=MSG_HANDSHAKE&&msg.dxid!=HANDSHAKE_MAGIC) {
387-
elog(WARNING,"Arbiter get unexpected handshake message %d",msg.code);
378+
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
379+
elog(WARNING,"Arbiter get unexpected handshake message %d",req.hdr.code);
388380
close(fd);
389381
}else{
390-
Assert(msg.node>0&&msg.node <=MtmNodes&&msg.node!=MtmNodeId);
391-
msg.code=MSG_STATUS;
392-
msg.disabledNodeMask=ds->disabledNodeMask;
393-
msg.dxid=HANDSHAKE_MAGIC;
394-
msg.sxid=ShmemVariableCache->nextXid;
395-
msg.csn=MtmGetCurrentTime();
396-
if (!MtmWriteSocket(fd,&msg,sizeofmsg)) {
397-
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",msg.node);
382+
Assert(req.hdr.node>0&&req.hdr.node <=MtmNodes&&req.hdr.node!=MtmNodeId);
383+
resp.code=MSG_STATUS;
384+
resp.disabledNodeMask=Mtm->disabledNodeMask;
385+
resp.dxid=HANDSHAKE_MAGIC;
386+
resp.sxid=ShmemVariableCache->nextXid;
387+
resp.csn=MtmGetCurrentTime();
388+
MtmUpdateNodeConnStr(req.hdr.node,req.connStr);
389+
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
390+
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",resp.node);
398391
close(fd);
399392
}else {
400-
elog(NOTICE,"Arbiter established connection with node %d",msg.node);
401-
BIT_CLEAR(ds->connectivityMask,msg.node-1);
402-
MtmRegisterSocket(fd,msg.node-1);
403-
sockets[msg.node-1]=fd;
404-
MtmOnNodeConnect(msg.node);
393+
elog(NOTICE,"Arbiter established connection with node %d",req.hdr.node);
394+
BIT_CLEAR(Mtm->connectivityMask,req.hdr.node-1);
395+
MtmRegisterSocket(fd,req.hdr.node-1);
396+
sockets[req.hdr.node-1]=fd;
397+
MtmOnNodeConnect(req.hdr.node);
405398
}
406399
}
407400
}
@@ -415,7 +408,9 @@ static void MtmAcceptIncomingConnections()
415408
inti;
416409

417410
sockets= (int*)palloc(sizeof(int)*MtmNodes);
418-
411+
for (i=0;i<MtmNodes;i++) {
412+
sockets[i]=-1;
413+
}
419414
sock_inet.sin_family=AF_INET;
420415
sock_inet.sin_addr.s_addr=htonl(INADDR_ANY);
421416
sock_inet.sin_port=htons(MtmArbiterPort+MtmNodeId);
@@ -461,7 +456,8 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
461456
buf->data[buf->used].sxid=ts->xid;
462457
buf->data[buf->used].csn=ts->csn;
463458
buf->data[buf->used].node=MtmNodeId;
464-
buf->data[buf->used].disabledNodeMask=ds->disabledNodeMask;
459+
buf->data[buf->used].disabledNodeMask=Mtm->disabledNodeMask;
460+
buf->data[buf->used].oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
465461
buf->used+=1;
466462
}
467463

@@ -477,7 +473,7 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
477473
n+=1;
478474
}
479475
}
480-
Assert(n==ds->nNodes);
476+
Assert(n==Mtm->nNodes);
481477
}
482478

483479

@@ -487,8 +483,6 @@ static void MtmTransSender(Datum arg)
487483
inti;
488484
MtmBuffer*txBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
489485

490-
ds=MtmGetState();
491-
492486
MtmOpenConnections();
493487

494488
for (i=0;i<nNodes;i++) {
@@ -497,7 +491,7 @@ static void MtmTransSender(Datum arg)
497491

498492
while (true) {
499493
MtmTransState*ts;
500-
PGSemaphoreLock(&ds->votingSemaphore);
494+
PGSemaphoreLock(&Mtm->votingSemaphore);
501495
CHECK_FOR_INTERRUPTS();
502496

503497
/*
@@ -506,14 +500,14 @@ static void MtmTransSender(Datum arg)
506500
*/
507501
MtmLock(LW_SHARED);
508502

509-
for (ts=ds->votingTransactions;ts!=NULL;ts=ts->nextVoting) {
503+
for (ts=Mtm->votingTransactions;ts!=NULL;ts=ts->nextVoting) {
510504
if (MtmIsCoordinator(ts)) {
511505
MtmBroadcastMessage(txBuffer,ts);
512506
}else {
513507
MtmAppendBuffer(txBuffer,ts->gtid.xid,ts->gtid.node-1,ts);
514508
}
515509
}
516-
ds->votingTransactions=NULL;
510+
Mtm->votingTransactions=NULL;
517511

518512
MtmUnlock();
519513

@@ -573,8 +567,6 @@ static void MtmTransReceiver(Datum arg)
573567
max_fd=0;
574568
#endif
575569

576-
ds=MtmGetState();
577-
578570
MtmAcceptIncomingConnections();
579571

580572
for (i=0;i<nNodes;i++) {
@@ -613,7 +605,7 @@ static void MtmTransReceiver(Datum arg)
613605
elog(ERROR,"Arbiter failed to select sockets: %d",errno);
614606
}
615607
for (i=0;i<nNodes;i++) {
616-
if (FD_ISSET(sockets[i],&events))
608+
if (sockets[i] >=0&&FD_ISSET(sockets[i],&events))
617609
#endif
618610
{
619611
if (i+1==MtmNodeId) {
@@ -637,13 +629,24 @@ static void MtmTransReceiver(Datum arg)
637629
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&msg->dxid,HASH_FIND,NULL);
638630
Assert(ts!=NULL);
639631
Assert(msg->node>0&&msg->node <=nNodes&&msg->node!=MtmNodeId);
632+
633+
Mtm->nodes[msg->node-1].oldestSnapshot=msg->oldestSnapshot;
634+
640635
if (MtmIsCoordinator(ts)) {
641636
switch (msg->code) {
642637
caseMSG_READY:
643-
Assert(ts->nVotes<ds->nNodes);
644-
ds->nodeTransDelay[msg->node-1]+=MtmGetCurrentTime()-ts->csn;
638+
Assert(ts->nVotes<Mtm->nNodes);
639+
Mtm->nodes[msg->node-1].transDelay+=MtmGetCurrentTime()-ts->csn;
645640
ts->xids[msg->node-1]=msg->sxid;
646-
if (++ts->nVotes==ds->nNodes) {
641+
642+
if ((~msg->disabledNodeMask&Mtm->disabledNodeMask)!=0) {
643+
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
644+
commit on smaller subset of nodes */
645+
ts->status=TRANSACTION_STATUS_ABORTED;
646+
MtmAdjustSubtransactions(ts);
647+
}
648+
649+
if (++ts->nVotes==Mtm->nNodes) {
647650
/* All nodes are finished their transactions */
648651
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
649652
ts->nVotes=1;/* I voted myself */
@@ -655,24 +658,24 @@ static void MtmTransReceiver(Datum arg)
655658
}
656659
break;
657660
caseMSG_ABORTED:
658-
Assert(ts->nVotes<ds->nNodes);
661+
Assert(ts->nVotes<Mtm->nNodes);
659662
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
660663
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
661664
ts->status=TRANSACTION_STATUS_ABORTED;
662665
MtmAdjustSubtransactions(ts);
663666
}
664-
if (++ts->nVotes==ds->nNodes) {
667+
if (++ts->nVotes==Mtm->nNodes) {
665668
MtmWakeUpBackend(ts);
666669
}
667670
break;
668671
caseMSG_PREPARED:
669672
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
670-
Assert(ts->nVotes<ds->nNodes);
673+
Assert(ts->nVotes<Mtm->nNodes);
671674
if (msg->csn>ts->csn) {
672675
ts->csn=msg->csn;
673676
MtmSyncClock(ts->csn);
674677
}
675-
if (++ts->nVotes==ds->nNodes) {
678+
if (++ts->nVotes==Mtm->nNodes) {
676679
ts->csn=MtmAssignCSN();
677680
ts->status=TRANSACTION_STATUS_UNKNOWN;
678681
MtmWakeUpBackend(ts);
@@ -703,7 +706,7 @@ static void MtmTransReceiver(Datum arg)
703706
}
704707
}
705708
}
706-
if (n==0&&ds->disabledNodeMask!=0) {
709+
if (n==0&&Mtm->disabledNodeMask!=0) {
707710
/* If timeout is expired and there are didabled nodes, then recheck cluster's state */
708711
MtmRefreshClusterStatus(false);
709712
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp