@@ -61,7 +61,8 @@ typedef struct {
6161bool isReplicated ;/* transaction on replica */
6262bool isDistributed ;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6363bool containsDML ;/* transaction contains DML statements */
64- csn_t snapshot ;/* transaction snaphsot */
64+ bool isPrepared ;/* transaction is prepared as part of 2PC */
65+ csn_t snapshot ;/* transaction snaphsot */
6566}MtmCurrentTrans ;
6667
6768/* #define USE_SPINLOCK 1 */
@@ -94,6 +95,8 @@ static void MtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
9495static void MtmInitialize (void );
9596static void MtmXactCallback (XactEvent event ,void * arg );
9697static void MtmBeginTransaction (MtmCurrentTrans * x );
98+ static void MtmPrecommitTransaction (MtmCurrentTrans * x );
99+ static bool MtmCommitTransaction (TransactionId xid ,int nsubxids ,TransactionId * subxids );
97100static void MtmPrepareTransaction (MtmCurrentTrans * x );
98101static void MtmEndTransaction (MtmCurrentTrans * x ,bool commit );
99102static TransactionId MtmGetOldestXmin (Relation rel ,bool ignoreVacuum );
@@ -143,6 +146,7 @@ static int MtmWorkers;
143146static int MtmVacuumDelay ;
144147static int MtmMinRecoveryLag ;
145148static int MtmMaxRecoveryLag ;
149+ static bool MtmUse2PC ;
146150
147151static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
148152static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -467,6 +471,9 @@ MtmXactCallback(XactEvent event, void *arg)
467471MtmBeginTransaction (& dtmTx );
468472break ;
469473case XACT_EVENT_PRE_COMMIT :
474+ MtmPrecommitTransaction (& dtmTx );
475+ break ;
476+ case XACT_EVENT_PREPARE :
470477MtmPrepareTransaction (& dtmTx );
471478break ;
472479case XACT_EVENT_COMMIT :
@@ -498,6 +505,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
498505x -> isReplicated = false;
499506x -> isDistributed = MtmIsUserTransaction ();
500507x -> containsDML = false;
508+ x -> isPrepared = false;
501509x -> snapshot = MtmAssignCSN ();
502510x -> gtid .xid = InvalidTransactionId ;
503511MtmUnlock ();
@@ -561,10 +569,11 @@ MtmCheckClusterLock()
561569}
562570
563571/*
572+ * This functions is called as pre-commit callback.
564573 * We need to pass snapshot to WAL-sender, so create record in transaction status hash table
565574 * before commit
566575 */
567- static void MtmPrepareTransaction (MtmCurrentTrans * x )
576+ static void MtmPrecommitTransaction (MtmCurrentTrans * x )
568577{
569578MtmTransState * ts ;
570579int i ;
@@ -608,6 +617,20 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
608617MTM_TRACE ("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n" ,getpid (),x -> xid ,ts -> csn );
609618}
610619
620+ static void
621+ MtmPrepareTransaction (MtmCurrentTrans * x )
622+ {
623+ TransactionId * subxids ;
624+ int nSubxids ;
625+ MtmPrecommitTransaction (x );
626+ x -> isPrepared = true;
627+ nSubxids = xactGetCommittedChildren (& subxids );
628+ if (!MtmCommitTransaction (x -> xid ,nSubxids ,subxids ))
629+ {
630+ elog (ERROR ,"Commit of transaction %d is rejected by DTM" ,x -> xid );
631+ }
632+ }
633+
611634/**
612635 * Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
613636 * WAL overflow
@@ -755,7 +778,7 @@ static void
755778MtmSetTransactionStatus (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status ,XLogRecPtr lsn )
756779{
757780MTM_TRACE ("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n" ,getpid (),xid ,dtmTx .xid ,status ,dtmTx .isDistributed );
758- if (xid == dtmTx .xid && dtmTx .isDistributed )
781+ if (xid == dtmTx .xid && dtmTx .isDistributed && ! dtmTx . isPrepared )
759782{
760783if (status == TRANSACTION_STATUS_ABORTED || !dtmTx .containsDML || dtm -> status == MTM_RECOVERY )
761784{
@@ -812,6 +835,18 @@ _PG_init(void)
812835if (!process_shared_preload_libraries_in_progress )
813836return ;
814837
838+ DefineCustomBoolVariable (
839+ "multimaster.use_2pc" ,
840+ "Use two phase commit" ,
841+ "Replace normal commit with two phase commit" ,
842+ & MtmUse2PC ,
843+ false,
844+ PGC_BACKEND ,
845+ 0 ,
846+ NULL ,
847+ NULL ,
848+ NULL
849+ );
815850DefineCustomIntVariable (
816851"multimaster.min_recovery_lag" ,
817852"Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed" ,
@@ -1313,13 +1348,54 @@ static bool MtmProcessDDLCommand(char const* queryString)
13131348return false;
13141349}
13151350
1351+ /*
1352+ * Genenerate global transaction identifier for two-pahse commit.
1353+ * It should be unique for all nodes
1354+ */
1355+ static char *
1356+ MtmGenerateGid ()
1357+ {
1358+ static int localCount ;
1359+ return psprintf ("GID-%d-%d-%d" ,MtmNodeId ,MyProcPid ,++ localCount );
1360+ }
1361+
13161362static void MtmProcessUtility (Node * parsetree ,const char * queryString ,
13171363ProcessUtilityContext context ,ParamListInfo params ,
13181364DestReceiver * dest ,char * completionTag )
13191365{
13201366bool skipCommand ;
13211367switch (nodeTag (parsetree ))
13221368{
1369+ case T_TransactionStmt :
1370+ {
1371+ TransactionStmt * stmt = (TransactionStmt * )parsetree ;
1372+ switch (stmt -> kind )
1373+ {
1374+ case TRANS_STMT_COMMIT :
1375+ if (MtmUse2PC ) {
1376+ char * gid = MtmGenerateGid ();
1377+ if (!PrepareTransactionBlock (gid ))
1378+ {
1379+ /* report unsuccessful commit in completionTag */
1380+ if (completionTag ) {
1381+ strcpy (completionTag ,"ROLLBACK" );
1382+ }
1383+ /* ??? Should we do explicit rollback */
1384+ }else {
1385+ FinishPreparedTransaction (gid , true);
1386+ }
1387+ return ;
1388+ }
1389+ break ;
1390+ case TRANS_STMT_PREPARE :
1391+ case TRANS_STMT_COMMIT_PREPARED :
1392+ case TRANS_STMT_ROLLBACK_PREPARED :
1393+ elog (ERROR ,"Two phase commit is not supported by multimaster" );
1394+ default :
1395+ break ;
1396+ }
1397+ }
1398+ /* no break */
13231399case T_PlannedStmt :
13241400case T_ClosePortalStmt :
13251401case T_FetchStmt :
@@ -1333,7 +1409,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
13331409case T_LoadStmt :
13341410case T_VariableSetStmt :
13351411case T_VariableShowStmt :
1336- case T_TransactionStmt :
13371412skipCommand = true;
13381413break ;
13391414default :