Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7b2a465

Browse files
knizhnikkelvich
authored andcommitted
Fix prepare/abort_prepared race condition
1 parent811e5ce commit7b2a465

File tree

5 files changed

+61
-41
lines changed

5 files changed

+61
-41
lines changed

‎arbiter.c

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -915,17 +915,15 @@ static void MtmTransReceiver(Datum arg)
915915
}else {
916916
switch (msg->code) {
917917
caseMSG_PREPARE:
918-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
919-
ts->status=TRANSACTION_STATUS_UNKNOWN;
920-
ts->csn=MtmAssignCSN();
921-
MtmAdjustSubtransactions(ts);
922-
MtmSendNotificationMessage(ts,MSG_PREPARED);
923-
#if0
918+
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
919+
ts->status=TRANSACTION_STATUS_UNKNOWN;
920+
ts->csn=MtmAssignCSN();
921+
MtmAdjustSubtransactions(ts);
922+
MtmSendNotificationMessage(ts,MSG_PREPARED);
924923
}else {
925924
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
926925
MtmSendNotificationMessage(ts,MSG_ABORTED);
927926
}
928-
#endif
929927
break;
930928
default:
931929
Assert(false);

‎multimaster.c

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ typedef struct {
7777

7878
typedefstruct {
7979
chargid[MULTIMASTER_MAX_GID_SIZE];
80+
boolabort;
81+
XidStatusstatus;
8082
MtmTransState*state;
8183
}MtmTransMap;
8284

@@ -274,7 +276,7 @@ timestamp_t MtmGetSystemTime(void)
274276
{
275277
structtimevaltv;
276278
gettimeofday(&tv,NULL);
277-
return (timestamp_t)tv.tv_sec*USECS_PER_SEC+tv.tv_usec+Mtm->timeShift;
279+
return (timestamp_t)tv.tv_sec*USECS_PER_SEC+tv.tv_usec;
278280
}
279281

280282
timestamp_tMtmGetCurrentTime(void)
@@ -721,11 +723,12 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
721723
{
722724
MtmTransState*ts;
723725
TransactionId*subxids;
724-
726+
725727
if (!x->isDistributed) {
726728
return;
727729
}
728730

731+
729732
if (Mtm->inject2PCError==1) {
730733
Mtm->inject2PCError=0;
731734
elog(ERROR,"ERROR INJECTION for transaction %d (%s)",x->xid,x->gid);
@@ -754,7 +757,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
754757
if (!x->isReplicated) {
755758
MtmCheckClusterLock();
756759
}
757-
758760
ts=MtmCreateTransState(x);
759761
/*
760762
* Invalid CSN prevent replication of transaction by logical replication
@@ -779,8 +781,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
779781
MtmAddSubtransactions(ts,subxids,ts->nSubxids);
780782
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
781783
MyProcPid,x->xid,ts->gtid.xid,ts->gtid.node,ts->csn);
782-
MtmUnlock();
783-
784+
MtmUnlock();
784785
}
785786

786787
/*
@@ -818,7 +819,8 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
818819
Assert(ts!=NULL);
819820

820821
if (!MtmIsCoordinator(ts)||Mtm->status==MTM_RECOVERY) {
821-
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,NULL);
822+
boolfound;
823+
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,&found);
822824
Assert(x->gid[0]);
823825
tm->state=ts;
824826
ts->votingCompleted= true;
@@ -876,8 +878,8 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
876878
if (x->status!=TRANSACTION_STATUS_ABORTED) {
877879
MtmLock(LW_EXCLUSIVE);
878880
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_REMOVE,NULL);
879-
Assert(tm!=NULL);
880-
MTM_LOG1("Abort prepared transaction %d with gid='%s' is already aborted",x->xid,x->gid);
881+
Assert(tm!=NULL&&tm->state!=NULL);
882+
MTM_LOG1("%ld:Abort prepared transaction %d with gid='%s'",MtmGetSystemTime(),x->xid,x->gid);
881883
MtmAbortTransaction(tm->state);
882884
MtmUnlock();
883885
x->status=TRANSACTION_STATUS_ABORTED;
@@ -1015,21 +1017,26 @@ XidStatus MtmGetCurrentTransactionStatus(void)
10151017
returnMtmTx.status;
10161018
}
10171019

1018-
XidStatusMtmGetGlobalTransactionStatus(charconst*gid)
1020+
XidStatusMtmExchangeGlobalTransactionStatus(charconst*gid,XidStatusnew_status)
10191021
{
1020-
XidStatusstatus;
10211022
MtmTransMap*tm;
1023+
boolfound;
1024+
XidStatusold_status=TRANSACTION_STATUS_IN_PROGRESS;
10221025

10231026
Assert(gid[0]);
1024-
MtmLock(LW_SHARED);
1025-
tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_FIND,NULL);
1026-
if (tm!=NULL) {
1027-
status=tm->state->status;
1027+
MtmLock(LW_EXCLUSIVE);
1028+
tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_ENTER,&found);
1029+
if (found) {
1030+
old_status=tm->status;
1031+
if (old_status!=TRANSACTION_STATUS_ABORTED) {
1032+
tm->status=new_status;
1033+
}
10281034
}else {
1029-
status=TRANSACTION_STATUS_ABORTED;
1035+
tm->state=NULL;
1036+
tm->status=new_status;
10301037
}
10311038
MtmUnlock();
1032-
returnstatus;
1039+
returnold_status;
10331040
}
10341041

10351042
voidMtmSetCurrentTransactionCSN(csn_tcsn)

‎multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ extern csn_t MtmGetTransactionCSN(TransactionId xid);
256256
externvoidMtmSetCurrentTransactionCSN(csn_tcsn);
257257
externTransactionIdMtmGetCurrentTransactionId(void);
258258
externXidStatusMtmGetCurrentTransactionStatus(void);
259-
externXidStatusMtmGetGlobalTransactionStatus(charconst*gid);
259+
externXidStatusMtmExchangeGlobalTransactionStatus(charconst*gid,XidStatusstatus);
260260
externboolMtmIsRecoveredNode(intnodeId);
261261
externboolMtmRefreshClusterStatus(boolnowait);
262262
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);

‎pglogical_apply.c

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,8 @@ process_remote_commit(StringInfo in)
508508
uint8flags;
509509
csn_tcsn;
510510
constchar*gid=NULL;
511-
XLogRecPtrend_lsn;
511+
XLogRecPtrend_lsn;
512+
512513
/* read flags */
513514
flags=pq_getmsgbyte(in);
514515
MtmReplicationNodeId=pq_getmsgbyte(in);
@@ -536,25 +537,38 @@ process_remote_commit(StringInfo in)
536537
{
537538
Assert(IsTransactionState()&&TransactionIdIsValid(MtmGetCurrentTransactionId()));
538539
gid=pq_getmsgstring(in);
539-
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
540-
MTM_LOG1("%d: PGLOGICAL_PREPARE commit: gid=%s",MyProcPid,gid);
541-
BeginTransactionBlock();
542-
CommitTransactionCommand();
543-
StartTransactionCommand();
544-
545-
MtmBeginSession();
546-
/* PREPARE itself */
547-
MtmSetCurrentTransactionGID(gid);
548-
PrepareTransactionBlock(gid);
549-
CommitTransactionCommand();
540+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_IN_PROGRESS)==TRANSACTION_STATUS_ABORTED) {
541+
MTM_LOG1("%ld: avoid prepare of previously aborted global transaction %s",MtmGetSystemTime(),gid);
542+
AbortCurrentTransaction();
543+
}else {
544+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
545+
MTM_LOG1("%ld: PGLOGICAL_PREPARE commit: gid=%s",MtmGetSystemTime(),gid);
546+
BeginTransactionBlock();
547+
CommitTransactionCommand();
548+
StartTransactionCommand();
549+
550+
MtmBeginSession();
551+
/* PREPARE itself */
552+
MtmSetCurrentTransactionGID(gid);
553+
PrepareTransactionBlock(gid);
554+
CommitTransactionCommand();
555+
556+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_UNKNOWN)==TRANSACTION_STATUS_ABORTED) {
557+
MTM_LOG1("%ld: perform delayed rollback of prepared global transaction %s",MtmGetSystemTime(),gid);
558+
StartTransactionCommand();
559+
MtmSetCurrentTransactionGID(gid);
560+
FinishPreparedTransaction(gid, false);
561+
CommitTransactionCommand();
562+
}
563+
}
550564
break;
551565
}
552566
casePGLOGICAL_COMMIT_PREPARED:
553567
{
554568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
555569
csn=pq_getmsgint64(in);
556570
gid=pq_getmsgstring(in);
557-
MTM_LOG1("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s",MyProcPid,csn,gid);
571+
MTM_LOG1("%ld: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s",MtmGetSystemTime(),csn,gid);
558572
StartTransactionCommand();
559573
MtmBeginSession();
560574
MtmSetCurrentTransactionCSN(csn);
@@ -567,9 +581,9 @@ process_remote_commit(StringInfo in)
567581
{
568582
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569583
gid=pq_getmsgstring(in);
570-
MTM_LOG1("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s",MyProcPid,gid);
571-
if (MtmGetGlobalTransactionStatus(gid)!=TRANSACTION_STATUS_ABORTED) {
572-
MTM_LOG2("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",MyProcPid,gid);
584+
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s",MtmGetSystemTime(),gid);
585+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
586+
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",MtmGetSystemTime(),gid);
573587
StartTransactionCommand();
574588
MtmSetCurrentTransactionGID(gid);
575589
FinishPreparedTransaction(gid, false);

‎pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150150
* INVALID_CSN means replicated transaction (transaction initiated by some other nodes).
151151
* We do not need to send such transactions unless we perform recovery
152152
*/
153-
if (csn==INVALID_CSN&& !isRecovery) {
153+
if (csn==INVALID_CSN&& !isRecovery)
154+
{
154155
return;
155156
}
156157
if (MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn)) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp