Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit291226a

Browse files
committed
Merge branch 'commit_hook'
2 parents4054f6b +eb21867 commit291226a

File tree

7 files changed

+51
-39
lines changed

7 files changed

+51
-39
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ typedef struct {
6464
boolisReplicated;/* transaction on replica */
6565
boolisDistributed;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6666
boolisPrepared;/* transaction is perpared at first stage of 2PC */
67+
boolisTransactionBlock;/* is transaction block */
6768
boolcontainsDML;/* transaction contains DML statements */
6869
XidStatusstatus;/* transaction status */
6970
csn_tsnapshot;/* transaction snaphsot */
@@ -111,6 +112,7 @@ static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
111112
staticvoidMtmPostPrepareTransaction(MtmCurrentTrans*x);
112113
staticvoidMtmAbortPreparedTransaction(MtmCurrentTrans*x);
113114
staticvoidMtmEndTransaction(MtmCurrentTrans*x,boolcommit);
115+
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x);
114116
staticTransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum);
115117
staticboolMtmXidInMVCCSnapshot(TransactionIdxid,Snapshotsnapshot);
116118
staticTransactionIdMtmAdjustOldestXid(TransactionIdxid);
@@ -588,6 +590,11 @@ MtmXactCallback(XactEvent event, void *arg)
588590
caseXACT_EVENT_ABORT:
589591
MtmEndTransaction(&MtmTx, false);
590592
break;
593+
caseXACT_EVENT_COMMIT_COMMAND:
594+
if (!MtmTx.isTransactionBlock) {
595+
MtmTwoPhaseCommit(&MtmTx);
596+
}
597+
break;
591598
default:
592599
break;
593600
}
@@ -623,6 +630,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
623630
x->isReplicated= false;
624631
x->isDistributed=MtmIsUserTransaction();
625632
x->isPrepared= false;
633+
x->isTransactionBlock=IsTransactionBlock();
626634
if (x->isDistributed&&Mtm->status!=MTM_ONLINE) {
627635
/* reject all user's transactions at offline cluster */
628636
MtmUnlock();
@@ -1922,33 +1930,34 @@ MtmGenerateGid(char* gid)
19221930
sprintf(gid,"MTM-%d-%d-%d",MtmNodeId,MyProcPid,++localCount);
19231931
}
19241932

1925-
staticvoidMtmTwoPhaseCommit(char*completionTag)
1933+
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x)
19261934
{
1927-
MtmGenerateGid(MtmTx.gid);
1928-
if (!IsTransactionBlock()) {
1929-
elog(WARNING,"Start transaction block for %d",MtmTx.xid);
1930-
BeginTransactionBlock();
1931-
CommitTransactionCommand();
1932-
StartTransactionCommand();
1933-
}
1934-
if (!PrepareTransactionBlock(MtmTx.gid))
1935-
{
1936-
elog(WARNING,"Failed to prepare transaction %s",MtmTx.gid);
1937-
/* report unsuccessful commit in completionTag */
1938-
if (completionTag) {
1939-
strcpy(completionTag,"ROLLBACK");
1935+
if (!x->isReplicated&& (x->isDistributed&&x->containsDML)) {
1936+
MtmGenerateGid(x->gid);
1937+
if (!x->isTransactionBlock) {
1938+
elog(WARNING,"Start transaction block for %s",x->gid);
1939+
BeginTransactionBlock();
1940+
x->isTransactionBlock= true;
1941+
CommitTransactionCommand();
1942+
StartTransactionCommand();
19401943
}
1941-
/* ??? Should we do explicit rollback */
1942-
}else {
1943-
CommitTransactionCommand();
1944-
StartTransactionCommand();
1945-
if (MtmGetCurrentTransactionStatus()==TRANSACTION_STATUS_ABORTED) {
1946-
FinishPreparedTransaction(MtmTx.gid, false);
1947-
elog(ERROR,"Transaction %s is aborted by DTM",MtmTx.gid);
1948-
}else {
1949-
FinishPreparedTransaction(MtmTx.gid, true);
1944+
if (!PrepareTransactionBlock(x->gid))
1945+
{
1946+
elog(WARNING,"Failed to prepare transaction %s",x->gid);
1947+
/* ??? Should we do explicit rollback */
1948+
}else {
1949+
CommitTransactionCommand();
1950+
StartTransactionCommand();
1951+
if (MtmGetCurrentTransactionStatus()==TRANSACTION_STATUS_ABORTED) {
1952+
FinishPreparedTransaction(x->gid, false);
1953+
elog(ERROR,"Transaction %s is aborted by DTM",x->gid);
1954+
}else {
1955+
FinishPreparedTransaction(x->gid, true);
1956+
}
19501957
}
1958+
return true;
19511959
}
1960+
return false;
19521961
}
19531962

19541963
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
@@ -1964,9 +1973,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19641973
TransactionStmt*stmt= (TransactionStmt*)parsetree;
19651974
switch (stmt->kind)
19661975
{
1976+
caseTRANS_STMT_BEGIN:
1977+
MtmTx.isTransactionBlock= true;
1978+
break;
19671979
caseTRANS_STMT_COMMIT:
1968-
if (MtmTx.isDistributed&&MtmTx.containsDML) {
1969-
MtmTwoPhaseCommit(completionTag);
1980+
if (MtmTwoPhaseCommit(&MtmTx)) {
19701981
return;
19711982
}
19721983
break;
@@ -2002,9 +2013,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
20022013
if (MtmProcessDDLCommand(queryString)) {
20032014
return;
20042015
}
2005-
if (MtmTx.isDistributed&&MtmTx.containsDML&& !IsTransactionBlock()) {
2006-
MtmTwoPhaseCommit(completionTag);
2007-
}
20082016
}
20092017
if (PreviousProcessUtilityHook!=NULL)
20102018
{
@@ -2034,9 +2042,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
20342042
}
20352043
}
20362044
}
2037-
if (MtmTx.isDistributed&&MtmTx.containsDML&& !IsTransactionBlock()) {
2038-
MtmTwoPhaseCommit(NULL);
2039-
}
20402045
}
20412046
if (PreviousExecutorFinishHook!=NULL)
20422047
{

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -480,13 +480,15 @@ MtmBeginSession(void)
480480
}
481481

482482
staticvoid
483-
MtmEndSession(void)
483+
MtmEndSession(boolunlock)
484484
{
485485
if (replorigin_session_origin!=InvalidRepOriginId) {
486486
MTM_TRACE("%d: Begin reset replorigin session: %d\n",MyProcPid,replorigin_session_origin);
487487
replorigin_session_origin=InvalidRepOriginId;
488488
replorigin_session_reset();
489-
MtmUnlockNode(MtmReplicationNode);
489+
if (unlock) {
490+
MtmUnlockNode(MtmReplicationNode);
491+
}
490492
MTM_TRACE("%d: End reset replorigin session: %d\n",MyProcPid,replorigin_session_origin);
491493
}
492494
}
@@ -568,7 +570,7 @@ process_remote_commit(StringInfo in)
568570
default:
569571
Assert(false);
570572
}
571-
MtmEndSession();
573+
MtmEndSession(true);
572574
}
573575

