@@ -89,7 +89,8 @@ static HTAB *gtid2xid;
89
89
static DtmNodeState * local ;
90
90
static uint64 totalSleepInterrupts ;
91
91
static int DtmVacuumDelay = 10 ;/* sec */
92
- static bool DtmRecordCommits = 0 ;
92
+ static bool finishing_prepared ;
93
+
93
94
94
95
DtmCurrentTrans dtm_tx ;// XXXX: make static
95
96
@@ -103,6 +104,7 @@ static size_t DtmGetTransactionStateSize(void);
103
104
static void DtmSerializeTransactionState (void * ctx );
104
105
static void DtmDeserializeTransactionState (void * ctx );
105
106
107
+ static void DtmLocalFinish (bool is_commit );
106
108
107
109
static TransactionManager DtmTM = {
108
110
PgTransactionIdGetStatus ,
@@ -207,7 +209,8 @@ GlobalSnapshotShmemSize(void)
207
209
Size size ;
208
210
209
211
size = MAXALIGN (sizeof (DtmNodeState ));
210
- size = add_size (size , (sizeof (DtmTransId )+ sizeof (DtmTransStatus )+ HASH_PER_ELEM_OVERHEAD * 2 )* DTM_HASH_INIT_SIZE );
212
+ size = add_size (size ,DTM_HASH_INIT_SIZE *
213
+ (sizeof (DtmTransId )+ sizeof (DtmTransStatus )+ HASH_PER_ELEM_OVERHEAD * 2 ));
211
214
212
215
return size ;
213
216
}
@@ -222,22 +225,28 @@ dtm_xact_callback(XactEvent event, void *arg)
222
225
DtmLocalBegin (& dtm_tx );
223
226
break ;
224
227
225
- case XACT_EVENT_ABORT :
226
- DtmLocalAbort (& dtm_tx );
227
- DtmLocalEnd (& dtm_tx );
228
+ case XACT_EVENT_ABORT_PREPARED :
229
+ // DtmLocalAbortPrepared(&dtm_tx);
230
+ finishing_prepared = true;
231
+ DtmAdjustOldestXid ();
228
232
break ;
229
233
230
- case XACT_EVENT_COMMIT :
231
- DtmLocalCommit (& dtm_tx );
232
- DtmLocalEnd (& dtm_tx );
234
+ case XACT_EVENT_COMMIT_PREPARED :
235
+ // DtmLocalCommitPrepared(&dtm_tx);
236
+ finishing_prepared = true;
237
+ DtmAdjustOldestXid ();
233
238
break ;
234
239
235
- case XACT_EVENT_ABORT_PREPARED :
236
- DtmLocalAbortPrepared (& dtm_tx );
240
+ case XACT_EVENT_COMMIT :
241
+ DtmLocalFinish (true);
242
+ DtmLocalEnd (& dtm_tx );
243
+ finishing_prepared = false;
237
244
break ;
238
245
239
- case XACT_EVENT_COMMIT_PREPARED :
240
- DtmLocalCommitPrepared (& dtm_tx );
246
+ case XACT_EVENT_ABORT :
247
+ DtmLocalFinish (false);
248
+ DtmLocalEnd (& dtm_tx );
249
+ finishing_prepared = false;
241
250
break ;
242
251
243
252
case XACT_EVENT_PRE_PREPARE :
@@ -254,43 +263,6 @@ dtm_xact_callback(XactEvent event, void *arg)
254
263
****************************************************************************
255
264
*/
256
265
257
- static uint32
258
- dtm_xid_hash_fn (const void * key ,Size keysize )
259
- {
260
- return (uint32 )* (TransactionId * )key ;
261
- }
262
-
263
- static int
264
- dtm_xid_match_fn (const void * key1 ,const void * key2 ,Size keysize )
265
- {
266
- return * (TransactionId * )key1 - * (TransactionId * )key2 ;
267
- }
268
-
269
- static uint32
270
- dtm_gtid_hash_fn (const void * key ,Size keysize )
271
- {
272
- GlobalTransactionId id = (GlobalTransactionId )key ;
273
- uint32 h = 0 ;
274
-
275
- while (* id != 0 )
276
- {
277
- h = h * 31 + * id ++ ;
278
- }
279
- return h ;
280
- }
281
-
282
- static void *
283
- dtm_gtid_keycopy_fn (void * dest ,const void * src ,Size keysize )
284
- {
285
- return strcpy ((char * )dest , (GlobalTransactionId )src );
286
- }
287
-
288
- static int
289
- dtm_gtid_match_fn (const void * key1 ,const void * key2 ,Size keysize )
290
- {
291
- return strcmp ((GlobalTransactionId )key1 , (GlobalTransactionId )key2 );
292
- }
293
-
294
266
static char const *
295
267
DtmGetName (void )
296
268
{
@@ -480,17 +452,11 @@ DtmInitialize()
480
452
void
481
453
DtmLocalBegin (DtmCurrentTrans * x )
482
454
{
483
- if (!TransactionIdIsValid (x -> xid ))
484
- {
485
455
SpinLockAcquire (& local -> lock );
486
- // x->xid = GetCurrentTransactionIdIfAny();
487
456
x -> cid = INVALID_CID ;
488
- x -> is_global = false;
489
- x -> is_prepared = false;
490
457
x -> snapshot = dtm_get_cid ();
491
458
SpinLockRelease (& local -> lock );
492
459
DTM_TRACE ((stderr ,"DtmLocalBegin: transaction %u uses local snapshot %lu\n" ,x -> xid ,x -> snapshot ));
493
- }
494
460
}
495
461
496
462
/*
@@ -516,7 +482,6 @@ DtmLocalExtend(GlobalTransactionId gtid)
516
482
strncpy (x -> gtid ,gtid ,MAX_GTID_SIZE );
517
483
SpinLockRelease (& local -> lock );
518
484
}
519
- x -> is_global = true;
520
485
DtmInitGlobalXmin (TransactionXmin );
521
486
return x -> snapshot ;
522
487
}
@@ -543,7 +508,6 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
543
508
}
544
509
local_cid = dtm_sync (global_cid );
545
510
x -> snapshot = global_cid ;
546
- x -> is_global = true;
547
511
}
548
512
strncpy (x -> gtid ,gtid ,MAX_GTID_SIZE );
549
513
SpinLockRelease (& local -> lock );
@@ -627,85 +591,59 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
627
591
}
628
592
SpinLockRelease (& local -> lock );
629
593
630
- /*
631
- * Record commit in pg_committed_xact table to be make it possible to
632
- * perform recovery in case of crash of some of cluster nodes
633
- */
634
- if (DtmRecordCommits )
635
- {
636
- char stmt [MAX_GTID_SIZE + 64 ];
637
- int rc ;
638
-
639
- sprintf (stmt ,"insert into pg_committed_xacts values ('%s')" ,gtid );
640
- SPI_connect ();
641
- rc = SPI_execute (stmt , true,0 );
642
- SPI_finish ();
643
- if (rc != SPI_OK_INSERT )
644
- {
645
- elog (ERROR ,"Failed to insert GTID %s in table pg_committed_xacts" ,gtid );
646
- }
647
- }
648
594
}
649
595
650
596
/*
651
- *Mark tranasction as prepared
597
+ *Set transaction status to committed
652
598
*/
653
599
void
654
- DtmLocalCommitPrepared ( DtmCurrentTrans * x )
600
+ DtmLocalFinish ( bool is_commit )
655
601
{
656
- if (!x -> gtid [0 ])
657
- return ;
658
-
659
- Assert (x -> gtid != NULL );
602
+ DtmCurrentTrans * x = & dtm_tx ;
603
+ TransactionId xid = GetCurrentTransactionIdIfAny ();
660
604
661
- SpinLockAcquire ( & local -> lock );
605
+ if ( x -> gtid [ 0 ] && finishing_prepared )
662
606
{
663
- DtmTransId * id = ( DtmTransId * ) hash_search ( gtid2xid , x -> gtid , HASH_REMOVE , NULL );
607
+ // Assert(!TransactionIdIsValid(xid) );
664
608
665
- Assert (id != NULL );
609
+ SpinLockAcquire (& local -> lock );
610
+ {
611
+ DtmTransId * id = (DtmTransId * )hash_search (gtid2xid ,x -> gtid ,HASH_REMOVE ,NULL );
666
612
667
- x -> is_global = true;
668
- x -> is_prepared = true;
669
- x -> xid = id -> xid ;
670
- free (id -> subxids );
613
+ Assert (id != NULL );
614
+ Assert (TransactionIdIsValid (id -> xid ));
671
615
672
- DTM_TRACE ((stderr ,"Global transaction %u(%s) is precommitted\n" ,x -> xid ,gtid ));
616
+ xid = id -> xid ;
617
+ free (id -> subxids );
618
+ }
619
+ SpinLockRelease (& local -> lock );
620
+ }
621
+ else if (!TransactionIdIsValid (xid ))
622
+ {
623
+ return ;
673
624
}
674
- SpinLockRelease (& local -> lock );
675
-
676
- DtmAdjustOldestXid ();
677
- // elog(LOG, "DtmLocalCommitPrepared %d", x->xid);
678
- }
679
-
680
- /*
681
- * Set transaction status to committed
682
- */
683
- void
684
- DtmLocalCommit (DtmCurrentTrans * x )
685
- {
686
- // if (!x->is_global)
687
- // return;
688
625
689
626
SpinLockAcquire (& local -> lock );
690
- if (TransactionIdIsValid (x -> xid ))
691
627
{
692
628
bool found ;
693
629
DtmTransStatus * ts ;
694
630
695
- ts = (DtmTransStatus * )hash_search (xid2status ,& x -> xid ,HASH_ENTER ,& found );
696
- ts -> status = TRANSACTION_STATUS_COMMITTED ;
631
+ ts = (DtmTransStatus * )hash_search (xid2status ,& xid ,HASH_ENTER ,& found );
632
+ ts -> status = is_commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED ;
697
633
if (found )
698
634
{
699
- int i ;
700
- DtmTransStatus * sts = ts ;
701
635
702
- Assert (found );
703
- Assert (x -> is_global );
704
- for (i = 0 ;i < ts -> nSubxids ;i ++ )
636
+ if (is_commit )// XXX: why only for commit?
705
637
{
706
- sts = sts -> next ;
707
- Assert (sts -> cid == ts -> cid );
708
- sts -> status = TRANSACTION_STATUS_COMMITTED ;
638
+ int i ;
639
+ DtmTransStatus * sts = ts ;
640
+
641
+ for (i = 0 ;i < ts -> nSubxids ;i ++ )
642
+ {
643
+ sts = sts -> next ;
644
+ Assert (sts -> cid == ts -> cid );
645
+ sts -> status = TRANSACTION_STATUS_COMMITTED ;
646
+ }
709
647
}
710
648
}
711
649
else
@@ -715,86 +653,31 @@ DtmLocalCommit(DtmCurrentTrans * x)
715
653
Assert (!found );
716
654
ts -> cid = dtm_get_cid ();
717
655
DtmTransactionListAppend (ts );
718
- ts -> nSubxids = xactGetCommittedChildren (& subxids );
719
- DtmAddSubtransactions (ts ,subxids ,ts -> nSubxids );
656
+ if (is_commit )// XXX: why?
657
+ {
658
+ ts -> nSubxids = xactGetCommittedChildren (& subxids );
659
+ DtmAddSubtransactions (ts ,subxids ,ts -> nSubxids );
660
+ }
661
+ else
662
+ {
663
+ ts -> nSubxids = 0 ;
664
+ }
720
665
}
721
666
x -> cid = ts -> cid ;
722
667
DTM_TRACE ((stderr ,"Local transaction %u is committed at %lu\n" ,x -> xid ,x -> cid ));
723
668
}
724
669
SpinLockRelease (& local -> lock );
725
670
726
- DtmAdjustOldestXid ();
671
+ // DtmAdjustOldestXid();
727
672
// elog(LOG, "DtmLocalCommit %d", x->xid);
728
673
}
729
674
730
- /*
731
- * Mark tranasction as prepared
732
- */
733
- void
734
- DtmLocalAbortPrepared (DtmCurrentTrans * x )
735
- {
736
- if (!x -> gtid [0 ])
737
- return ;
738
-
739
- Assert (x -> gtid != NULL );
740
-
741
- SpinLockAcquire (& local -> lock );
742
- {
743
- DtmTransId * id = (DtmTransId * )hash_search (gtid2xid ,x -> gtid ,HASH_REMOVE ,NULL );
744
-
745
- Assert (id != NULL );
746
- x -> is_global = true;
747
- x -> is_prepared = true;
748
- x -> xid = id -> xid ;
749
- free (id -> subxids );
750
- DTM_TRACE ((stderr ,"Global transaction %u(%s) is preaborted\n" ,x -> xid ,gtid ));
751
- }
752
- SpinLockRelease (& local -> lock );
753
- }
754
-
755
- /*
756
- * Set transaction status to aborted
757
- */
758
- void
759
- DtmLocalAbort (DtmCurrentTrans * x )
760
- {
761
- if (!TransactionIdIsValid (x -> xid ))
762
- return ;
763
-
764
- SpinLockAcquire (& local -> lock );
765
- {
766
- bool found ;
767
- DtmTransStatus * ts ;
768
-
769
- Assert (TransactionIdIsValid (x -> xid ));
770
- ts = (DtmTransStatus * )hash_search (xid2status ,& x -> xid ,HASH_ENTER ,& found );
771
- if (found )
772
- {
773
- Assert (found );
774
- Assert (x -> is_global );
775
- }
776
- else
777
- {
778
- Assert (!found );
779
- ts -> cid = dtm_get_cid ();
780
- ts -> nSubxids = 0 ;
781
- DtmTransactionListAppend (ts );
782
- }
783
- x -> cid = ts -> cid ;
784
- ts -> status = TRANSACTION_STATUS_ABORTED ;
785
- DTM_TRACE ((stderr ,"Local transaction %u is aborted at %lu\n" ,x -> xid ,x -> cid ));
786
- }
787
- SpinLockRelease (& local -> lock );
788
- }
789
-
790
675
/*
791
676
* Cleanup dtm_tx structure
792
677
*/
793
678
void
794
679
DtmLocalEnd (DtmCurrentTrans * x )
795
680
{
796
- x -> is_global = false;
797
- x -> is_prepared = false;
798
681
x -> xid = InvalidTransactionId ;
799
682
x -> cid = INVALID_CID ;
800
683
}
@@ -854,7 +737,6 @@ DtmGetCsn(TransactionId xid)
854
737
void
855
738
DtmLocalSavePreparedState (DtmCurrentTrans * x )
856
739
{
857
- // x->is_prepared = true;
858
740
859
741
if (x -> gtid [0 ])
860
742
{