Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit50a052c

Browse files
knizhnikkelvich
authored andcommitted
Recvoery bug fixing
1 parent7795f88 commit50a052c

File tree

4 files changed

+22
-11
lines changed

4 files changed

+22
-11
lines changed

‎arbiter.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ static void MtmOpenConnections()
386386
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
387387
{
388388
while (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
389-
elog(WARNING,"Arbiter failed to writesocket: %d",errno);
389+
elog(WARNING,"Arbiter failed to writeto node %d: %d",node+1,errno);
390390
if (sockets[node] >=0) {
391391
close(sockets[node]);
392392
}
@@ -395,6 +395,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
395395
MtmOnNodeDisconnect(node+1);
396396
return false;
397397
}
398+
elog(NOTICE,"Arbiter restablish connection with node %d",node+1);
398399
}
399400
return true;
400401
}

‎multimaster.c

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
672672

673673
if (Mtm->disabledNodeMask!=0) {
674674
MtmRefreshClusterStatus(true);
675-
if (Mtm->status!=MTM_ONLINE) {
675+
if (!IsBackgroundWorker&&Mtm->status!=MTM_ONLINE) {
676676
elog(ERROR,"Abort current transaction because this cluster node is not online");
677677
}
678678
}
@@ -682,7 +682,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
682682
/*
683683
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up
684684
*/
685-
MtmCheckClusterLock();
685+
if (!x->isReplicated) {
686+
MtmCheckClusterLock();
687+
}
686688

687689
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,NULL);
688690
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
@@ -994,21 +996,23 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
994996
if (MtmIsRecoveredNode(nodeId)) {
995997
XLogRecPtrwalLSN=GetXLogInsertRecPtr();
996998
MtmLock(LW_EXCLUSIVE);
999+
#if0
9971000
if (slotLSN==walLSN) {
9981001
if (BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)) {
9991002
elog(WARNING,"Node %d is caught-up",nodeId);
1000-
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
10011003
BIT_CLEAR(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
10021004
BIT_CLEAR(Mtm->nodeLockerMask,nodeId-1);
10031005
Mtm->nLockers-=1;
10041006
}else {
10051007
elog(WARNING,"Node %d is caugth-up without locking cluster",nodeId);
10061008
/* We are lucky: caugth-up without locking cluster! */
1007-
Mtm->nNodes+=1;
1008-
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
10091009
}
1010+
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1011+
Mtm->nNodes+=1;
10101012
caughtUp= true;
1011-
}elseif (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
1013+
}else
1014+
#endif
1015+
if (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
10121016
&&slotLSN+MtmMinRecoveryLag>walLSN)
10131017
{
10141018
/*
@@ -2248,7 +2252,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22482252

22492253
voidMtmExecute(void*work,intsize)
22502254
{
2251-
BgwPoolExecute(&Mtm->pool,work,size);
2255+
if (Mtm->status==MTM_RECOVERY) {
2256+
/* During recovery apply changes sequentially to preserve commit order */
2257+
MtmExecutor(0,work,size);
2258+
}else {
2259+
BgwPoolExecute(&Mtm->pool,work,size);
2260+
}
22522261
}
22532262

22542263
staticBgwPool*

‎pglogical_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,12 +497,12 @@ process_remote_commit(StringInfo in)
497497
uint8flags;
498498
csn_tcsn;
499499
constchar*gid=NULL;
500-
boolcaughtUp;
500+
boolcaughtUp= false;
501501

502502
/* read flags */
503503
flags=pq_getmsgbyte(in);
504504
MtmReplicationNode=pq_getmsgbyte(in);
505-
caughtUp=pq_getmsgbyte(in)!=0;
505+
/*caughtUp = pq_getmsgbyte(in) != 0;*/
506506

507507
/* read fields */
508508
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */

‎pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
147147
return;
148148
}
149149
}
150+
MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn);
150151

151152
pq_sendbyte(out,'C');/* sending COMMIT */
152153

@@ -155,7 +156,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
155156
/* send the flags field */
156157
pq_sendbyte(out,flags);
157158
pq_sendbyte(out,MtmNodeId);
158-
pq_sendbyte(out,MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn));
159+
/*pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));*/
159160

160161
/* send fixed fields */
161162
pq_sendint64(out,commit_lsn);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp