@@ -643,33 +643,36 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
643643{
644644MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" ,MyProcPid ,x -> xid ,x -> isPrepared ,x -> isDistributed ,commit ?"commit" :"abort" );
645645if (x -> isDistributed && (x -> isPrepared || x -> isReplicated )) {
646- MtmTransState * ts ;
646+ MtmTransState * ts = NULL ;
647647MtmLock (LW_EXCLUSIVE );
648648if (x -> isPrepared ) {
649649ts = hash_search (xid2state ,& x -> xid ,HASH_FIND ,NULL );
650650Assert (ts != NULL );
651651}else {
652652MtmTransMap * hm = (MtmTransMap * )hash_search (gid2xid ,x -> gid ,HASH_REMOVE ,NULL );
653- Assert (hm != NULL );
654- ts = hm -> state ;
653+ if (hm != NULL ) {
654+ ts = hm -> state ;
655+ }
655656}
656- if (commit ) {
657- ts -> status = TRANSACTION_STATUS_COMMITTED ;
658- if (x -> csn > ts -> csn ) {
659- ts -> csn = x -> csn ;
660- MtmSyncClock (ts -> csn );
657+ if (ts != NULL ) {
658+ if (commit ) {
659+ ts -> status = TRANSACTION_STATUS_COMMITTED ;
660+ if (x -> csn > ts -> csn ) {
661+ ts -> csn = x -> csn ;
662+ MtmSyncClock (ts -> csn );
663+ }
664+ }else {
665+ ts -> status = TRANSACTION_STATUS_ABORTED ;
666+ if (x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
667+ /*
668+ * Send notification only of ABORT happens during transaction processing at replicas,
669+ * do not send notification if ABORT is receiver from master
670+ */
671+ MtmSendNotificationMessage (ts );/* send notification to coordinator */
672+ }
661673}
662- }else {
663- ts -> status = TRANSACTION_STATUS_ABORTED ;
664- if (x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
665- /*
666- * Send notification only of ABORT happens during transaction processing at replicas,
667- * do not send notification if ABORT is receiver from master
668- */
669- MtmSendNotificationMessage (ts );/* send notification to coordinator */
670- }
674+ MtmAdjustSubtransactions (ts );
671675}
672- MtmAdjustSubtransactions (ts );
673676MtmUnlock ();
674677}
675678x -> snapshot = INVALID_CSN ;
@@ -1691,6 +1694,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
16911694DestReceiver * dest ,char * completionTag )
16921695{
16931696bool skipCommand ;
1697+ MTM_TRACE ("%d: Process utility statement %s\n" ,MyProcPid ,queryString );
16941698switch (nodeTag (parsetree ))
16951699{
16961700case T_TransactionStmt :
@@ -1702,8 +1706,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
17021706if (dtmTx .isDistributed && dtmTx .containsDML ) {
17031707char gid [MULTIMASTER_MAX_GID_SIZE ];
17041708MtmGenerateGid (gid );
1709+ MTM_TRACE ("%d: Start 2PC with GID=%s for %s\n" ,MyProcPid ,gid ,queryString );
17051710if (!IsTransactionBlock ()) {
17061711elog (WARNING ,"Start transaction block for %d" ,dtmTx .xid );
1712+ BeginTransactionBlock ();
17071713CommitTransactionCommand ();
17081714StartTransactionCommand ();
17091715}