Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit06a9513

Browse files
knizhnikkelvich
authored andcommitted
Introduce COMMITTED state
1 parented3caee commit06a9513

File tree

4 files changed

+29
-36
lines changed

4 files changed

+29
-36
lines changed

‎arbiter.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,9 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
243243
if (rc==1) {
244244
while ((rc=send(sd,src,size,0))<0&&errno==EINTR);
245245
if (rc<0) {
246+
if (errno==EINPROGRESS) {
247+
continue;
248+
}
246249
return false;
247250
}
248251
size-=rc;
@@ -258,7 +261,7 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
258261
{
259262
intrc;
260263
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
261-
if (rc <=0&&errno==EAGAIN) {
264+
if (rc <=0&&(errno==EAGAIN||errno==EINPROGRESS)) {
262265
rc=MtmWaitSocket(sd, false,MtmHeartbeatSendTimeout);
263266
if (rc==1) {
264267
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
@@ -328,6 +331,7 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
328331
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)
329332
&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)
330333
&&Mtm->status!=MTM_RECOVERY
334+
&&Mtm->status!=MTM_RECOVERED
331335
&&Mtm->nodes[MtmNodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)<MtmGetSystemTime())
332336
{
333337
elog(WARNING,"Node %d thinks that I am dead, while I am %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],MtmMessageKindMnem[resp->code]);

‎multimaster.c

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ char const* const MtmNodeStatusMnem[] =
201201
"Connected",
202202
"Online",
203203
"Recovery",
204+
"Recovered",
204205
"InMinor",
205206
"OutOfService"
206207
};
@@ -831,7 +832,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
831832
* Allow applying of replicated transactions to avoid deadlock (to caught-up we need active transaction counter to become zero).
832833
* Also allow user to complete explicit 2PC transactions.
833834
*/
834-
if (x->isDistributed&& !x->isReplicated&&x->isTwoPhase&&strcmp(application_name,MULTIMASTER_ADMIN)!=0) {
835+
if (x->isDistributed&& !x->isReplicated&&!x->isTwoPhase&&strcmp(application_name,MULTIMASTER_ADMIN)!=0) {
835836
MtmCheckClusterLock();
836837
}
837838

@@ -1872,7 +1873,7 @@ void MtmRecoveryCompleted(void)
18721873
BIT_SET(Mtm->nodeLockerMask,MtmNodeId-1);/* it is trik: this mask was originally use by WAL senders performing recovery, but here we are in opposite (recovered) side:
18731874
* if this mask is not zero loadReq will be broadcasted to all other nodes by heartbeat, suspending their activity
18741875
*/
1875-
MtmSwitchClusterMode(MTM_CONNECTED);
1876+
MtmSwitchClusterMode(MTM_RECOVERED);
18761877
}
18771878
MtmUnlock();
18781879
}
@@ -2030,35 +2031,17 @@ MtmCheckClusterLock()
20302031
timestamp_tdelay=MIN_WAIT_TIMEOUT;
20312032
while (true)
20322033
{
2033-
nodemask_tmask=Mtm->walSenderLockerMask;
2034-
if (Mtm->globalLockerMask |mask) {
2035-
if (Mtm->nActiveTransactions==0) {
2036-
lsn_tcurrLogPos=GetXLogInsertRecPtr();
2037-
inti;
2038-
for (i=0;mask!=0;i++,mask >>=1) {
2039-
if (mask&1) {
2040-
if (WalSndCtl->walsnds[i].sentPtr!=currLogPos) {
2041-
/* recovery is in progress */
2042-
break;
2043-
}else {
2044-
/* recovered replica caught up with master */
2045-
MTM_LOG1("WAL-sender %d complete recovery",i);
2046-
BIT_CLEAR(Mtm->walSenderLockerMask,i);
2047-
}
2048-
}
2049-
}
2034+
if (Mtm->globalLockerMask |Mtm->walSenderLockerMask) {
2035+
/* some "almost cautch-up" wal-senders are still working. */
2036+
/* Do not start new transactions until them are completed. */
2037+
MtmUnlock();
2038+
MtmSleep(delay);
2039+
if (delay*2 <=MAX_WAIT_TIMEOUT) {
2040+
delay *=2;
20502041
}
2051-
if (Mtm->globalLockerMask |mask) {
2052-
/* some "almost cautch-up" wal-senders are still working. */
2053-
/* Do not start new transactions until them are completed. */
2054-
MtmUnlock();
2055-
MtmSleep(delay);
2056-
if (delay*2 <=MAX_WAIT_TIMEOUT) {
2057-
delay *=2;
2058-
}
2059-
MtmLock(LW_EXCLUSIVE);
2060-
continue;
2061-
}else {
2042+
MtmLock(LW_EXCLUSIVE);
2043+
}else {
2044+
if (Mtm->nodeLockerMask!=0) {
20622045
/* All lockers have synchronized their logs */
20632046
/* Remove lock and mark them as recovered */
20642047
MTM_LOG1("Complete recovery of %d nodes (node mask %llx)",Mtm->nLockers,Mtm->nodeLockerMask);
@@ -2071,8 +2054,8 @@ MtmCheckClusterLock()
20712054
Mtm->nodeLockerMask=0;
20722055
MtmCheckQuorum();
20732056
}
2057+
break;
20742058
}
2075-
break;
20762059
}
20772060
}
20782061

@@ -3230,7 +3213,9 @@ void MtmReceiverStarted(int nodeId)
32303213
MtmCheckQuorum();
32313214
}
32323215
elog(LOG,"Start %d receivers and %d senders from %d cluster status %s",Mtm->nReceivers+1,Mtm->nSenders,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
3233-
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
3216+
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1
3217+
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
3218+
{
32343219
BIT_CLEAR(Mtm->nodeLockerMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
32353220
MtmSwitchClusterMode(MTM_ONLINE);
32363221
}
@@ -3332,7 +3317,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33323317
Mtm->preparedTransactionsLoaded= true;
33333318
}
33343319

3335-
while ((Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)||BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
3320+
while ((Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_RECOVERED&&Mtm->status!=MTM_ONLINE)
3321+
||BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
33363322
{
33373323
if (*shutdown)
33383324
{
@@ -3541,7 +3527,9 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
35413527
if (!BIT_CHECK(Mtm->pglogicalSenderMask,MtmReplicationNodeId-1)) {
35423528
elog(LOG,"Start %d senders and %d receivers from %d cluster status %s",Mtm->nSenders+1,Mtm->nReceivers,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
35433529
BIT_SET(Mtm->pglogicalSenderMask,MtmReplicationNodeId-1);
3544-
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
3530+
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1
3531+
&& (Mtm->status==MTM_RECOVERED||Mtm->status==MTM_CONNECTED))
3532+
{
35453533
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
35463534
BIT_CLEAR(Mtm->nodeLockerMask,MtmNodeId-1);/* recovery is completed: release cluster lock */
35473535
MtmSwitchClusterMode(MTM_ONLINE);

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ typedef enum
133133
MTM_CONNECTED,/* Arbiter is established connections with other nodes */
134134
MTM_ONLINE,/* Ready to receive client's queries */
135135
MTM_RECOVERY,/* Node is in recovery process */
136+
MTM_RECOVERED,/* Node is recovered by is not yet switched to ONLINE because not all sender/receivers are restarted */
136137
MTM_IN_MINORITY,/* Node is out of quorum */
137138
MTM_OUT_OF_SERVICE/* Node is not avaiable to to critical, non-recoverable error */
138139
}MtmNodeStatus;

‎pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ pglogical_receiver_main(Datum main_arg)
305305
timestamp_tstart=MtmGetSystemTime();
306306
appendPQExpBuffer(query,"DROP_REPLICATION_SLOT \"%s\"",slotName);
307307
res=PQexec(conn,query->data);
308-
elog(LOG,"Recreate replication slot %s: %ldmillisconds",slotName, (long)USEC_TO_MSEC(MtmGetSystemTime()-start));
308+
elog(LOG,"Drop replication slot %s: %ldmilliseconds",slotName, (long)USEC_TO_MSEC(MtmGetSystemTime()-start));
309309
PQclear(res);
310310
resetPQExpBuffer(query);
311311
timeline=Mtm->nodes[nodeId-1].timeline;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp