@@ -49,7 +49,6 @@ typedef uint64 timestamp_t;
49
49
typedef struct DtmTransStatus
50
50
{
51
51
TransactionId xid ;
52
- XidStatus status ;
53
52
int nSubxids ;
54
53
cid_t cid ;/* CSN */
55
54
struct DtmTransStatus * next ;/* pointer to next element in finished
@@ -292,8 +291,7 @@ DtmAdjustSubtransactions(DtmTransStatus *ts)
292
291
293
292
for (i = 0 ;i < nSubxids ;i ++ ) {
294
293
sts = sts -> next ;
295
- sts -> status = ts -> status ;
296
- Assert (sts -> cid == ts -> cid );
294
+ sts -> cid = ts -> cid ;
297
295
}
298
296
}
299
297
@@ -380,14 +378,14 @@ DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
380
378
381
379
if (ts != NULL )
382
380
{
383
- if (ts -> cid > dtm_tx .snapshot )
381
+ if (GlobalCSNIsNormal ( ts -> cid ) && ts -> cid > dtm_tx .snapshot )
384
382
{
385
383
DTM_TRACE ((stderr ,"%d: tuple with xid=%d(csn=%lld) is invisibile in snapshot %lld\n" ,
386
384
getpid (),xid ,ts -> cid ,dtm_tx .snapshot ));
387
385
SpinLockRelease (& local -> lock );
388
386
return true;
389
387
}
390
- if (ts -> status == TRANSACTION_STATUS_UNKNOWN )
388
+ if (ts -> cid == InDoubtGlobalCSN )
391
389
{
392
390
DTM_TRACE ((stderr ,"%d: wait for in-doubt transaction %u in snapshot %lu\n" ,getpid (),xid ,dtm_tx .snapshot ));
393
391
SpinLockRelease (& local -> lock );
@@ -400,7 +398,10 @@ DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
400
398
}
401
399
else
402
400
{
403
- bool invisible = ts -> status == TRANSACTION_STATUS_ABORTED ;
401
+ bool invisible = ts -> cid == AbortedGlobalCSN ;
402
+
403
+ if (!invisible )
404
+ Assert (GlobalCSNIsNormal (ts -> cid ));
404
405
405
406
DTM_TRACE ((stderr ,"%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld\n" ,
406
407
getpid (),xid ,ts -> cid ,invisible ?"rollbacked" :"committed" ,dtm_tx .snapshot ));
@@ -473,6 +474,9 @@ DtmLocalExtend(GlobalTransactionId gtid)
473
474
strncpy (x -> gtid ,gtid ,MAX_GTID_SIZE );
474
475
}
475
476
DtmInitGlobalXmin (TransactionXmin );
477
+
478
+ dtm_tx .is_global = true;
479
+
476
480
return x -> snapshot ;
477
481
}
478
482
@@ -495,6 +499,8 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
495
499
strncpy (x -> gtid ,gtid ,MAX_GTID_SIZE );
496
500
SpinLockRelease (& local -> lock );
497
501
502
+ dtm_tx .is_global = true;
503
+
498
504
if (global_cid < local_cid - DtmVacuumDelay * USEC )
499
505
{
500
506
elog (ERROR ,"Too old snapshot: requested %ld, current %ld" ,global_cid ,local_cid );
@@ -511,27 +517,34 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
511
517
void
512
518
DtmLocalBeginPrepare (GlobalTransactionId gtid )
513
519
{
514
- TransactionId xid = TwoPhaseGetTransactionId (gtid );
515
-
516
- if (!TransactionIdIsValid (xid ))
517
- {
518
- // XXX: check that it is global tx with the same xid, XactTopTransactionId?
519
- xid = GetCurrentTransactionId ();
520
- }
520
+ TransactionId xid = GetCurrentTransactionIdIfAny ();
521
521
522
- SpinLockAcquire ( & local -> lock );
522
+ if ( TransactionIdIsValid ( xid )) // XXX: decide based on empty gtid?
523
523
{
524
+ // inside global 1pc tx
525
+ TransactionId * subxids ;
526
+ int nSubxids = xactGetCommittedChildren (& subxids );
524
527
DtmTransStatus * ts ;
525
528
bool found ;
526
529
527
- ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
528
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
529
- ts -> cid = dtm_get_cid ();
530
- if (!found )
531
- ts -> nSubxids = 0 ;
532
- DtmAdjustSubtransactions (ts );
530
+ Assert (dtm_tx .is_global );// XXX: change to error
531
+
532
+ SpinLockAcquire (& local -> lock );
533
+ {
534
+ ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
535
+ Assert (!found );
536
+ ts -> cid = InDoubtGlobalCSN ;
537
+ ts -> nSubxids = nSubxids ;
538
+ DtmTransactionListAppend (ts );
539
+ DtmAddSubtransactions (ts ,subxids ,nSubxids );
540
+ }
541
+ SpinLockRelease (& local -> lock );
533
542
}
534
- SpinLockRelease (& local -> lock );
543
+ else
544
+ {
545
+ // inside after-prepare fx
546
+ }
547
+
535
548
}
536
549
537
550
/*
@@ -559,31 +572,24 @@ DtmLocalPrepare(GlobalTransactionId gtid, cid_t global_cid)
559
572
void
560
573
DtmLocalEndPrepare (GlobalTransactionId gtid ,cid_t cid )
561
574
{
562
- TransactionId xid = TwoPhaseGetTransactionId ( gtid );
575
+ TransactionId xid = GetCurrentTransactionIdIfAny ( );
563
576
564
- if (! TransactionIdIsValid (xid ))
577
+ if (TransactionIdIsValid (xid ))
565
578
{
566
- // XXX: check that it is global tx with the same xid, XactTopTransactionId?
567
- xid = GetCurrentTransactionId ();
579
+ Assert (dtm_tx .is_global );
568
580
}
569
-
570
- dtm_tx .xid = xid ;
571
-
572
- SpinLockAcquire (& local -> lock );
581
+ else
573
582
{
574
- DtmTransStatus * ts ;
575
- DtmTransId * id ;
576
- int i ;
583
+ // inside after-prepare fx
584
+ xid = TwoPhaseGetTransactionId (gtid );
585
+ // Assert(TransactionIdIsValid(xid));
586
+ if (!TransactionIdIsValid (xid ))
587
+ return ;// global ro tx
588
+ }
577
589
578
- ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_FIND ,NULL );
579
- Assert (ts != NULL );
580
- ts -> cid = cid ;
581
- DtmAdjustSubtransactions (ts );
582
- dtm_sync (cid );
583
590
584
- DTM_TRACE ((stderr ,"Prepare transaction %u(%s) with CSN %lu\n" ,id -> xid ,gtid ,cid ));
585
- }
586
- SpinLockRelease (& local -> lock );
591
+ dtm_tx .xid = xid ;
592
+ dtm_tx .csn = cid ;
587
593
588
594
}
589
595
@@ -598,7 +604,7 @@ DtmLocalFinish(bool is_commit)
598
604
599
605
if (x -> gtid [0 ]&& finishing_prepared )
600
606
{
601
- xid = x -> xid ;
607
+ xid = dtm_tx . xid ;
602
608
}
603
609
else if (!TransactionIdIsValid (xid ))
604
610
{
@@ -611,19 +617,25 @@ DtmLocalFinish(bool is_commit)
611
617
DtmTransStatus * ts ;
612
618
613
619
ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
614
- ts -> status = is_commit ?TRANSACTION_STATUS_COMMITTED :TRANSACTION_STATUS_ABORTED ;
615
620
616
- if (!found )
621
+ if (found )
622
+ {
623
+ Assert (GlobalCSNIsNormal (dtm_tx .csn ));
624
+ ts -> cid = is_commit ?dtm_tx .csn :AbortedGlobalCSN ;
625
+
626
+ dtm_tx .xid = InvalidTransactionId ;
627
+ dtm_tx .csn = InvalidGlobalCSN ;
628
+ }
629
+ else
617
630
{
618
- ts -> cid = dtm_get_cid ( );
619
- ts -> nSubxids = 0 ;
631
+ Assert (! GlobalCSNIsNormal ( dtm_tx . csn ) );
632
+ ts -> cid = is_commit ? dtm_get_cid () : AbortedGlobalCSN ;
620
633
DtmTransactionListAppend (ts );
621
634
}
622
635
DtmAdjustSubtransactions (ts );
623
636
}
624
637
SpinLockRelease (& local -> lock );
625
638
626
- // DtmAdjustOldestXid();
627
639
}
628
640
629
641
/*
@@ -657,32 +669,14 @@ DtmDeserializeTransactionState(void* ctx)
657
669
}
658
670
659
671
660
- cid_t
661
- DtmGetCsn (TransactionId xid )
662
- {
663
- cid_t csn = 0 ;
664
-
665
- SpinLockAcquire (& local -> lock );
666
- {
667
- DtmTransStatus * ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_FIND ,NULL );
668
-
669
- if (ts != NULL )
670
- {
671
- csn = ts -> cid ;
672
- }
673
- }
674
- SpinLockRelease (& local -> lock );
675
- return csn ;
676
- }
677
-
678
672
/*
679
673
* Save state of parepared transaction
680
674
*/
681
675
void
682
676
DtmLocalSavePreparedState (DtmCurrentTrans * x )
683
677
{
684
678
685
- if (x -> gtid [ 0 ] )
679
+ if (dtm_tx . is_global )
686
680
{
687
681
TransactionId * subxids ;
688
682
TransactionId xid = GetCurrentTransactionId ();
@@ -691,10 +685,11 @@ DtmLocalSavePreparedState(DtmCurrentTrans * x)
691
685
SpinLockAcquire (& local -> lock );
692
686
{
693
687
DtmTransStatus * ts ;
688
+ bool found ;
694
689
695
- ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,NULL );
696
- ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
697
- ts -> cid = dtm_get_cid () ;
690
+ ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
691
+ Assert (! found ) ;
692
+ ts -> cid = InDoubtGlobalCSN ;
698
693
ts -> nSubxids = nSubxids ;
699
694
DtmTransactionListAppend (ts );
700
695
DtmAddSubtransactions (ts ,subxids ,nSubxids );
@@ -720,7 +715,6 @@ DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
720
715
Assert (TransactionIdIsValid (subxids [i ]));
721
716
sts = (DtmTransStatus * )hash_search (xid2status ,& subxids [i ],HASH_ENTER ,& found );
722
717
Assert (!found );
723
- sts -> status = ts -> status ;
724
718
sts -> cid = ts -> cid ;
725
719
sts -> nSubxids = 0 ;
726
720
DtmTransactionListInsertAfter (ts ,sts );