Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4147df0

Browse files
committed
Recvoery bug fixing
1 parent2956510 commit4147df0

File tree

5 files changed

+26
-12
lines changed

5 files changed

+26
-12
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ static void MtmOpenConnections()
386386
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
387387
{
388388
while (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
389-
elog(WARNING,"Arbiter failed to writesocket: %d",errno);
389+
elog(WARNING,"Arbiter failed to writeto node %d: %d",node+1,errno);
390390
if (sockets[node] >=0) {
391391
close(sockets[node]);
392392
}
@@ -395,6 +395,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
395395
MtmOnNodeDisconnect(node+1);
396396
return false;
397397
}
398+
elog(NOTICE,"Arbiter restablish connection with node %d",node+1);
398399
}
399400
return true;
400401
}

‎contrib/mmts/multimaster.c‎

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
673673

674674
if (Mtm->disabledNodeMask!=0) {
675675
MtmRefreshClusterStatus(true);
676-
if (Mtm->status!=MTM_ONLINE) {
676+
if (!IsBackgroundWorker&&Mtm->status!=MTM_ONLINE) {
677677
elog(ERROR,"Abort current transaction because this cluster node is not online");
678678
}
679679
}
@@ -683,7 +683,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
683683
/*
684684
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up
685685
*/
686-
MtmCheckClusterLock();
686+
if (!x->isReplicated) {
687+
MtmCheckClusterLock();
688+
}
687689

688690
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,NULL);
689691
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
@@ -995,21 +997,23 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
995997
if (MtmIsRecoveredNode(nodeId)) {
996998
XLogRecPtrwalLSN=GetXLogInsertRecPtr();
997999
MtmLock(LW_EXCLUSIVE);
1000+
#if0
9981001
if (slotLSN==walLSN) {
9991002
if (BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)) {
10001003
elog(WARNING,"Node %d is caught-up",nodeId);
1001-
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
10021004
BIT_CLEAR(Mtm->walSenderLockerMask,MyWalSnd-WalSndCtl->walsnds);
10031005
BIT_CLEAR(Mtm->nodeLockerMask,nodeId-1);
10041006
Mtm->nLockers-=1;
10051007
}else {
10061008
elog(WARNING,"Node %d is caugth-up without locking cluster",nodeId);
10071009
/* We are lucky: caugth-up without locking cluster! */
1008-
Mtm->nNodes+=1;
1009-
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
10101010
}
1011+
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
1012+
Mtm->nNodes+=1;
10111013
caughtUp= true;
1012-
}elseif (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
1014+
}else
1015+
#endif
1016+
if (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
10131017
&&slotLSN+MtmMinRecoveryLag>walLSN)
10141018
{
10151019
/*
@@ -2250,7 +2254,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22502254

22512255
voidMtmExecute(void*work,intsize)
22522256
{
2253-
BgwPoolExecute(&Mtm->pool,work,size);
2257+
if (Mtm->status==MTM_RECOVERY) {
2258+
/* During recovery apply changes sequentially to preserve commit order */
2259+
MtmExecutor(0,work,size);
2260+
}else {
2261+
BgwPoolExecute(&Mtm->pool,work,size);
2262+
}
22542263
}
22552264

22562265
staticBgwPool*

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -499,12 +499,12 @@ process_remote_commit(StringInfo in)
499499
uint8flags;
500500
csn_tcsn;
501501
constchar*gid=NULL;
502-
boolcaughtUp;
502+
boolcaughtUp= false;
503503

504504
/* read flags */
505505
flags=pq_getmsgbyte(in);
506506
MtmReplicationNode=pq_getmsgbyte(in);
507-
caughtUp=pq_getmsgbyte(in)!=0;
507+
/*caughtUp = pq_getmsgbyte(in) != 0;*/
508508

509509
/* read fields */
510510
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
147147
return;
148148
}
149149
}
150+
MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn);
150151

151152
pq_sendbyte(out,'C');/* sending COMMIT */
152153

@@ -155,7 +156,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
155156
/* send the flags field */
156157
pq_sendbyte(out,flags);
157158
pq_sendbyte(out,MtmNodeId);
158-
pq_sendbyte(out,MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn));
159+
/*pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));*/
159160

160161
/* send fixed fields */
161162
pq_sendint64(out,commit_lsn);

‎src/backend/replication/logical/decode.c‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
488488
{
489489
XLogRecPtrorigin_lsn=InvalidXLogRecPtr;
490490
TimestampTzcommit_time=parsed->xact_time;
491-
XLogRecPtrorigin_id=XLogRecGetOrigin(buf->record);
491+
RepOriginIdorigin_id=XLogRecGetOrigin(buf->record);
492492
inti;
493493

494494
if (parsed->xinfo&XACT_XINFO_HAS_ORIGIN)
@@ -541,6 +541,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
541541
(parsed->dbId!=InvalidOid&&parsed->dbId!=ctx->slot->data.database)||
542542
FilterByOrigin(ctx,origin_id))
543543
{
544+
elog(WARNING,"%d: WAL-SENDER ignore record %lx with origin %d: SnapBuildXactNeedsSkip=%d, FilterByOrigin=%d",
545+
getpid(),buf->origptr,origin_id,
546+
SnapBuildXactNeedsSkip(ctx->snapshot_builder,buf->origptr),FilterByOrigin(ctx,origin_id));
544547
for (i=0;i<parsed->nsubxacts;i++)
545548
{
546549
ReorderBufferForget(ctx->reorder,parsed->subxacts[i],buf->origptr);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp