Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit75e4922

Browse files
knizhnikkelvich
authored andcommitted
2PC bug fixes
1 parent0d9ed50 commit75e4922

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

‎arbiter.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
460460
buf->data[buf->used].code=ts->status==TRANSACTION_STATUS_ABORTED ?MSG_ABORTED :MSG_PREPARED;
461461
buf->data[buf->used].dxid=xid;
462462
buf->data[buf->used].sxid=ts->xid;
463-
buf->data[buf->used].csn=ts->csn;
463+
buf->data[buf->used].csn=ts->csn;
464464
buf->data[buf->used].node=MtmNodeId;
465465
buf->data[buf->used].disabledNodeMask=ds->disabledNodeMask;
466466
buf->used+=1;
@@ -495,6 +495,7 @@ static void MtmTransSender(Datum arg)
495495
MtmAppendBuffer(txBuffer,ts->gtid.xid,ts->gtid.node-1,ts);
496496
}
497497
ds->votingTransactions=NULL;
498+
498499
MtmUnlock();
499500

500501
for (i=0;i<nNodes;i++) {
@@ -616,7 +617,8 @@ static void MtmTransReceiver(Datum arg)
616617
Assert (MtmIsCoordinator(ts));
617618
switch (msg->code) {
618619
caseMSG_PREPARED:
619-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
620+
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
621+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS||ts->status==TRANSACTION_STATUS_UNKNOWN);
620622
if (msg->csn>ts->csn) {
621623
ts->csn=msg->csn;
622624
MtmSyncClock(ts->csn);
@@ -627,7 +629,8 @@ static void MtmTransReceiver(Datum arg)
627629
}
628630
break;
629631
caseMSG_ABORTED:
630-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
632+
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
633+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS||ts->status==TRANSACTION_STATUS_UNKNOWN);
631634
ts->status=TRANSACTION_STATUS_ABORTED;
632635
MtmAdjustSubtransactions(ts);
633636
MtmWakeUpBackend(ts);

‎multimaster.c

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
571571
MtmCheckClusterLock();
572572

573573
ts=hash_search(xid2state,&x->xid,HASH_ENTER,NULL);
574-
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
574+
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
575+
/*
576+
* Invalid CSN prevent replication of transaction by logical replication
577+
*/
575578
ts->snapshot=x->isReplicated|| !x->containsDML ?INVALID_CSN :x->snapshot;
576579
ts->csn=MtmAssignCSN();
577580
ts->gtid=x->gtid;
@@ -603,7 +606,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
603606
MtmTransState*ts;
604607

605608
MtmLock(LW_EXCLUSIVE);
606-
ts=hash_search(xid2state,&x->xid,HASH_ENTER,NULL);
609+
ts=hash_search(xid2state,&x->xid,HASH_FIND,NULL);
607610
Assert(ts!=NULL);
608611
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
609612
ts->status=TRANSACTION_STATUS_UNKNOWN;
@@ -619,7 +622,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
619622
}else {
620623
/* wait N commits or just one ABORT */
621624
ts->nVotes+=1;/* I vote myself */
622-
while (ts->nVotes!=dtm->nNodes&&ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
625+
while (ts->nVotes!=dtm->nNodes&&ts->status!=TRANSACTION_STATUS_ABORTED) {
623626
MtmUnlock();
624627
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
625628
ResetLatch(&MyProc->procLatch);
@@ -628,6 +631,8 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
628631
MtmUnlock();
629632
if (ts->status==TRANSACTION_STATUS_ABORTED) {
630633
elog(ERROR,"Distributed transaction %d is rejected by DTM",x->xid);
634+
}else {
635+
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN);
631636
}
632637
}
633638
}
@@ -637,7 +642,7 @@ static void
637642
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
638643
{
639644
MTM_TRACE("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n",MyProcPid,x->xid,x->isPrepared,x->isDistributed,commit ?"commit" :"abort");
640-
if (x->isDistributed&& (TransactionIdIsValid(x->xid)||x->isReplicated)) {
645+
if (x->isDistributed&& (x->isPrepared||x->isReplicated)) {
641646
MtmTransState*ts;
642647
MtmLock(LW_EXCLUSIVE);
643648
if (x->isPrepared) {
@@ -656,7 +661,11 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
656661
}
657662
}else {
658663
ts->status=TRANSACTION_STATUS_ABORTED;
659-
if (x->isReplicated) {
664+
if (x->isReplicated&&TransactionIdIsValid(x->gtid.xid)) {
665+
/*
666+
* Send notification only of ABORT happens during transaction processing at replicas,
667+
* do not send notification if ABORT is receiver from master
668+
*/
660669
MtmSendNotificationMessage(ts);/* send notification to coordinator */
661670
}
662671
}
@@ -683,8 +692,6 @@ void MtmSendNotificationMessage(MtmTransState* ts)
683692
}
684693
}
685694

686-
687-
688695
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
689696
{
690697
csn_tlocalSnapshot;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp