Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8b1a313

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parentsfe7790f +8ea606c commit8b1a313

File tree

3 files changed

+23
-13
lines changed

3 files changed

+23
-13
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,8 @@ void MtmSleep(timestamp_t interval)
274274
{
275275
structtimespects;
276276
structtimespecrem;
277-
ts.tv_sec=interval/1000000;
278-
ts.tv_nsec=interval%1000000*1000;
277+
ts.tv_sec=interval/USECS_PER_SEC;
278+
ts.tv_nsec=interval%USECS_PER_SEC*1000;
279279

280280
while (nanosleep(&ts,&rem)<0) {
281281
Assert(errno==EINTR);
@@ -330,7 +330,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
330330

331331
MtmLock(LW_SHARED);
332332
ts=hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
333-
if (ts!=NULL) {
333+
if (ts!=NULL&& !ts->isLocal) {
334334
snapshot=ts->snapshot;
335335
}
336336
MtmUnlock();
@@ -452,8 +452,8 @@ MtmAdjustOldestXid(TransactionId xid)
452452

453453
MtmLock(LW_EXCLUSIVE);
454454
ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
455-
if (ts!=NULL&&ts->status==TRANSACTION_STATUS_COMMITTED) {
456-
csn_toldestSnapshot=ts->csn;
455+
if (ts!=NULL) {
456+
csn_toldestSnapshot=ts->snapshot;
457457
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
458458
for (i=0;i<Mtm->nAllNodes;i++) {
459459
if (!BIT_CHECK(Mtm->disabledNodeMask,i)
@@ -483,8 +483,7 @@ MtmAdjustOldestXid(TransactionId xid)
483483
if (prev!=NULL) {
484484
Mtm->transListHead=prev;
485485
Mtm->oldestXid=xid=prev->xid;
486-
}else {
487-
Assert(TransactionIdPrecedesOrEquals(Mtm->oldestXid,xid));
486+
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)) {
488487
xid=Mtm->oldestXid;
489488
}
490489
}else {
@@ -650,6 +649,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
650649
if (!found) {
651650
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
652651
ts->snapshot=x->snapshot;
652+
ts->isLocal= true;
653653
if (TransactionIdIsValid(x->gtid.xid)) {
654654
Assert(x->gtid.node!=MtmNodeId);
655655
ts->gtid=x->gtid;
@@ -704,7 +704,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
704704
/*
705705
* Invalid CSN prevent replication of transaction by logical replication
706706
*/
707-
ts->snapshot=x->isReplicated|| !x->containsDML ?INVALID_CSN :x->snapshot;
707+
ts->isLocal=x->isReplicated|| !x->containsDML;
708+
ts->snapshot=x->snapshot;
708709
ts->csn=MtmAssignCSN();
709710
ts->procno=MyProc->pgprocno;
710711
ts->nVotes=1;/* I am voted myself */
@@ -752,16 +753,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
752753
}else {
753754
time_ttimeout=Max(Mtm2PCMinTimeout, (ts->csn-ts->snapshot)*Mtm2PCPrepareRatio/100000);/* usec->msec and percents */
754755
intresult=0;
756+
intnConfigChanges=Mtm->nConfigChanges;
755757
/* wait votes from all nodes */
756758
while (!ts->votingCompleted&& !(result&WL_TIMEOUT)) {
757759
MtmUnlock();
758760
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,timeout);
759761
ResetLatch(&MyProc->procLatch);
760762
MtmLock(LW_SHARED);
761763
}
762-
if (!ts->votingCompleted) {
764+
if (!ts->votingCompleted) {
763765
ts->status=TRANSACTION_STATUS_ABORTED;
764-
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)timeout, (int)((ts->csn-x->snapshot)/1000));
766+
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)timeout, (int)USEC_TO_MSEC(ts->csn-x->snapshot));
767+
}elseif (nConfigChanges!=Mtm->nConfigChanges) {
768+
ts->status=TRANSACTION_STATUS_ABORTED;
769+
elog(WARNING,"Transaction is aborted because cluster configuration is changed during commit");
765770
}
766771
x->status=ts->status;
767772
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
@@ -830,7 +835,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
830835
Assert(TransactionIdIsValid(x->xid));
831836
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,NULL);
832837
ts->status=TRANSACTION_STATUS_ABORTED;
833-
ts->snapshot=INVALID_CSN;
838+
ts->isLocal= true;
839+
ts->snapshot=x->snapshot;
834840
ts->csn=MtmAssignCSN();
835841
ts->gtid=x->gtid;
836842
ts->nSubxids=0;
@@ -1089,6 +1095,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
10891095
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
10901096
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
10911097
Mtm->nLiveNodes+=1;
1098+
Mtm->nConfigChanges+=1;
10921099
caughtUp= true;
10931100
}elseif (!BIT_CHECK(Mtm->nodeLockerMask,nodeId-1)
10941101
&&slotLSN+MtmMinRecoveryLag>walLSN)
@@ -1263,6 +1270,7 @@ bool MtmRefreshClusterStatus(bool nowait)
12631270

12641271
voidMtmCheckQuorum(void)
12651272
{
1273+
Mtm->nConfigChanges+=1;
12661274
if (Mtm->nLiveNodes<Mtm->nAllNodes/2+1) {
12671275
if (Mtm->status==MTM_ONLINE) {/* out of quorum */
12681276
elog(WARNING,"Node is in minority: disabled mask %lx", (long)Mtm->disabledNodeMask);
@@ -1460,6 +1468,7 @@ static void MtmInitialize()
14601468
Mtm->nReceivers=0;
14611469
Mtm->timeShift=0;
14621470
Mtm->transCount=0;
1471+
Mtm->nConfigChanges=0;
14631472
Mtm->localTablesHashLoaded= false;
14641473
for (i=0;i<MtmNodes;i++) {
14651474
Mtm->nodes[i].oldestSnapshot=0;

‎contrib/mmts/multimaster.h‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ typedef struct MtmTransState
145145
structMtmTransState*nextVoting;/* Next element in L1-list of voting transactions. */
146146
structMtmTransState*next;/* Next element in L1 list of all finished transaction present in xid2state hash */
147147
boolvotingCompleted;/* 2PC voting is completed */
148+
boolisLocal;/* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
148149
TransactionIdxids[1];/* [Mtm->nAllNodes]: transaction ID at replicas */
149150
}MtmTransState;
150151

@@ -169,7 +170,8 @@ typedef struct
169170
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
170171
intnLockers;/* Number of lockers */
171172
intnActiveTransactions;/* Nunmber of active 2PC transactions */
172-
longtimeShift;/* Local time correction */
173+
intnConfigChanges;/* Number of cluster configuration changes */
174+
int64timeShift;/* Local time correction */
173175
csn_tcsn;/* Last obtained CSN: used to provide unique acending CSNs based on system time */
174176
MtmTransState*votingTransactions;/* L1-list of replicated transactions sendings notifications to coordinator.
175177
This list is used to pass information to mtm-sender BGW */

‎contrib/mmts/tests/dtmacid.cpp‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ void* reader(void* arg)
144144
}
145145
}
146146
}
147-
t.transactions +=2;
148147
t.selects +=2;
149148
txn1.commit();
150149
txn2.commit();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp