Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8e072df

Browse files
committed
Fix prepare/abort_prepared race condition
1 parentac0afb1 commit8e072df

File tree

7 files changed

+63
-43
lines changed

7 files changed

+63
-43
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -915,17 +915,15 @@ static void MtmTransReceiver(Datum arg)
915915
}else {
916916
switch (msg->code) {
917917
caseMSG_PREPARE:
918-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
919-
ts->status=TRANSACTION_STATUS_UNKNOWN;
920-
ts->csn=MtmAssignCSN();
921-
MtmAdjustSubtransactions(ts);
922-
MtmSendNotificationMessage(ts,MSG_PREPARED);
923-
#if0
918+
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
919+
ts->status=TRANSACTION_STATUS_UNKNOWN;
920+
ts->csn=MtmAssignCSN();
921+
MtmAdjustSubtransactions(ts);
922+
MtmSendNotificationMessage(ts,MSG_PREPARED);
924923
}else {
925924
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
926925
MtmSendNotificationMessage(ts,MSG_ABORTED);
927926
}
928-
#endif
929927
break;
930928
default:
931929
Assert(false);

‎contrib/mmts/multimaster.c‎

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ typedef struct {
7878

7979
typedefstruct {
8080
chargid[MULTIMASTER_MAX_GID_SIZE];
81+
boolabort;
82+
XidStatusstatus;
8183
MtmTransState*state;
8284
}MtmTransMap;
8385

@@ -275,7 +277,7 @@ timestamp_t MtmGetSystemTime(void)
275277
{
276278
structtimevaltv;
277279
gettimeofday(&tv,NULL);
278-
return (timestamp_t)tv.tv_sec*USECS_PER_SEC+tv.tv_usec+Mtm->timeShift;
280+
return (timestamp_t)tv.tv_sec*USECS_PER_SEC+tv.tv_usec;
279281
}
280282

281283
timestamp_tMtmGetCurrentTime(void)
@@ -722,11 +724,12 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
722724
{
723725
MtmTransState*ts;
724726
TransactionId*subxids;
725-
727+
726728
if (!x->isDistributed) {
727729
return;
728730
}
729731

732+
730733
if (Mtm->inject2PCError==1) {
731734
Mtm->inject2PCError=0;
732735
elog(ERROR,"ERROR INJECTION for transaction %d (%s)",x->xid,x->gid);
@@ -755,7 +758,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
755758
if (!x->isReplicated) {
756759
MtmCheckClusterLock();
757760
}
758-
759761
ts=MtmCreateTransState(x);
760762
/*
761763
* Invalid CSN prevent replication of transaction by logical replication
@@ -780,8 +782,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
780782
MtmAddSubtransactions(ts,subxids,ts->nSubxids);
781783
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
782784
MyProcPid,x->xid,ts->gtid.xid,ts->gtid.node,ts->csn);
783-
MtmUnlock();
784-
785+
MtmUnlock();
785786
}
786787

787788
/*
@@ -819,7 +820,8 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
819820
Assert(ts!=NULL);
820821

821822
if (!MtmIsCoordinator(ts)||Mtm->status==MTM_RECOVERY) {
822-
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,NULL);
823+
boolfound;
824+
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,&found);
823825
Assert(x->gid[0]);
824826
tm->state=ts;
825827
ts->votingCompleted= true;
@@ -877,8 +879,8 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
877879
if (x->status!=TRANSACTION_STATUS_ABORTED) {
878880
MtmLock(LW_EXCLUSIVE);
879881
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_REMOVE,NULL);
880-
Assert(tm!=NULL);
881-
MTM_LOG1("Abort prepared transaction %d with gid='%s' is already aborted",x->xid,x->gid);
882+
Assert(tm!=NULL&&tm->state!=NULL);
883+
MTM_LOG1("%ld:Abort prepared transaction %d with gid='%s'",MtmGetSystemTime(),x->xid,x->gid);
882884
MtmAbortTransaction(tm->state);
883885
MtmUnlock();
884886
x->status=TRANSACTION_STATUS_ABORTED;
@@ -1016,21 +1018,26 @@ XidStatus MtmGetCurrentTransactionStatus(void)
10161018
returnMtmTx.status;
10171019
}
10181020

1019-
XidStatusMtmGetGlobalTransactionStatus(charconst*gid)
1021+
XidStatusMtmExchangeGlobalTransactionStatus(charconst*gid,XidStatusnew_status)
10201022
{
1021-
XidStatusstatus;
10221023
MtmTransMap*tm;
1024+
boolfound;
1025+
XidStatusold_status=TRANSACTION_STATUS_IN_PROGRESS;
10231026

10241027
Assert(gid[0]);
1025-
MtmLock(LW_SHARED);
1026-
tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_FIND,NULL);
1027-
if (tm!=NULL) {
1028-
status=tm->state->status;
1028+
MtmLock(LW_EXCLUSIVE);
1029+
tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_ENTER,&found);
1030+
if (found) {
1031+
old_status=tm->status;
1032+
if (old_status!=TRANSACTION_STATUS_ABORTED) {
1033+
tm->status=new_status;
1034+
}
10291035
}else {
1030-
status=TRANSACTION_STATUS_ABORTED;
1036+
tm->state=NULL;
1037+
tm->status=new_status;
10311038
}
10321039
MtmUnlock();
1033-
returnstatus;
1040+
returnold_status;
10341041
}
10351042

10361043
voidMtmSetCurrentTransactionCSN(csn_tcsn)

‎contrib/mmts/multimaster.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ extern csn_t MtmGetTransactionCSN(TransactionId xid);
256256
externvoidMtmSetCurrentTransactionCSN(csn_tcsn);
257257
externTransactionIdMtmGetCurrentTransactionId(void);
258258
externXidStatusMtmGetCurrentTransactionStatus(void);
259-
externXidStatusMtmGetGlobalTransactionStatus(charconst*gid);
259+
externXidStatusMtmExchangeGlobalTransactionStatus(charconst*gid,XidStatusstatus);
260260
externboolMtmIsRecoveredNode(intnodeId);
261261
externboolMtmRefreshClusterStatus(boolnowait);
262262
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,8 @@ process_remote_commit(StringInfo in)
508508
uint8flags;
509509
csn_tcsn;
510510
constchar*gid=NULL;
511-
XLogRecPtrend_lsn;
511+
XLogRecPtrend_lsn;
512+
512513
/* read flags */
513514
flags=pq_getmsgbyte(in);
514515
MtmReplicationNodeId=pq_getmsgbyte(in);
@@ -536,25 +537,38 @@ process_remote_commit(StringInfo in)
536537
{
537538
Assert(IsTransactionState()&&TransactionIdIsValid(MtmGetCurrentTransactionId()));
538539
gid=pq_getmsgstring(in);
539-
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
540-
MTM_LOG1("%d: PGLOGICAL_PREPARE commit: gid=%s",MyProcPid,gid);
541-
BeginTransactionBlock();
542-
CommitTransactionCommand();
543-
StartTransactionCommand();
544-
545-
MtmBeginSession();
546-
/* PREPARE itself */
547-
MtmSetCurrentTransactionGID(gid);
548-
PrepareTransactionBlock(gid);
549-
CommitTransactionCommand();
540+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_IN_PROGRESS)==TRANSACTION_STATUS_ABORTED) {
541+
MTM_LOG1("%ld: avoid prepare of previously aborted global transaction %s",MtmGetSystemTime(),gid);
542+
AbortCurrentTransaction();
543+
}else {
544+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
545+
MTM_LOG1("%ld: PGLOGICAL_PREPARE commit: gid=%s",MtmGetSystemTime(),gid);
546+
BeginTransactionBlock();
547+
CommitTransactionCommand();
548+
StartTransactionCommand();
549+
550+
MtmBeginSession();
551+
/* PREPARE itself */
552+
MtmSetCurrentTransactionGID(gid);
553+
PrepareTransactionBlock(gid);
554+
CommitTransactionCommand();
555+
556+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_UNKNOWN)==TRANSACTION_STATUS_ABORTED) {
557+
MTM_LOG1("%ld: perform delayed rollback of prepared global transaction %s",MtmGetSystemTime(),gid);
558+
StartTransactionCommand();
559+
MtmSetCurrentTransactionGID(gid);
560+
FinishPreparedTransaction(gid, false);
561+
CommitTransactionCommand();
562+
}
563+
}
550564
break;
551565
}
552566
casePGLOGICAL_COMMIT_PREPARED:
553567
{
554568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
555569
csn=pq_getmsgint64(in);
556570
gid=pq_getmsgstring(in);
557-
MTM_LOG1("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s",MyProcPid,csn,gid);
571+
MTM_LOG1("%ld: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s",MtmGetSystemTime(),csn,gid);
558572
StartTransactionCommand();
559573
MtmBeginSession();
560574
MtmSetCurrentTransactionCSN(csn);
@@ -567,9 +581,9 @@ process_remote_commit(StringInfo in)
567581
{
568582
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569583
gid=pq_getmsgstring(in);
570-
MTM_LOG1("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s",MyProcPid,gid);
571-
if (MtmGetGlobalTransactionStatus(gid)!=TRANSACTION_STATUS_ABORTED) {
572-
MTM_LOG2("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",MyProcPid,gid);
584+
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s",MtmGetSystemTime(),gid);
585+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
586+
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",MtmGetSystemTime(),gid);
573587
StartTransactionCommand();
574588
MtmSetCurrentTransactionGID(gid);
575589
FinishPreparedTransaction(gid, false);

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150150
* INVALID_CSN means replicated transaction (transaction initiated by some other nodes).
151151
* We do not need to send such transactions unless we perform recovery
152152
*/
153-
if (csn==INVALID_CSN&& !isRecovery) {
153+
if (csn==INVALID_CSN&& !isRecovery)
154+
{
154155
return;
155156
}
156157
if (MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn)) {

‎contrib/raftable/raft/src/raft.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1116,7 +1116,7 @@ raft_msg_t raft_recv_message(raft_t r) {
11161116
(structsockaddr*)&addr,&addrlen
11171117
);
11181118

1119-
if (recved==-1) {
1119+
if (recved<=0) {
11201120
if (
11211121
(errno==EAGAIN)||
11221122
(errno==EWOULDBLOCK)||

‎contrib/raftable/raftable.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ static bool timed_read(int sock, void *data, size_t len, timeout_t *timeout)
144144
}
145145

146146
newbytes=read(sock, (char*)data+recved,len-recved);
147-
if (newbytes==-1)
147+
if (newbytes<=0)
148148
{
149149
if (errno==EAGAIN) {
150150
if (poll_until_readable(sock,timeout)) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp