Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdc5ae4a

Browse files
knizhnikkelvich
authored andcommitted
Fixes in 2pc
1 parent5e35877 commitdc5ae4a

File tree

3 files changed

+44
-18
lines changed

3 files changed

+44
-18
lines changed

‎arbiter.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,7 @@ static void MtmTransReceiver(Datum arg)
677677
ts->status=TRANSACTION_STATUS_UNKNOWN;
678678
MtmWakeUpBackend(ts);
679679
}
680+
break;
680681
default:
681682
Assert(false);
682683
}

‎multimaster.c

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ static void MtmXactCallback(XactEvent event, void *arg);
110110
staticvoidMtmBeginTransaction(MtmCurrentTrans*x);
111111
staticvoidMtmPrePrepareTransaction(MtmCurrentTrans*x);
112112
staticvoidMtmPostPrepareTransaction(MtmCurrentTrans*x);
113+
staticvoidMtmAbortPreparedTransaction(MtmCurrentTrans*x);
113114
staticvoidMtmEndTransaction(MtmCurrentTrans*x,boolcommit);
114115
staticTransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum);
115116
staticboolMtmXidInMVCCSnapshot(TransactionIdxid,Snapshotsnapshot);
@@ -496,6 +497,9 @@ MtmXactCallback(XactEvent event, void *arg)
496497
caseXACT_EVENT_POST_PREPARE:
497498
MtmPostPrepareTransaction(&dtmTx);
498499
break;
500+
caseXACT_EVENT_ABORT_PREPARED:
501+
MtmAbortPreparedTransaction(&dtmTx);
502+
break;
499503
caseXACT_EVENT_COMMIT:
500504
MtmEndTransaction(&dtmTx, true);
501505
break;
@@ -522,6 +526,7 @@ MtmResetTransaction(MtmCurrentTrans* x)
522526
x->snapshot=INVALID_CSN;
523527
x->xid=InvalidTransactionId;
524528
x->gtid.xid=InvalidTransactionId;
529+
x->isDistributed= false;
525530
}
526531

527532
staticvoid
@@ -620,14 +625,16 @@ static void
620625
MtmPostPrepareTransaction(MtmCurrentTrans*x)
621626
{
622627
MtmTransState*ts;
628+
MtmTransMap*tm;
623629

624630
MtmLock(LW_EXCLUSIVE);
625631
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
626632
Assert(ts!=NULL);
633+
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,NULL);
634+
Assert(x->gid[0]);
635+
tm->state=ts;
636+
627637
if (!MtmIsCoordinator(ts)) {
628-
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,NULL);
629-
Assert(x->gid[0]);
630-
tm->state=ts;
631638
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
632639
MtmUnlock();
633640
MtmResetTransaction(x);
@@ -646,6 +653,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
646653
}
647654

648655

656+
staticvoid
657+
MtmAbortPreparedTransaction(MtmCurrentTrans*x)
658+
{
659+
MtmTransMap*tm;
660+
661+
MtmLock(LW_EXCLUSIVE);
662+
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_REMOVE,NULL);
663+
Assert(tm!=NULL);
664+
tm->state->status=TRANSACTION_STATUS_ABORTED;
665+
MtmAdjustSubtransactions(tm->state);
666+
MtmUnlock();
667+
MtmResetTransaction(x);
668+
}
669+
649670
staticvoid
650671
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
651672
{
@@ -695,9 +716,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
695716
}
696717
MtmUnlock();
697718
}
698-
x->snapshot=INVALID_CSN;
699-
x->xid=InvalidTransactionId;
700-
x->gtid.xid=InvalidTransactionId;
719+
MtmResetTransaction(x);
701720
MtmCheckSlots();
702721
}
703722

@@ -1735,17 +1754,16 @@ MtmGenerateGid(char* gid)
17351754

17361755
staticvoidMtmTwoPhaseCommit(char*completionTag)
17371756
{
1738-
chargid[MULTIMASTER_MAX_GID_SIZE];
1739-
MtmGenerateGid(gid);
1757+
MtmGenerateGid(dtmTx.gid);
17401758
if (!IsTransactionBlock()) {
17411759
elog(WARNING,"Start transaction block for %d",dtmTx.xid);
17421760
BeginTransactionBlock();
17431761
CommitTransactionCommand();
17441762
StartTransactionCommand();
17451763
}
1746-
if (!PrepareTransactionBlock(gid))
1764+
if (!PrepareTransactionBlock(dtmTx.gid))
17471765
{
1748-
elog(WARNING,"Failed to prepare transaction %s",gid);
1766+
elog(WARNING,"Failed to prepare transaction %s",dtmTx.gid);
17491767
/* report unsuccessful commit in completionTag */
17501768
if (completionTag) {
17511769
strcpy(completionTag,"ROLLBACK");
@@ -1755,10 +1773,10 @@ static void MtmTwoPhaseCommit(char *completionTag)
17551773
CommitTransactionCommand();
17561774
StartTransactionCommand();
17571775
if (MtmGetCurrentTransactionStatus()==TRANSACTION_STATUS_ABORTED) {
1758-
FinishPreparedTransaction(gid, false);
1759-
elog(ERROR,"Transaction %s is aborted by DTM",gid);
1776+
FinishPreparedTransaction(dtmTx.gid, false);
1777+
elog(ERROR,"Transaction %s is aborted by DTM",dtmTx.gid);
17601778
}else {
1761-
FinishPreparedTransaction(gid, true);
1779+
FinishPreparedTransaction(dtmTx.gid, true);
17621780
}
17631781
}
17641782
}

‎pglogical_apply.c

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -464,13 +464,21 @@ read_rel(StringInfo s, LOCKMODE mode)
464464
returnheap_open(relid,NoLock);
465465
}
466466

467+
staticvoid
468+
MtmSetCurrentSession(intnodeId)
469+
{
470+
charslot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
471+
sprintf(slot_name,MULTIMASTER_SLOT_PATTERN,nodeId);
472+
replorigin_session_origin=replorigin_by_name(slot_name, false);
473+
replorigin_session_setup(replorigin_session_origin);
474+
}
475+
467476
staticvoid
468477
process_remote_commit(StringInfoin)
469478
{
470479
uint8flags;
471480
uint8nodeId;
472481
constchar*gid=NULL;
473-
charslot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
474482

475483
/* read flags */
476484
flags=pq_getmsgbyte(in);
@@ -481,17 +489,14 @@ process_remote_commit(StringInfo in)
481489
pq_getmsgint64(in);/* end_lsn */
482490
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
483491

484-
sprintf(slot_name,MULTIMASTER_SLOT_PATTERN,nodeId);
485-
replorigin_session_origin=replorigin_by_name(slot_name, false);
486-
replorigin_session_setup(replorigin_session_origin);
487-
488492
switch(PGLOGICAL_XACT_EVENT(flags))
489493
{
490494
casePGLOGICAL_COMMIT:
491495
{
492496
MTM_TRACE("%d: PGLOGICAL_COMMIT commit\n",MyProcPid);
493497
if (IsTransactionState()) {
494498
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
499+
MtmSetCurrentSession(nodeId);
495500
CommitTransactionCommand();
496501
}
497502
break;
@@ -505,6 +510,7 @@ process_remote_commit(StringInfo in)
505510
BeginTransactionBlock();
506511
CommitTransactionCommand();
507512
StartTransactionCommand();
513+
MtmSetCurrentSession(nodeId);
508514
/* PREPARE itself */
509515
MtmSetCurrentTransactionGID(gid);
510516
PrepareTransactionBlock(gid);
@@ -517,6 +523,7 @@ process_remote_commit(StringInfo in)
517523
gid=pq_getmsgstring(in);
518524
MTM_TRACE("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n",MyProcPid,csn,gid);
519525
StartTransactionCommand();
526+
MtmSetCurrentSession(nodeId);
520527
MtmSetCurrentTransactionGID(gid);
521528
FinishPreparedTransaction(gid, true);
522529
CommitTransactionCommand();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp