@@ -508,7 +508,8 @@ process_remote_commit(StringInfo in)
508
508
uint8 flags ;
509
509
csn_t csn ;
510
510
const char * gid = NULL ;
511
- XLogRecPtr end_lsn ;
511
+ XLogRecPtr end_lsn ;
512
+
512
513
/* read flags */
513
514
flags = pq_getmsgbyte (in );
514
515
MtmReplicationNodeId = pq_getmsgbyte (in );
@@ -536,25 +537,38 @@ process_remote_commit(StringInfo in)
536
537
{
537
538
Assert (IsTransactionState ()&& TransactionIdIsValid (MtmGetCurrentTransactionId ()));
538
539
gid = pq_getmsgstring (in );
539
- /* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
540
- MTM_LOG1 ("%d: PGLOGICAL_PREPARE commit: gid=%s" ,MyProcPid ,gid );
541
- BeginTransactionBlock ();
542
- CommitTransactionCommand ();
543
- StartTransactionCommand ();
544
-
545
- MtmBeginSession ();
546
- /* PREPARE itself */
547
- MtmSetCurrentTransactionGID (gid );
548
- PrepareTransactionBlock (gid );
549
- CommitTransactionCommand ();
540
+ if (MtmExchangeGlobalTransactionStatus (gid ,TRANSACTION_STATUS_IN_PROGRESS )== TRANSACTION_STATUS_ABORTED ) {
541
+ MTM_LOG1 ("%ld: avoid prepare of previously aborted global transaction %s" ,MtmGetSystemTime (),gid );
542
+ AbortCurrentTransaction ();
543
+ }else {
544
+ /* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
545
+ MTM_LOG1 ("%ld: PGLOGICAL_PREPARE commit: gid=%s" ,MtmGetSystemTime (),gid );
546
+ BeginTransactionBlock ();
547
+ CommitTransactionCommand ();
548
+ StartTransactionCommand ();
549
+
550
+ MtmBeginSession ();
551
+ /* PREPARE itself */
552
+ MtmSetCurrentTransactionGID (gid );
553
+ PrepareTransactionBlock (gid );
554
+ CommitTransactionCommand ();
555
+
556
+ if (MtmExchangeGlobalTransactionStatus (gid ,TRANSACTION_STATUS_UNKNOWN )== TRANSACTION_STATUS_ABORTED ) {
557
+ MTM_LOG1 ("%ld: perform delayed rollback of prepared global transaction %s" ,MtmGetSystemTime (),gid );
558
+ StartTransactionCommand ();
559
+ MtmSetCurrentTransactionGID (gid );
560
+ FinishPreparedTransaction (gid , false);
561
+ CommitTransactionCommand ();
562
+ }
563
+ }
550
564
break ;
551
565
}
552
566
case PGLOGICAL_COMMIT_PREPARED :
553
567
{
554
568
Assert (!TransactionIdIsValid (MtmGetCurrentTransactionId ()));
555
569
csn = pq_getmsgint64 (in );
556
570
gid = pq_getmsgstring (in );
557
- MTM_LOG1 ("%d : PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s" ,MyProcPid ,csn ,gid );
571
+ MTM_LOG1 ("%ld : PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s" ,MtmGetSystemTime () ,csn ,gid );
558
572
StartTransactionCommand ();
559
573
MtmBeginSession ();
560
574
MtmSetCurrentTransactionCSN (csn );
@@ -567,9 +581,9 @@ process_remote_commit(StringInfo in)
567
581
{
568
582
Assert (!TransactionIdIsValid (MtmGetCurrentTransactionId ()));
569
583
gid = pq_getmsgstring (in );
570
- MTM_LOG1 ("%d : PGLOGICAL_ABORT_PREPARED commit: gid=%s" ,MyProcPid ,gid );
571
- if (MtmGetGlobalTransactionStatus (gid ) != TRANSACTION_STATUS_ABORTED ) {
572
- MTM_LOG2 ("%d : PGLOGICAL_ABORT_PREPARED commit: gid=%s #2" ,MyProcPid ,gid );
584
+ MTM_LOG1 ("%ld : PGLOGICAL_ABORT_PREPARED commit: gid=%s" ,MtmGetSystemTime () ,gid );
585
+ if (MtmExchangeGlobalTransactionStatus (gid , TRANSACTION_STATUS_ABORTED ) == TRANSACTION_STATUS_UNKNOWN ) {
586
+ MTM_LOG1 ("%ld : PGLOGICAL_ABORT_PREPARED commit: gid=%s #2" ,MtmGetSystemTime () ,gid );
573
587
StartTransactionCommand ();
574
588
MtmSetCurrentTransactionGID (gid );
575
589
FinishPreparedTransaction (gid , false);