574576
staticvoid
@@ -935,7 +937,7 @@ void MtmExecutor(int id, void* work, size_t size)
935937
EmitErrorReport();
936938
FlushErrorState();
937939
MTM_TRACE("%d: REMOTE begin abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
938-
MtmEndSession();
940+
MtmEndSession(false);
939941
AbortCurrentTransaction();
940942
MTM_TRACE("%d: REMOTE end abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
941943
}

‎contrib/mmts/tests/dtmbench.cpp‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void* reader(void* arg)
129129
result r = txn.exec("select sum(v) from t");
130130
int64_t sum = r[0][0].as(int64_t());
131131
if (sum != prevSum) {
132-
//r = txn.exec("select mtm_get_snapshot()");
132+
r = txn.exec("select mtm_get_snapshot()");
133133
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
134134
prevSum = sum;
135135
}

‎src/backend/access/transam/twophase.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1248,7 +1248,7 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12481248

12491249
hdr= (TwoPhaseFileHeader*)xlrec;
12501250
bufptr=xlrec+MAXALIGN(sizeof(TwoPhaseFileHeader));
1251-
1251+
12521252
strncpy(parsed->twophase_gid,bufptr,hdr->gidlen);
12531253
bufptr+=MAXALIGN(hdr->gidlen);
12541254

‎src/backend/access/transam/xact.c‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2728,6 +2728,8 @@ CommitTransactionCommand(void)
27282728
{
27292729
TransactionStates=CurrentTransactionState;
27302730

2731+
CallXactCallbacks(XACT_EVENT_COMMIT_COMMAND);
2732+
27312733
switch (s->blockState)
27322734
{
27332735
/*

‎src/backend/utils/cache/inval.c‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,8 +603,10 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
603603
elseif (msg->rm.dbId==MyDatabaseId)
604604
InvalidateCatalogSnapshot();
605605
}
606-
else
606+
else {
607+
*(int*)0=0;
607608
elog(FATAL,"unrecognized SI message ID: %d",msg->id);
609+
}
608610
}
609611

610612
/*

‎src/include/access/xact.h‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ typedef enum
9292
XACT_EVENT_PRE_PREPARE,
9393
XACT_EVENT_POST_PREPARE,
9494
XACT_EVENT_COMMIT_PREPARED,
95-
XACT_EVENT_ABORT_PREPARED
95+
XACT_EVENT_ABORT_PREPARED,
96+
XACT_EVENT_COMMIT_COMMAND
9697
}XactEvent;
9798

9899
typedefvoid (*XactCallback) (XactEventevent,void*arg);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp