Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e0105f6

Browse files
knizhnikkelvich
authored and committed
Update Paxos interface
1 parent 9eb2b3d commit e0105f6

File tree

5 files changed

+53
-26
lines changed

5 files changed

+53
-26
lines changed

‎arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -297,7 +297,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
297297

298298
/* Some node considered that I am dead, so switch to recovery mode */
299299
if (BIT_CHECK(msg.disabledNodeMask,MtmNodeId-1)) {
300-
MtmClusterSwitchMode(MTM_RECOVERY);
300+
MtmSwitchClusterMode(MTM_RECOVERY);
301301
}
302302
/* Combine disable masks from all nodes. Is this actually correct, or should we rather check the availability of nodes ourselves? */
303303
ds->disabledNodeMask |=msg.disabledNodeMask;
@@ -344,7 +344,7 @@ static void MtmOpenConnections()
344344
elog(WARNING,"Node is out of quorum: only %d nodes from %d are accssible",ds->nNodes,MtmNodes);
345345
ds->status=MTM_OFFLINE;
346346
}elseif (ds->status==MTM_INITIALIZATION) {
347-
MtmClusterSwitchMode(MTM_CONNECTED);
347+
MtmSwitchClusterMode(MTM_CONNECTED);
348348
}
349349
}
350350

@@ -745,7 +745,7 @@ static void MtmTransReceiver(Datum arg)
745745
}
746746
if (n==0&&ds->disabledNodeMask!=0) {
747747
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
748-
MtmUpdateClusterStatus();
748+
MtmRefreshClusterStatus(false);
749749
}
750750
}
751751
}

‎multimaster.c

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ static TransactionManager MtmTM = {
131131
MtmGetName
132132
};
133133

