Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit629f58b

Browse files
knizhnikkelvich
authored andcommitted
Make it possible to add nodes to the cluster
1 parentb8f9bb7 commit629f58b

File tree

6 files changed

+179
-120
lines changed

6 files changed

+179
-120
lines changed

‎arbiter.c

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -369,13 +369,13 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
369369

370370
staticvoidMtmOpenConnections()
371371
{
372-
intnNodes=MtmNodes;
372+
intnNodes=MtmMaxNodes;
373373
inti;
374374

375375
sockets= (int*)palloc(sizeof(int)*nNodes);
376376

377377
for (i=0;i<nNodes;i++) {
378-
if (i+1!=MtmNodeId) {
378+
if (i+1!=MtmNodeId&&i<Mtm->nAllNodes) {
379379
sockets[i]=MtmConnectSocket(Mtm->nodes[i].con.hostName,MtmArbiterPort+i+1,MtmConnectAttempts);
380380
if (sockets[i]<0) {
381381
MtmOnNodeDisconnect(i+1);
@@ -384,8 +384,8 @@ static void MtmOpenConnections()
384384
sockets[i]=-1;
385385
}
386386
}
387-
if (Mtm->nNodes<MtmNodes/2+1) {/* no quorum */
388-
elog(WARNING,"Node is out of quorum: only %d nodesfrom %d areaccssible",Mtm->nNodes,MtmNodes);
387+
if (Mtm->nLiveNodes<Mtm->nAllNodes/2+1) {/* no quorum */
388+
elog(WARNING,"Node is out of quorum: only %d nodesof %d areaccessible",Mtm->nLiveNodes,Mtm->nAllNodes);
389389
MtmSwitchClusterMode(MTM_IN_MINORITY);
390390
}elseif (Mtm->status==MTM_INITIALIZATION) {
391391
MtmSwitchClusterMode(MTM_CONNECTED);
@@ -444,7 +444,7 @@ static void MtmAcceptOneConnection()
444444
elog(WARNING,"Arbiter get unexpected handshake message %d",req.hdr.code);
445445
close(fd);
446446
}else{
447-
Assert(req.hdr.node>0&&req.hdr.node <=MtmNodes&&req.hdr.node!=MtmNodeId);
447+
Assert(req.hdr.node>0&&req.hdr.node <=Mtm->nAllNodes&&req.hdr.node!=MtmNodeId);
448448
resp.code=MSG_STATUS;
449449
resp.disabledNodeMask=Mtm->disabledNodeMask;
450450
resp.dxid=HANDSHAKE_MAGIC;
@@ -472,9 +472,10 @@ static void MtmAcceptIncomingConnections()
472472
structsockaddr_insock_inet;
473473
inton=1;
474474
inti;
475+
intnNodes=MtmMaxNodes;
475476

476-
sockets= (int*)palloc(sizeof(int)*MtmNodes);
477-
for (i=0;i<MtmNodes;i++) {
477+
sockets= (int*)palloc(sizeof(int)*nNodes);
478+
for (i=0;i<nNodes;i++) {
478479
sockets[i]=-1;
479480
}
480481
sock_inet.sin_family=AF_INET;
@@ -490,7 +491,7 @@ static void MtmAcceptIncomingConnections()
490491
if (bind(gateway, (structsockaddr*)&sock_inet,sizeof(sock_inet))<0) {
491492
elog(ERROR,"Arbiter failed to bind socket: %d",errno);
492493
}
493-
if (listen(gateway,MtmNodes)<0) {
494+
if (listen(gateway,nNodes)<0) {
494495
elog(ERROR,"Arbiter failed to listen socket: %d",errno);
495496
}
496497

@@ -527,22 +528,22 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
527528
{
528529
inti;
529530
intn=1;
530-
for (i=0;i<MtmNodes;i++)
531+
for (i=0;i<Mtm->nAllNodes;i++)
531532
{
532533
if (!BIT_CHECK(Mtm->disabledNodeMask,i)&&TransactionIdIsValid(ts->xids[i])) {
533534
Assert(i+1!=MtmNodeId);
534535
MtmAppendBuffer(txBuffer,ts->xids[i],i,ts);
535536
n+=1;
536537
}
537538
}
538-
Assert(n==Mtm->nNodes);
539+
Assert(n==Mtm->nLiveNodes);
539540
}
540541

541542

542543
staticvoidMtmTransSender(Datumarg)
543544
{
544545
sigset_tsset;
545-
intnNodes=MtmNodes;
546+
intnNodes=MtmMaxNodes;
546547
inti;
547548
MtmBuffer*txBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
548549

@@ -580,7 +581,7 @@ static void MtmTransSender(Datum arg)
580581

581582
MtmUnlock();
582583

583-
for (i=0;i<nNodes;i++) {
584+
for (i=0;i<Mtm->nAllNodes;i++) {
584585
if (txBuffer[i].used!=0) {
585586
MtmSendToNode(i,txBuffer[i].data,txBuffer[i].used*sizeof(MtmArbiterMessage));
586587
txBuffer[i].used=0;
@@ -593,7 +594,7 @@ static void MtmTransSender(Datum arg)
593594
#if !USE_EPOLL
594595
staticboolMtmRecovery()
595596
{
596-
intnNodes=MtmNodes;
597+
intnNodes=Mtm->nAllNodes;
597598
boolrecovered= false;
598599
inti;
599600

@@ -618,7 +619,7 @@ static bool MtmRecovery()
618619
staticvoidMtmTransReceiver(Datumarg)
619620
{
620621
sigset_tsset;
621-
intnNodes=MtmNodes;
622+
intnNodes=MtmMaxNodes;
622623
intnResponses;
623624
inti,j,n,rc;
624625
MtmBuffer*rxBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
@@ -708,7 +709,7 @@ static void MtmTransReceiver(Datum arg)
708709
if (MtmIsCoordinator(ts)) {
709710
switch (msg->code) {
710711
caseMSG_READY:
711-
Assert(ts->nVotes<Mtm->nNodes);
712+
Assert(ts->nVotes<Mtm->nLiveNodes);
712713
Mtm->nodes[msg->node-1].transDelay+=MtmGetCurrentTime()-ts->csn;
713714
ts->xids[msg->node-1]=msg->sxid;
714715

@@ -720,7 +721,7 @@ static void MtmTransReceiver(Datum arg)
720721
MtmAbortTransaction(ts);
721722
}
722723

723-
if (++ts->nVotes==Mtm->nNodes) {
724+
if (++ts->nVotes==Mtm->nLiveNodes) {
724725
/* All nodes are finished their transactions */
725726
if (ts->status==TRANSACTION_STATUS_ABORTED) {
726727
MtmWakeUpBackend(ts);
@@ -736,23 +737,23 @@ static void MtmTransReceiver(Datum arg)
736737
}
737738
break;
738739
caseMSG_ABORTED:
739-
Assert(ts->nVotes<Mtm->nNodes);
740+
Assert(ts->nVotes<Mtm->nLiveNodes);
740741
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
741742
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
742743
MtmAbortTransaction(ts);
743744
}
744-
if (++ts->nVotes==Mtm->nNodes) {
745+
if (++ts->nVotes==Mtm->nLiveNodes) {
745746
MtmWakeUpBackend(ts);
746747
}
747748
break;
748749
caseMSG_PREPARED:
749750
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
750-
Assert(ts->nVotes<Mtm->nNodes);
751+
Assert(ts->nVotes<Mtm->nLiveNodes);
751752
if (msg->csn>ts->csn) {
752753
ts->csn=msg->csn;
753754
MtmSyncClock(ts->csn);
754755
}
755-
if (++ts->nVotes==Mtm->nNodes) {
756+
if (++ts->nVotes==Mtm->nLiveNodes) {
756757
ts->csn=MtmAssignCSN();
757758
ts->status=TRANSACTION_STATUS_UNKNOWN;
758759
MtmWakeUpBackend(ts);

‎bgwpool.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ typedef uint64 timestamp_t;
1212
#defineMAX_DBNAME_LEN 30
1313
#defineMULTIMASTER_BGW_RESTART_TIMEOUT 1/* seconds */
1414

15+
externtimestamp_tMtmGetSystemTime(void);/* non-adjusted current system time */
16+
externtimestamp_tMtmGetCurrentTime(void);/* adjusted current system time */
17+
1518
typedefstruct
1619
{
1720
BgwPoolExecutorexecutor;

‎multimaster--1.0.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURN
1313
AS'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16+
CREATEFUNCTIONmtm.add_node(conn_str cstring) RETURNS void
17+
AS'MODULE_PATHNAME','mtm_add_node'
18+
LANGUAGE C;
19+
1620
-- Create replication slot for the node which was previously dropped together with it's slot
1721
CREATEFUNCTIONmtm.recover_node(nodeinteger) RETURNS void
1822
AS'MODULE_PATHNAME','mtm_recover_node'
@@ -30,7 +34,7 @@ CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
3034
AS'MODULE_PATHNAME','mtm_get_nodes_state'
3135
LANGUAGE C;
3236

33-
CREATETYPEmtm.cluster_stateAS ("status"text,"disabledNodeMask"bigint,"disconnectedNodeMask"bigint,"catchUpNodeMask"bigint,"nNodes"integer,"nActiveQueries"integer,"nPendingQueries"integer,"queueSize"bigint,"transCount"bigint,"timeShift"bigint,"recoverySlot"integer);
37+
CREATETYPEmtm.cluster_stateAS ("status"text,"disabledNodeMask"bigint,"disconnectedNodeMask"bigint,"catchUpNodeMask"bigint,"liveNodes"integer,"allNodes"integer,"nActiveQueries"integer,"nPendingQueries"integer,"queueSize"bigint,"transCount"bigint,"timeShift"bigint,"recoverySlot"integer);
3438

3539
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
3640
AS'MODULE_PATHNAME','mtm_get_cluster_state'

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp