@@ -230,14 +230,14 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
230230if (ts != NULL )
231231 {
232232if (ts -> csn > dtmTx .snapshot ) {
233- DTM_TRACE (( stderr , "%d: tuple with xid=%d(csn=%lld ) is invisibile in snapshot %lld \n" ,
234- getpid (),xid ,ts -> csn ,dtmTx .snapshot ) );
233+ DTM_TUPLE_TRACE ( "%d: tuple with xid=%d(csn=%ld ) is invisibile in snapshot %ld \n" ,
234+ getpid (),xid ,ts -> csn ,dtmTx .snapshot );
235235LWLockRelease (dtm -> hashLock );
236236return true;
237237 }
238238if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS )
239239 {
240- DTM_TRACE (( stderr , "%d: wait for in-doubt transaction %u in snapshot %lu\n" ,getpid (),xid ,dtmTx .snapshot ) );
240+ DTM_TRACE ("%d: wait for in-doubt transaction %u in snapshot %lu\n" ,getpid (),xid ,dtmTx .snapshot );
241241LWLockRelease (dtm -> hashLock );
242242#if TRACE_SLEEP_TIME
243243 {
@@ -255,7 +255,7 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
255255if (firstReportTime == 0 ) {
256256firstReportTime = now ;
257257 }else {
258- fprintf ( stderr , "Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu\n" ,totalSleepTime ,now - firstReportTime ,totalSleepTime * 100.0 /(now - firstReportTime ),maxSleepTime );
258+ DTM_TRACE ( "Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu\n" ,totalSleepTime ,now - firstReportTime ,totalSleepTime * 100.0 /(now - firstReportTime ),maxSleepTime );
259259 }
260260 }
261261 }
@@ -268,15 +268,15 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
268268else
269269 {
270270bool invisible = ts -> status != TRANSACTION_STATUS_COMMITTED ;
271- DTM_TRACE (( stderr , "%d: tuple with xid=%d(csn= %lld ) is %s in snapshot %lld \n" ,
272- getpid (),xid ,ts -> csn ,invisible ?"rollbacked" :"committed" ,dtmTx .snapshot ) );
271+ DTM_TUPLE_TRACE ( "%d: tuple with xid=%d(csn= %ld ) is %s in snapshot %ld \n" ,
272+ getpid (),xid ,ts -> csn ,invisible ?"rollbacked" :"committed" ,dtmTx .snapshot );
273273LWLockRelease (dtm -> hashLock );
274274return invisible ;
275275 }
276276 }
277277else
278278 {
279- DTM_TRACE (( stderr , "%d: visibility check is skept for transaction %u in snapshot %lu\n" ,getpid (),xid ,dtmTx .snapshot ) );
279+ DTM_TUPLE_TRACE ( "%d: visibility check is skept for transaction %u in snapshot %lu\n" ,getpid (),xid ,dtmTx .snapshot );
280280break ;
281281 }
282282 }
@@ -342,14 +342,15 @@ DtmAdjustOldestXid(TransactionId xid)
342342ts = (DtmTransState * )hash_search (xid2state ,& xid ,HASH_FIND ,NULL );
343343if (ts != NULL ) {
344344timestamp_t cutoff_time = ts -> csn - DtmVacuumDelay * USEC ;
345-
345+ #if 0
346346for (ts = dtm -> transListHead ;ts != NULL && ts -> csn < cutoff_time ;prev = ts ,ts = ts -> next ) {
347347Assert (ts -> status == TRANSACTION_STATUS_COMMITTED || ts -> status == TRANSACTION_STATUS_ABORTED );
348348if (prev != NULL ) {
349349/* Remove information about too old transactions */
350350hash_search (xid2state ,& prev -> xid ,HASH_REMOVE ,NULL );
351351}
352352}
353+ #endif
353354 }
354355if (prev != NULL ) {
355356dtm -> transListHead = prev ;
@@ -398,7 +399,6 @@ static void DtmInitialize()
398399static void
399400DtmXactCallback (XactEvent event ,void * arg )
400401{
401- //XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n", getpid(), event, DtmNextXid);
402402switch (event )
403403 {
404404case XACT_EVENT_START :
@@ -427,7 +427,7 @@ DtmBeginTransaction(DtmCurrentTrans* x)
427427x -> snapshot = dtm_get_csn ();
428428x -> gtid .xid = InvalidTransactionId ;
429429LWLockRelease (dtm -> hashLock );
430- DTM_TRACE (( stderr , "DtmLocalTransaction: transaction %u uses local snapshot %lu\n" ,x -> xid ,x -> snapshot ) );
430+ DTM_TRACE ("DtmLocalTransaction: transaction %u uses local snapshot %lu\n" ,x -> xid ,x -> snapshot );
431431 }
432432}
433433
@@ -438,6 +438,7 @@ DtmBeginTransaction(DtmCurrentTrans* x)
438438static void DtmPrepareTransaction (DtmCurrentTrans * x )
439439{
440440DtmTransState * ts ;
441+ bool found ;
441442int i ;
442443
443444if (!x -> isDistributed ) {
@@ -448,8 +449,9 @@ static void DtmPrepareTransaction(DtmCurrentTrans* x)
448449x -> xid = GetCurrentTransactionId ();
449450}
450451LWLockAcquire (dtm -> hashLock ,LW_EXCLUSIVE );
451- ts = hash_search (xid2state ,& x -> xid ,HASH_ENTER ,NULL );
452- ts -> snapshot = x -> isReplicated ?x -> snapshot :INVALID_CSN ;
452+ ts = hash_search (xid2state ,& x -> xid ,HASH_ENTER ,& found );
453+ Assert (!found );
454+ ts -> snapshot = x -> isReplicated ?INVALID_CSN :x -> snapshot ;
453455ts -> status = TRANSACTION_STATUS_UNKNOWN ;
454456ts -> csn = dtm_get_csn ();
455457ts -> procno = MyProc -> pgprocno ;
@@ -475,6 +477,24 @@ DtmEndTransaction(DtmCurrentTrans* x)
475477x -> gtid .xid = InvalidTransactionId ;
476478}
477479
480+ static void
481+ SendNotificationMessage (DtmTransState * ts )
482+ {
483+ DtmTransState * votingList ;
484+
485+ SpinLockAcquire (& dtm -> votingSpinlock );
486+ votingList = dtm -> votingTransactions ;
487+ ts -> nextVoting = votingList ;
488+ dtm -> votingTransactions = ts ;
489+ SpinLockRelease (& dtm -> votingSpinlock );
490+ DTM_TRACE ("Register commit message\n" );
491+ if (votingList == NULL ) {
492+ /* singal semaphore only once for the whole list */
493+ DTM_TRACE ("Signal semaphore\n" );
494+ PGSemaphoreUnlock (& dtm -> votingSemaphore );
495+ }
496+ }
497+
478498static XidStatus
479499DtmCommitTransaction (TransactionId xid ,int nsubxids ,TransactionId * subxids )
480500{
@@ -524,12 +544,16 @@ DtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
524544
525545LWLockAcquire (dtm -> hashLock ,LW_EXCLUSIVE );
526546ts = hash_search (xid2state ,& xid ,HASH_FIND ,NULL );
527- Assert (ts != NULL );/* should be created by DtmPrepareTransaction */
528- ts -> status = status ;
529- for (i = 0 ;i < nsubxids ;i ++ ) {
530- ts = ts -> next ;
547+ if (ts != NULL ) {/* should be created by DtmPrepareTransaction */
531548ts -> status = status ;
532- }
549+ for (i = 0 ;i < nsubxids ;i ++ ) {
550+ ts = ts -> next ;
551+ ts -> status = status ;
552+ }
553+ if (dtmTx .isReplicated ) {
554+ SendNotificationMessage (ts );
555+ }
556+ }
533557LWLockRelease (dtm -> hashLock );
534558}
535559
@@ -538,19 +562,18 @@ DtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
538562static void
539563DtmSetTransactionStatus (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status ,XLogRecPtr lsn )
540564{
541- DTM_INFO ("%d: DtmSetTransactionStatus %u = %u\n" ,getpid (),xid ,status );
542- if (dtmTx .isDistributed )
565+ DTM_TRACE ("%d: DtmSetTransactionStatus %u = %u\n" ,getpid (),xid ,status );
566+ if (xid == dtmTx . xid && dtmTx .isDistributed )
543567{
544- Assert (xid == dtmTx .xid );
545568if (status == TRANSACTION_STATUS_ABORTED || !dtmTx .containsDML )
546569{
547570DtmFinishTransaction (xid ,nsubxids ,subxids ,status );
548- DTM_INFO ("Abort transaction %d\n" ,xid );
571+ DTM_TRACE ("Abort transaction %d\n" ,xid );
549572}
550573else
551574{
552575if (DtmCommitTransaction (xid ,nsubxids ,subxids )== TRANSACTION_STATUS_COMMITTED ) {
553- DTM_INFO ("Commit transaction %d\n" ,xid );
576+ DTM_TRACE ("Commit transaction %d\n" ,xid );
554577}else {
555578PgTransactionIdSetTreeStatus (xid ,nsubxids ,subxids ,TRANSACTION_STATUS_ABORTED ,lsn );
556579dtmTx .isDistributed = false;
@@ -643,7 +666,7 @@ _PG_init(void)
643666);
644667
645668DefineCustomIntVariable (
646- "multimaster.arpiter_port " ,
669+ "multimaster.arbiter_port " ,
647670"Base value for assigning arbiter ports" ,
648671NULL ,
649672& MMArbiterPort ,
@@ -990,42 +1013,29 @@ MMPoolConstructor(void)
9901013return & dtm -> pool ;
9911014}
9921015
993- static void
994- SendCommitMessage (DtmTransState * ts )
995- {
996- DtmTransState * votingList ;
997-
998- SpinLockAcquire (& dtm -> votingSpinlock );
999- votingList = dtm -> votingTransactions ;
1000- ts -> nextVoting = votingList ;
1001- dtm -> votingTransactions = ts ;
1002- SpinLockRelease (& dtm -> votingSpinlock );
1003-
1004- if (votingList == NULL ) {
1005- /* singal semaphreo only once for the whole list */
1006- PGSemaphoreUnlock (& dtm -> votingSemaphore );
1007- }
1008- }
1009-
10101016static void
10111017MMVoteForTransaction (DtmTransState * ts )
10121018{
10131019LWLockRelease (dtm -> hashLock );
10141020if (ts -> gtid .node == MMNodeId ) {
10151021/* I am coordinator: wait responses from all replicas for transaction replicated using logical decoding */
1022+ DTM_TRACE ("Coordinator waiting latch...\n" );
10161023WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
10171024ResetLatch (& MyProc -> procLatch );
1025+ DTM_TRACE ("Coordinator receive %d votes\n" ,ts -> nVotes );
10181026Assert (ts -> nVotes == dtm -> nNodes );
10191027
10201028/* ... and then send notifications to replicas */
1021- SendCommitMessage (ts );
1029+ SendNotificationMessage (ts );
10221030}else {
10231031/* I am replica: first notify coordinator... */
10241032ts -> nVotes = dtm -> nNodes - 1 ;/* I just need one confirmation from coordinator */
1025- SendCommitMessage (ts );
1033+ SendNotificationMessage (ts );
10261034/* ... and wait response from it */
1035+ DTM_TRACE ("Node %d waiting latch...\n" ,MMNodeId );
10271036WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
10281037ResetLatch (& MyProc -> procLatch );
1038+ DTM_TRACE ("Node %d receive response...\n" ,MMNodeId );
10291039}
10301040LWLockAcquire (dtm -> hashLock ,LW_EXCLUSIVE );
10311041}
@@ -1034,6 +1044,7 @@ HTAB* MMCreateHash(void)
10341044{
10351045HASHCTL info ;
10361046HTAB * htab ;
1047+ Assert (MMNodes > 0 );
10371048memset (& info ,0 ,sizeof (info ));
10381049info .keysize = sizeof (TransactionId );
10391050info .entrysize = sizeof (DtmTransState )+ (MMNodes - 1 )* sizeof (TransactionId );