@@ -295,6 +295,29 @@ DtmAdjustSubtransactions(DtmTransStatus *ts)
295
295
}
296
296
}
297
297
298
+ /*
299
+ * Add subtransactions to finished transactions list.
300
+ * Copy CSN and status of parent transaction.
301
+ */
302
+ static void
303
+ DtmAddSubtransactions (DtmTransStatus * ts ,TransactionId * subxids ,int nSubxids )
304
+ {
305
+ int i ;
306
+
307
+ for (i = 0 ;i < nSubxids ;i ++ )
308
+ {
309
+ bool found ;
310
+ DtmTransStatus * sts ;
311
+
312
+ Assert (TransactionIdIsValid (subxids [i ]));
313
+ sts = (DtmTransStatus * )hash_search (xid2status ,& subxids [i ],HASH_ENTER ,& found );
314
+ Assert (!found );
315
+ sts -> cid = ts -> cid ;
316
+ sts -> nSubxids = 0 ;
317
+ DtmTransactionListInsertAfter (ts ,sts );
318
+ }
319
+ }
320
+
298
321
/*
299
322
* There can be different oldest XIDs at different cluster node.
300
323
* Seince we do not have centralized aribiter, we have to rely in DtmVacuumDelay.
@@ -465,27 +488,19 @@ DtmLocalBegin(DtmCurrentTrans * x)
465
488
* Returns snapshot of current transaction.
466
489
*/
467
490
cid_t
468
- DtmLocalExtend (GlobalTransactionId gtid )
491
+ DtmLocalExtend ()
469
492
{
470
- DtmCurrentTrans * x = & dtm_tx ;
471
-
472
- if (gtid != NULL )
473
- {
474
- strncpy (x -> gtid ,gtid ,MAX_GTID_SIZE );
475
- }
476
493
DtmInitGlobalXmin (TransactionXmin );
477
-
478
494
dtm_tx .is_global = true;
479
-
480
- return x -> snapshot ;
495
+ return dtm_tx .snapshot ;
481
496
}
482
497
483
498
/*
484
499
* This function is executed on all nodes joining distributed transaction.
485
500
* global_cid is snapshot taken from node initiated this transaction
486
501
*/
487
502
cid_t
488
- DtmLocalAccess (DtmCurrentTrans * x , GlobalTransactionId gtid , cid_t global_cid )
503
+ DtmLocalAccess (cid_t global_cid )
489
504
{
490
505
cid_t local_cid ;
491
506
@@ -494,9 +509,8 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
494
509
SpinLockAcquire (& local -> lock );
495
510
{
496
511
local_cid = dtm_sync (global_cid );
497
- x -> snapshot = global_cid ;
512
+ dtm_tx . snapshot = global_cid ;
498
513
}
499
- strncpy (x -> gtid ,gtid ,MAX_GTID_SIZE );
500
514
SpinLockRelease (& local -> lock );
501
515
502
516
dtm_tx .is_global = true;
@@ -510,6 +524,37 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
510
524
return global_cid ;
511
525
}
512
526
527
+
528
+ /*
529
+ * Save state of parepared transaction
530
+ */
531
+ void
532
+ DtmLocalSavePreparedState (DtmCurrentTrans * x )
533
+ {
534
+
535
+ if (dtm_tx .is_global )
536
+ {
537
+ TransactionId * subxids ;
538
+ TransactionId xid = GetCurrentTransactionId ();
539
+ int nSubxids = xactGetCommittedChildren (& subxids );
540
+
541
+ SpinLockAcquire (& local -> lock );
542
+ {
543
+ DtmTransStatus * ts ;
544
+ bool found ;
545
+
546
+ ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
547
+ Assert (!found );
548
+ ts -> cid = InDoubtGlobalCSN ;
549
+ ts -> nSubxids = nSubxids ;
550
+ DtmTransactionListAppend (ts );
551
+ DtmAddSubtransactions (ts ,subxids ,nSubxids );
552
+ }
553
+ SpinLockRelease (& local -> lock );
554
+ }
555
+ }
556
+
557
+
513
558
/*
514
559
* Set transaction status to in-doubt. Now all transactions accessing tuples updated by this transaction have to
515
560
* wait until it is either committed either aborted
@@ -587,10 +632,8 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
587
632
return ;// global ro tx
588
633
}
589
634
590
-
591
635
dtm_tx .xid = xid ;
592
636
dtm_tx .csn = cid ;
593
-
594
637
}
595
638
596
639
/*
@@ -599,43 +642,57 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
599
642
void
600
643
DtmLocalFinish (bool is_commit )
601
644
{
602
- DtmCurrentTrans * x = & dtm_tx ;
603
645
TransactionId xid = GetCurrentTransactionIdIfAny ();
646
+ bool found ;
647
+ DtmTransStatus * ts ;
604
648
605
- if (x -> gtid [0 ]&& finishing_prepared )
649
+ // We can't check just TransactionIdIsValid(dtm_tx.xid) because
650
+ // then we catch commit of `select pg_global_snaphot_end_prepare(...)`
651
+ if (TransactionIdIsValid (dtm_tx .xid )&&
652
+ (finishing_prepared || // commit prepared of global
653
+ TransactionIdIsValid (xid )))// ordinary commit of global
606
654
{
655
+ // Commit of global prepared tx
656
+
607
657
xid = dtm_tx .xid ;
658
+ Assert (GlobalCSNIsNormal (dtm_tx .csn ));
659
+
660
+ SpinLockAcquire (& local -> lock );
661
+ ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_FIND ,& found );
662
+ Assert (found );
663
+ ts -> cid = is_commit ?dtm_tx .csn :AbortedGlobalCSN ;
664
+ DtmAdjustSubtransactions (ts );// !
665
+ SpinLockRelease (& local -> lock );
666
+
667
+ dtm_tx .xid = InvalidTransactionId ;
668
+ dtm_tx .csn = InvalidGlobalCSN ;
669
+ dtm_tx .is_global = false;
608
670
}
609
- else if (! TransactionIdIsValid (xid ))
671
+ else if (TransactionIdIsValid (xid ))
610
672
{
611
- return ;
612
- }
673
+ // Commit of local tx
613
674
614
- SpinLockAcquire (& local -> lock );
615
- {
616
- bool found ;
617
- DtmTransStatus * ts ;
675
+ TransactionId * subxids ;
676
+ int nSubxids = xactGetCommittedChildren (& subxids );
618
677
619
- ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
678
+ Assert (!GlobalCSNIsNormal (dtm_tx .csn ));
679
+ Assert (!TransactionIdIsValid (dtm_tx .xid ));
620
680
621
- if (found )
681
+ if (dtm_tx . is_global )
622
682
{
623
- Assert (GlobalCSNIsNormal (dtm_tx .csn ));
624
- ts -> cid = is_commit ?dtm_tx .csn :AbortedGlobalCSN ;
625
-
626
- dtm_tx .xid = InvalidTransactionId ;
627
- dtm_tx .csn = InvalidGlobalCSN ;
628
- }
629
- else
630
- {
631
- Assert (!GlobalCSNIsNormal (dtm_tx .csn ));
632
- ts -> cid = is_commit ?dtm_get_cid () :AbortedGlobalCSN ;
633
- DtmTransactionListAppend (ts );
683
+ Assert (!is_commit );
684
+ dtm_tx .is_global = false;
634
685
}
635
- DtmAdjustSubtransactions (ts );
636
- }
637
- SpinLockRelease (& local -> lock );
638
686
687
+ SpinLockAcquire (& local -> lock );
688
+ ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
689
+ Assert (!found );
690
+ ts -> cid = is_commit ?dtm_get_cid () :AbortedGlobalCSN ;
691
+ ts -> nSubxids = nSubxids ;
692
+ DtmTransactionListAppend (ts );
693
+ DtmAddSubtransactions (ts ,subxids ,nSubxids );
694
+ SpinLockRelease (& local -> lock );
695
+ }
639
696
}
640
697
641
698
/*
@@ -669,59 +726,6 @@ DtmDeserializeTransactionState(void* ctx)
669
726
}
670
727
671
728
672
- /*
673
- * Save state of parepared transaction
674
- */
675
- void
676
- DtmLocalSavePreparedState (DtmCurrentTrans * x )
677
- {
678
-
679
- if (dtm_tx .is_global )
680
- {
681
- TransactionId * subxids ;
682
- TransactionId xid = GetCurrentTransactionId ();
683
- int nSubxids = xactGetCommittedChildren (& subxids );
684
-
685
- SpinLockAcquire (& local -> lock );
686
- {
687
- DtmTransStatus * ts ;
688
- bool found ;
689
-
690
- ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
691
- Assert (!found );
692
- ts -> cid = InDoubtGlobalCSN ;
693
- ts -> nSubxids = nSubxids ;
694
- DtmTransactionListAppend (ts );
695
- DtmAddSubtransactions (ts ,subxids ,nSubxids );
696
- }
697
- SpinLockRelease (& local -> lock );
698
- }
699
- }
700
-
701
- /*
702
- * Add subtransactions to finished transactions list.
703
- * Copy CSN and status of parent transaction.
704
- */
705
- static void
706
- DtmAddSubtransactions (DtmTransStatus * ts ,TransactionId * subxids ,int nSubxids )
707
- {
708
- int i ;
709
-
710
- for (i = 0 ;i < nSubxids ;i ++ )
711
- {
712
- bool found ;
713
- DtmTransStatus * sts ;
714
-
715
- Assert (TransactionIdIsValid (subxids [i ]));
716
- sts = (DtmTransStatus * )hash_search (xid2status ,& subxids [i ],HASH_ENTER ,& found );
717
- Assert (!found );
718
- sts -> cid = ts -> cid ;
719
- sts -> nSubxids = 0 ;
720
- DtmTransactionListInsertAfter (ts ,sts );
721
- }
722
- }
723
-
724
-
725
729
/*
726
730
*
727
731
* SQL functions for global snapshot mamagement.
@@ -731,8 +735,7 @@ DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
731
735
Datum
732
736
pg_global_snaphot_create (PG_FUNCTION_ARGS )
733
737
{
734
- GlobalTransactionId gtid = text_to_cstring (PG_GETARG_TEXT_PP (0 ));
735
- cid_t cid = DtmLocalExtend (gtid );
738
+ cid_t cid = DtmLocalExtend ();
736
739
737
740
DTM_TRACE ((stderr ,"Backend %d extends transaction %u(%s) to global with cid=%lu\n" ,getpid (),dtm_tx .xid ,gtid ,cid ));
738
741
PG_RETURN_INT64 (cid );
@@ -742,10 +745,9 @@ Datum
742
745
pg_global_snaphot_join (PG_FUNCTION_ARGS )
743
746
{
744
747
cid_t cid = PG_GETARG_INT64 (0 );
745
- GlobalTransactionId gtid = text_to_cstring (PG_GETARG_TEXT_PP (1 ));
746
748
747
749
DTM_TRACE ((stderr ,"Backend %d joins transaction %u(%s) with cid=%lu\n" ,getpid (),dtm_tx .xid ,gtid ,cid ));
748
- cid = DtmLocalAccess (& dtm_tx , gtid , cid );
750
+ cid = DtmLocalAccess (cid );
749
751
PG_RETURN_INT64 (cid );
750
752
}
751
753