134-
staticcharconst*constMtmNodeStatusMnem[]=
134+
charconst*constMtmNodeStatusMnem[]=
135135
{
136136
"Intialization",
137137
"Offline",
@@ -602,7 +602,7 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
602602
x->xid=GetCurrentTransactionId();
603603

604604
if (dtm->disabledNodeMask!=0) {
605-
MtmUpdateClusterStatus();
605+
MtmRefreshClusterStatus(true);
606606
if (dtm->status!=MTM_ONLINE) {
607607
elog(ERROR,"Abort current transaction because this cluster node is not online");
608608
}
@@ -1096,7 +1096,7 @@ _PG_fini(void)
10961096
*/
10971097

10981098

1099-
voidMtmClusterSwitchMode(MtmNodeStatusmode)
1099+
voidMtmSwitchClusterMode(MtmNodeStatusmode)
11001100
{
11011101
dtm->status=mode;
11021102
elog(WARNING,"Switch to %s mode",MtmNodeStatusMnem[mode]);
@@ -1121,7 +1121,8 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
11211121
Assert(dtm->status==MTM_RECOVERY);
11221122
}elseif (dtm->status==MTM_RECOVERY) {
11231123
/* When recovery is completed we get normal transaction ID and switch to normal mode */
1124-
MtmClusterSwitchMode(MTM_ONLINE);
1124+
dtm->recoverySlot=0;
1125+
MtmSwitchClusterMode(MTM_ONLINE);
11251126
}
11261127
dtmTx.gtid=*gtid;
11271128
dtmTx.xid=GetCurrentTransactionId();
@@ -1137,9 +1138,8 @@ void MtmReceiverStarted(int nodeId)
11371138
if (!BIT_CHECK(dtm->pglogicalNodeMask,nodeId-1)) {
11381139
BIT_SET(dtm->pglogicalNodeMask,nodeId-1);
11391140
if (++dtm->nReceivers==dtm->nNodes-1) {
1140-
elog(WARNING,"All receivers are started, switch to normal mode");
11411141
Assert(dtm->status==MTM_CONNECTED);
1142-
dtm->status=MTM_ONLINE;
1142+
MtmSwitchClusterMode(MTM_OFFLINE);
11431143
}
11441144
}
11451145
SpinLockRelease(&dtm->spinlock);
@@ -1632,14 +1632,14 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
16321632

16331633
ByteBufferAlloc(&buf);
16341634
EnumerateLocks(MtmSerializeLock,&buf);
1635-
PaxosSet(psprintf("lock-graph-%d",MtmNodeId),buf.data,buf.used);
1635+
PaxosSet(psprintf("lock-graph-%d",MtmNodeId),buf.data,buf.used, true);
16361636
MtmGraphInit(&graph);
16371637
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data,buf.used/sizeof(GlobalTransactionId));
16381638
ByteBufferFree(&buf);
16391639
for (i=0;i<MtmNodes;i++) {
16401640
if (i+1!=MtmNodeId&& !BIT_CHECK(dtm->disabledNodeMask,i)) {
16411641
intsize;
1642-
void*data=PaxosGet(psprintf("lock-graph-%d",i+1),&size,NULL);
1642+
void*data=PaxosGet(psprintf("lock-graph-%d",i+1),&size,NULL, true);
16431643
if (data==NULL) {
16441644
hasDeadlock= true;/* Just temporary hack until no Paxos */
16451645
}else {
@@ -1655,12 +1655,12 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
16551655
}
16561656

16571657
staticvoid
1658-
MtmBuildConnectivityMatrix(nodemask_t*matrix)
1658+
MtmBuildConnectivityMatrix(nodemask_t*matrix,boolnowait)
16591659
{
16601660
inti,j,n=MtmNodes;
16611661
for (i=0;i<n;i++) {
16621662
if (i+1!=MtmNodeId) {
1663-
void*data=PaxosGet(psprintf("node-mask-%d",i+1),NULL,NULL);
1663+
void*data=PaxosGet(psprintf("node-mask-%d",i+1),NULL,NULL,nowait);
16641664
matrix[i]=*(nodemask_t*)data;
16651665
}else {
16661666
matrix[i]=dtm->connectivityMask;
@@ -1678,14 +1678,14 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
16781678
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
16791679
* This function returns false if current node is excluded from cluster, true otherwise
16801680
*/
1681-
voidMtmUpdateClusterStatus(void)
1681+
voidMtmRefreshClusterStatus(boolnowait)
16821682
{
16831683
nodemask_tmask,clique;
16841684
nodemask_tmatrix[MAX_NODES];
16851685
intclique_size;
16861686
inti;
16871687

1688-
MtmBuildConnectivityMatrix(matrix);
1688+
MtmBuildConnectivityMatrix(matrix,nowait);
16891689

16901690
clique=MtmFindMaxClique(matrix,MtmNodes,&clique_size);
16911691
if (clique_size >=MtmNodes/2+1) {/* have quorum */
@@ -1708,11 +1708,11 @@ void MtmUpdateClusterStatus(void)
17081708
if (BIT_CHECK(dtm->disabledNodeMask,MtmNodeId-1)) {
17091709
if (dtm->status==MTM_ONLINE) {
17101710
/* I was excluded from cluster:( */
1711-
MtmClusterSwitchMode(MTM_OFFLINE);
1711+
MtmSwitchClusterMode(MTM_OFFLINE);
17121712
}
17131713
}elseif (dtm->status==MTM_OFFLINE) {
17141714
/* Should we somehow restart logical receivers? */
1715-
MtmClusterSwitchMode(MTM_RECOVERY);
1715+
MtmSwitchClusterMode(MTM_RECOVERY);
17161716
}
17171717
}else {
17181718
elog(WARNING,"Clique %lx has no quorum",clique);
@@ -1722,30 +1722,30 @@ void MtmUpdateClusterStatus(void)
17221722
voidMtmOnNodeDisconnect(intnodeId)
17231723
{
17241724
BIT_SET(dtm->connectivityMask,nodeId-1);
1725-
PaxosSet(psprintf("node-mask-%d",MtmNodeId),&dtm->connectivityMask,sizeofdtm->connectivityMask);
1725+
PaxosSet(psprintf("node-mask-%d",MtmNodeId),&dtm->connectivityMask,sizeofdtm->connectivityMask, false);
17261726

17271727
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
17281728
MtmSleep(MtmKeepaliveTimeout);
17291729

1730-
MtmUpdateClusterStatus();
1730+
MtmRefreshClusterStatus(false);
17311731
}
17321732

17331733
voidMtmOnNodeConnect(intnodeId)
17341734
{
17351735
BIT_CLEAR(dtm->connectivityMask,nodeId-1);
1736-
PaxosSet(psprintf("node-mask-%d",MtmNodeId),&dtm->connectivityMask,sizeofdtm->connectivityMask);
1736+
PaxosSet(psprintf("node-mask-%d",MtmNodeId),&dtm->connectivityMask,sizeofdtm->connectivityMask, false);
17371737
}
17381738

17391739
/*
17401740
* Paxos function stubs (until they are implemented)
17411741
*/
1742-
void*PaxosGet(charconst*key,int*size,PaxosTimestamp*ts)
1742+
void*PaxosGet(charconst*key,int*size,PaxosTimestamp*ts,boolnowait)
17431743
{
17441744
if (size!=NULL) {
17451745
*size=0;
17461746
}
17471747
returnNULL;
17481748
}
17491749

1750-
voidPaxosSet(charconst*key,voidconst*value,intsize)
1750+
voidPaxosSet(charconst*key,voidconst*value,intsize,boolnowait)
17511751
{}

‎multimaster.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ typedef struct
118118

119119
#defineMtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
120120

121+
externcharconst*constMtmNodeStatusMnem[];
122+
121123
externchar*MtmConnStrs;
122124
externintMtmNodeId;
123125
externintMtmNodes;
@@ -150,6 +152,6 @@ extern MtmState* MtmGetState(void);
150152
externtimestamp_tMtmGetCurrentTime(void);
151153
externvoidMtmSleep(timestamp_tinterval);
152154
externboolMtmIsRecoveredNode(intnodeId);
153-
externvoidMtmUpdateClusterStatus(void);
154-
externvoidMtmClusterSwitchMode(MtmNodeStatusmode);
155+
externvoidMtmRefreshClusterStatus(boolnowait);
156+
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);
155157
#endif

‎paxos.h

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,26 @@ typedef struct PaxosTimestamp {
77
uint32psn;/* PAXOS serial number */
88
}PaxosTimestamp;
99

10-
externvoid*PaxosGet(charconst*key,int*size,PaxosTimestamp*ts);
11-
externvoidPaxosSet(charconst*key,voidconst*value,intsize);
10+
/*
11+
* Get key value.
12+
* Returns NULL if key doesn't exist.
13+
* Value should be copied into caller's private memory using palloc.
14+
* If "size" is not NULL, then it is assigned size of value.
15+
* If "ts" is not NULL, then it is assigned timestamp of last update of this value
16+
* If RAFT master is not accessible, then depending on the value of the "nowait" parameter, this function should either block until RAFT quorum is reached or report an error.
17+
*/
18+
externvoid*PaxosGet(charconst*key,int*size,PaxosTimestamp*ts,boolnowait);
1219

20+
/*
21+
* Set new value for the specified key. If value is NULL, then key should be deleted.
22+
* If RAFT master is not accessible, then depending on the value of the "nowait" parameter, this function should either block until RAFT quorum is reached or report an error.
23+
*/
24+
externvoidPaxosSet(charconst*key,voidconst*value,intsize,boolnowait);
25+
26+
/*
27+
* If key doesn't exist or its value is not equal to the specified value, then store this value and return true.
28+
* Otherwise do nothing and return false.
29+
* If RAFT master is not accessible, then depending on the value of the "nowait" parameter, this function should either block until RAFT quorum is reached or report an error.
30+
*/
31+
externboolPaxosCAS(charconst*key,charconst*value,boolnowait);
1332
#endif

‎pglogical_receiver.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,12 @@ pglogical_receiver_main(Datum main_arg)
332332
if (rc&WL_POSTMASTER_DEATH)
333333
proc_exit(1);
334334

335+
if (ds->status!=MTM_ONLINE&& (ds->status!=MTM_RECOVERY||ds->recoverySlot!=args->remote_node)) {
336+
ereport(LOG, (errmsg("%s: terminating WAL receiver because node is switched to %s mode",worker_proc,MtmNodeStatusMnem[ds->status])));
337+
proc_exit(0);
338+
}
339+
340+
335341
/* Some cleanup */
336342
if (copybuf!=NULL)
337343
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp