Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb6e113a

Browse files
knizhnikkelvich
authored andcommitted
Avoid loops in transaction list
1 parent1a120b5 commitb6e113a

File tree

3 files changed

+35
-14
lines changed

3 files changed

+35
-14
lines changed

‎multimaster.c

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ void MtmLock(LWLockMode mode)
242242
#else
243243
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID],mode);
244244
#endif
245+
Mtm->lastLockHolder=MyProcPid;
245246
}
246247

247248
voidMtmUnlock(void)
@@ -251,6 +252,7 @@ void MtmUnlock(void)
251252
#else
252253
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]);
253254
#endif
255+
Mtm->lastLockHolder=0;
254256
}
255257

256258
voidMtmLockNode(intnodeId)
@@ -549,16 +551,20 @@ MtmAdjustOldestXid(TransactionId xid)
549551

550552
staticvoidMtmTransactionListAppend(MtmTransState*ts)
551553
{
552-
ts->next=NULL;
553-
ts->nSubxids=0;
554-
*Mtm->transListTail=ts;
555-
Mtm->transListTail=&ts->next;
554+
if (!ts->isEnqueued) {
555+
ts->isEnqueued= true;
556+
ts->next=NULL;
557+
ts->nSubxids=0;
558+
*Mtm->transListTail=ts;
559+
Mtm->transListTail=&ts->next;
560+
}
556561
}
557562

558563
staticvoidMtmTransactionListInsertAfter(MtmTransState*after,MtmTransState*ts)
559564
{
560565
ts->next=after->next;
561566
after->next=ts;
567+
ts->isEnqueued= true;
562568
if (Mtm->transListTail==&after->next) {
563569
Mtm->transListTail=&ts->next;
564570
}
@@ -699,6 +705,9 @@ MtmCreateTransState(MtmCurrentTrans* x)
699705
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
700706
ts->snapshot=x->snapshot;
701707
ts->isLocal= true;
708+
if (!found) {
709+
ts->isEnqueued= false;
710+
}
702711
if (TransactionIdIsValid(x->gtid.xid)) {
703712
Assert(x->gtid.node!=MtmNodeId);
704713
ts->gtid=x->gtid;
@@ -832,6 +841,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
832841
Assert(x->gid[0]);
833842
tm->state=ts;
834843
ts->votingCompleted= true;
844+
if (!found) {
845+
ts->isEnqueued= false;
846+
}
835847
if (Mtm->status!=MTM_RECOVERY) {
836848
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
837849
if (!MtmUseDtm) {
@@ -944,8 +956,12 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
944956
*/
945957
MTM_LOG1("%d: send ABORT notification abort transaction %d to coordinator %d",MyProcPid,x->gtid.xid,x->gtid.node);
946958
if (ts==NULL) {
959+
boolfound;
947960
Assert(TransactionIdIsValid(x->xid));
948-
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,NULL);
961+
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
962+
if (!found) {
963+
ts->isEnqueued= false;
964+
}
949965
ts->status=TRANSACTION_STATUS_ABORTED;
950966
ts->isLocal= true;
951967
ts->snapshot=x->snapshot;
@@ -1363,7 +1379,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
13631379
*/
13641380
boolMtmRefreshClusterStatus(boolnowait)
13651381
{
1366-
nodemask_tmask,clique,disabled,enabled;
1382+
nodemask_tmask,clique,disabled;
13671383
nodemask_tmatrix[MAX_NODES];
13681384
MtmTransState*ts;
13691385
intclique_size;
@@ -1390,28 +1406,29 @@ bool MtmRefreshClusterStatus(bool nowait)
13901406
MTM_LOG1("Find clique %lx, disabledNodeMask %lx", (long)clique, (long)Mtm->disabledNodeMask);
13911407
MtmLock(LW_EXCLUSIVE);
13921408
disabled= ~clique& (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask;/* new disabled nodes mask */
1393-
enabled=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
13941409

13951410
for (i=0,mask=disabled;mask!=0;i++,mask >>=1) {
13961411
if (mask&1) {
13971412
MtmDisableNode(i+1);
13981413
}
1399-
}
1400-
1414+
}
1415+
#if0/* Do not enable nodes here: them will be enabled after completion of recovery */
1416+
enabled=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
14011417
for (i=0,mask=enabled;mask!=0;i++,mask >>=1) {
14021418
if (mask&1) {
14031419
MtmEnableNode(i+1);
14041420
}
14051421
}
1406-
if (disabled|enabled) {
1422+
#endif
1423+
if (disabled) {
14071424
MtmCheckQuorum();
14081425
}
14091426
/* Interrupt voting for active transaction and abort them */
14101427
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
14111428
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
14121429
ts->gid,ts->gtid.node,ts->xid,ts->status,ts->gtid.xid);
14131430
if (MtmIsCoordinator(ts)) {
1414-
if (!ts->votingCompleted&&(disabled|enabled)!=0&&ts->status!=TRANSACTION_STATUS_ABORTED) {
1431+
if (!ts->votingCompleted&&disabled!=0&&ts->status!=TRANSACTION_STATUS_ABORTED) {
14151432
MtmAbortTransaction(ts);
14161433
MtmWakeUpBackend(ts);
14171434
}
@@ -2221,6 +2238,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
22212238
{
22222239
if (nodeId <=0||nodeId>Mtm->nLiveNodes)
22232240
{
2241+
MtmUnlock();
22242242
elog(ERROR,"NodeID %d is out of range [1,%d]",nodeId,Mtm->nLiveNodes);
22252243
}
22262244
MtmDisableNode(nodeId);
@@ -2286,6 +2304,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
22862304
MtmEnableNode(MtmReplicationNodeId);
22872305
MtmCheckQuorum();
22882306
}else {
2307+
MtmUnlock();
22892308
elog(ERROR,"Disabled node %d tries to reconnect without recovery",MtmReplicationNodeId);
22902309
}
22912310
}else {

‎multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ typedef struct MtmTransState
163163
structMtmTransState*next;/* Next element in L1 list of all finished transaction present in xid2state hash */
164164
boolvotingCompleted;/* 2PC voting is completed */
165165
boolisLocal;/* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
166-
TransactionIdxids[1];/* [Mtm->nAllNodes]: transaction ID at replicas */
166+
boolisEnqueued;/* Transaction is inserted in queue */
167+
TransactionIdxids[1];/* [Mtm->nAllNodes]: transaction ID at replicas */
167168
}MtmTransState;
168169

169170
typedefstruct
@@ -180,7 +181,7 @@ typedef struct
180181
nodemask_twalSenderLockerMask;/* Mask of WAL-senders IDs locking the cluster */
181182
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
182183
nodemask_treconnectMask;/* Mask of nodes connection to which has to be reestablished by sender */
183-
184+
intlastLockHolder;/* PID of process last obtaning the node lock */
184185
boollocalTablesHashLoaded;/* Whether data from local_tables table is loaded in shared memory hash table */
185186
intinject2PCError;/* Simulate error during 2PC commit at this node */
186187
intnLiveNodes;/* Number of active nodes */

‎tests2/lib/bank_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ def exec_tx(self, name, tx_block):
137137
self.history.register_finish(event_id,'Commit')
138138
exceptpsycopg2.InterfaceError:
139139
self.history.register_finish(event_id,'InterfaceError')
140-
exceptpsycopg2.Error:
140+
exceptpsycopg2.Errorasx:
141+
print(x.pgerror)
141142
self.history.register_finish(event_id,'PsycopgError')
142143

143144
cur.close()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp