Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit42c2152

Browse files
committed
mrg
2 parents366c7d1 +bc92057 commit42c2152

File tree

2 files changed

+33
-13
lines changed

2 files changed

+33
-13
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ void MtmLock(LWLockMode mode)
243243
#else
244244
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID],mode);
245245
#endif
246+
Mtm->lastLockHolder=MyProcPid;
246247
}
247248

248249
voidMtmUnlock(void)
@@ -252,6 +253,7 @@ void MtmUnlock(void)
252253
#else
253254
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]);
254255
#endif
256+
Mtm->lastLockHolder=0;
255257
}
256258

257259
voidMtmLockNode(intnodeId)
@@ -550,16 +552,20 @@ MtmAdjustOldestXid(TransactionId xid)
550552

551553
staticvoidMtmTransactionListAppend(MtmTransState*ts)
552554
{
553-
ts->next=NULL;
554-
ts->nSubxids=0;
555-
*Mtm->transListTail=ts;
556-
Mtm->transListTail=&ts->next;
555+
if (!ts->isEnqueued) {
556+
ts->isEnqueued= true;
557+
ts->next=NULL;
558+
ts->nSubxids=0;
559+
*Mtm->transListTail=ts;
560+
Mtm->transListTail=&ts->next;
561+
}
557562
}
558563

559564
staticvoidMtmTransactionListInsertAfter(MtmTransState*after,MtmTransState*ts)
560565
{
561566
ts->next=after->next;
562567
after->next=ts;
568+
ts->isEnqueued= true;
563569
if (Mtm->transListTail==&after->next) {
564570
Mtm->transListTail=&ts->next;
565571
}
@@ -700,6 +706,9 @@ MtmCreateTransState(MtmCurrentTrans* x)
700706
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
701707
ts->snapshot=x->snapshot;
702708
ts->isLocal= true;
709+
if (!found) {
710+
ts->isEnqueued= false;
711+
}
703712
if (TransactionIdIsValid(x->gtid.xid)) {
704713
Assert(x->gtid.node!=MtmNodeId);
705714
ts->gtid=x->gtid;
@@ -833,6 +842,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
833842
Assert(x->gid[0]);
834843
tm->state=ts;
835844
ts->votingCompleted= true;
845+
if (!found) {
846+
ts->isEnqueued= false;
847+
}
836848
if (Mtm->status!=MTM_RECOVERY) {
837849
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
838850
if (!MtmUseDtm) {
@@ -945,8 +957,12 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
945957
*/
946958
MTM_LOG1("%d: send ABORT notification abort transaction %d to coordinator %d",MyProcPid,x->gtid.xid,x->gtid.node);
947959
if (ts==NULL) {
960+
boolfound;
948961
Assert(TransactionIdIsValid(x->xid));
949-
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,NULL);
962+
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
963+
if (!found) {
964+
ts->isEnqueued= false;
965+
}
950966
ts->status=TRANSACTION_STATUS_ABORTED;
951967
ts->isLocal= true;
952968
ts->snapshot=x->snapshot;
@@ -1364,7 +1380,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
13641380
*/
13651381
boolMtmRefreshClusterStatus(boolnowait)
13661382
{
1367-
nodemask_tmask,clique,disabled,enabled;
1383+
nodemask_tmask,clique,disabled;
13681384
nodemask_tmatrix[MAX_NODES];
13691385
MtmTransState*ts;
13701386
intclique_size;
@@ -1391,28 +1407,29 @@ bool MtmRefreshClusterStatus(bool nowait)
13911407
MTM_LOG1("Find clique %lx, disabledNodeMask %lx", (long)clique, (long)Mtm->disabledNodeMask);
13921408
MtmLock(LW_EXCLUSIVE);
13931409
disabled= ~clique& (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask;/* new disabled nodes mask */
1394-
enabled=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
13951410

13961411
for (i=0,mask=disabled;mask!=0;i++,mask >>=1) {
13971412
if (mask&1) {
13981413
MtmDisableNode(i+1);
13991414
}
1400-
}
1401-
1415+
}
1416+
#if0/* Do not enable nodes here: them will be enabled after completion of recovery */
1417+
enabled=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
14021418
for (i=0,mask=enabled;mask!=0;i++,mask >>=1) {
14031419
if (mask&1) {
14041420
MtmEnableNode(i+1);
14051421
}
14061422
}
1407-
if (disabled|enabled) {
1423+
#endif
1424+
if (disabled) {
14081425
MtmCheckQuorum();
14091426
}
14101427
/* Interrupt voting for active transaction and abort them */
14111428
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
14121429
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
14131430
ts->gid,ts->gtid.node,ts->xid,ts->status,ts->gtid.xid);
14141431
if (MtmIsCoordinator(ts)) {
1415-
if (!ts->votingCompleted&&(disabled|enabled)!=0&&ts->status!=TRANSACTION_STATUS_ABORTED) {
1432+
if (!ts->votingCompleted&&disabled!=0&&ts->status!=TRANSACTION_STATUS_ABORTED) {
14161433
MtmAbortTransaction(ts);
14171434
MtmWakeUpBackend(ts);
14181435
}
@@ -2222,6 +2239,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
22222239
{
22232240
if (nodeId <=0||nodeId>Mtm->nLiveNodes)
22242241
{
2242+
MtmUnlock();
22252243
elog(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nLiveNodes);
22262244
}
22272245
MtmDisableNode(nodeId);
@@ -2287,6 +2305,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
22872305
MtmEnableNode(MtmReplicationNodeId);
22882306
MtmCheckQuorum();
22892307
}else {
2308+
MtmUnlock();
22902309
elog(ERROR,"Disabled node %d tries to reconnect without recovery",MtmReplicationNodeId);
22912310
}
22922311
}else {

‎contrib/mmts/multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ typedef struct MtmTransState
163163
structMtmTransState*next;/* Next element in L1 list of all finished transaction present in xid2state hash */
164164
boolvotingCompleted;/* 2PC voting is completed */
165165
boolisLocal;/* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
166-
TransactionIdxids[1];/* [Mtm->nAllNodes]: transaction ID at replicas */
166+
boolisEnqueued;/* Transaction is inserted in queue */
167+
TransactionIdxids[1];/* [Mtm->nAllNodes]: transaction ID at replicas */
167168
}MtmTransState;
168169

169170
typedefstruct
@@ -180,7 +181,7 @@ typedef struct
180181
nodemask_twalSenderLockerMask;/* Mask of WAL-senders IDs locking the cluster */
181182
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
182183
nodemask_treconnectMask;/* Mask of nodes connection to which has to be reestablished by sender */
183-
184+
intlastLockHolder;/* PID of process last obtaning the node lock */
184185
boollocalTablesHashLoaded;/* Whether data from local_tables table is loaded in shared memory hash table */
185186
intinject2PCError;/* Simulate error during 2PC commit at this node */
186187
intnLiveNodes;/* Number of active nodes */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp