@@ -88,7 +88,7 @@ static HTAB *xid2status;
88
88
static HTAB * gtid2xid ;
89
89
static DtmNodeState * local ;
90
90
static uint64 totalSleepInterrupts ;
91
- static int DtmVacuumDelay = 10 ;/* sec */
91
+ static int DtmVacuumDelay = 15 ;/* sec */
92
92
static bool finishing_prepared ;
93
93
94
94
@@ -99,6 +99,7 @@ static void DtmAdjustOldestXid(void);
99
99
static void DtmInitGlobalXmin (TransactionId xid );
100
100
static bool DtmDetectGlobalDeadLock (PGPROC * proc );
101
101
static void DtmAddSubtransactions (DtmTransStatus * ts ,TransactionId * subxids ,int nSubxids );
102
+ static void DtmAdjustSubtransactions (DtmTransStatus * ts );
102
103
static char const * DtmGetName (void );
103
104
static size_t DtmGetTransactionStateSize (void );
104
105
static void DtmSerializeTransactionState (void * ctx );
@@ -283,6 +284,20 @@ DtmTransactionListInsertAfter(DtmTransStatus * after, DtmTransStatus * ts)
283
284
}
284
285
}
285
286
287
+ static void
288
+ DtmAdjustSubtransactions (DtmTransStatus * ts )
289
+ {
290
+ int i ;
291
+ int nSubxids = ts -> nSubxids ;
292
+ DtmTransStatus * sts = ts ;
293
+
294
+ for (i = 0 ;i < nSubxids ;i ++ ) {
295
+ sts = sts -> next ;
296
+ sts -> status = ts -> status ;
297
+ Assert (sts -> cid == ts -> cid );
298
+ }
299
+ }
300
+
286
301
/*
287
302
* There can be different oldest XIDs at different cluster node.
288
303
* Seince we do not have centralized aribiter, we have to rely in DtmVacuumDelay.
@@ -521,20 +536,23 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
521
536
void
522
537
DtmLocalBeginPrepare (GlobalTransactionId gtid )
523
538
{
539
+ // TransactionId xid = TwoPhaseGetTransactionId(gtid);
540
+
524
541
SpinLockAcquire (& local -> lock );
525
542
{
526
543
DtmTransStatus * ts ;
527
544
DtmTransId * id ;
545
+ bool found ;
528
546
529
547
id = (DtmTransId * )hash_search (gtid2xid ,gtid ,HASH_FIND ,NULL );
530
548
Assert (id != NULL );
531
549
Assert (TransactionIdIsValid (id -> xid ));
532
- ts = (DtmTransStatus * )hash_search (xid2status ,& id -> xid ,HASH_ENTER ,NULL );
550
+ ts = (DtmTransStatus * )hash_search (xid2status ,& id -> xid ,HASH_ENTER ,& found );
533
551
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
534
552
ts -> cid = dtm_get_cid ();
535
- ts -> nSubxids = id -> nSubxids ;
536
- DtmTransactionListAppend ( ts ) ;
537
- DtmAddSubtransactions (ts , id -> subxids , id -> nSubxids );
553
+ if (! found )
554
+ ts -> nSubxids = 0 ;
555
+ DtmAdjustSubtransactions (ts );
538
556
}
539
557
SpinLockRelease (& local -> lock );
540
558
}
@@ -575,11 +593,7 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
575
593
ts = (DtmTransStatus * )hash_search (xid2status ,& id -> xid ,HASH_FIND ,NULL );
576
594
Assert (ts != NULL );
577
595
ts -> cid = cid ;
578
- for (i = 0 ;i < ts -> nSubxids ;i ++ )
579
- {
580
- ts = ts -> next ;
581
- ts -> cid = cid ;
582
- }
596
+ DtmAdjustSubtransactions (ts );
583
597
dtm_sync (cid );
584
598
585
599
DTM_TRACE ((stderr ,"Prepare transaction %u(%s) with CSN %lu\n" ,id -> xid ,gtid ,cid ));
@@ -625,39 +639,14 @@ DtmLocalFinish(bool is_commit)
625
639
626
640
ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
627
641
ts -> status = is_commit ?TRANSACTION_STATUS_COMMITTED :TRANSACTION_STATUS_ABORTED ;
628
- if (found )
629
- {
630
642
631
- if (is_commit )// XXX: why only for commit?
632
- {
633
- int i ;
634
- DtmTransStatus * sts = ts ;
635
-
636
- for (i = 0 ;i < ts -> nSubxids ;i ++ )
637
- {
638
- sts = sts -> next ;
639
- Assert (sts -> cid == ts -> cid );
640
- sts -> status = TRANSACTION_STATUS_COMMITTED ;
641
- }
642
- }
643
- }
644
- else
643
+ if (!found )
645
644
{
646
- TransactionId * subxids ;
647
-
648
- Assert (!found );
649
645
ts -> cid = dtm_get_cid ();
646
+ ts -> nSubxids = 0 ;
650
647
DtmTransactionListAppend (ts );
651
- if (is_commit )// XXX: why?
652
- {
653
- ts -> nSubxids = xactGetCommittedChildren (& subxids );
654
- DtmAddSubtransactions (ts ,subxids ,ts -> nSubxids );
655
- }
656
- else
657
- {
658
- ts -> nSubxids = 0 ;
659
- }
660
648
}
649
+ DtmAdjustSubtransactions (ts );
661
650
}
662
651
SpinLockRelease (& local -> lock );
663
652
@@ -722,24 +711,35 @@ DtmLocalSavePreparedState(DtmCurrentTrans * x)
722
711
723
712
if (x -> gtid [0 ])
724
713
{
714
+ TransactionId * subxids ;
715
+ TransactionId xid = GetCurrentTransactionId ();
716
+ int nSubxids = xactGetCommittedChildren (& subxids );
717
+
725
718
SpinLockAcquire (& local -> lock );
726
719
{
727
720
DtmTransId * id = (DtmTransId * )hash_search (gtid2xid ,x -> gtid ,HASH_FIND ,NULL );
728
721
729
722
if (id != NULL )
730
723
{
731
- TransactionId * subxids ;
732
- int nSubxids = xactGetCommittedChildren (& subxids );
733
-
734
724
id -> xid = GetCurrentTransactionId ();
735
- if (nSubxids != 0 )
736
- {
737
- id -> subxids = (TransactionId * )malloc (nSubxids * sizeof (TransactionId ));
738
- id -> nSubxids = nSubxids ;
739
- memcpy (id -> subxids ,subxids ,nSubxids * sizeof (TransactionId ));
740
- }
725
+
741
726
}
742
727
}
728
+ // SpinLockRelease(&local->lock);
729
+
730
+
731
+
732
+ // SpinLockAcquire(&local->lock);
733
+ {
734
+ DtmTransStatus * ts ;
735
+
736
+ ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,NULL );
737
+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
738
+ ts -> cid = dtm_get_cid ();
739
+ ts -> nSubxids = nSubxids ;
740
+ DtmTransactionListAppend (ts );
741
+ DtmAddSubtransactions (ts ,subxids ,nSubxids );
742
+ }
743
743
SpinLockRelease (& local -> lock );
744
744
}
745
745
}