@@ -103,6 +103,7 @@ PG_MODULE_MAGIC;
103
103
PG_FUNCTION_INFO_V1 (mtm_start_replication );
104
104
PG_FUNCTION_INFO_V1 (mtm_stop_replication );
105
105
PG_FUNCTION_INFO_V1 (mtm_drop_node );
106
+ PG_FUNCTION_INFO_V1 (mtm_poll_node );
106
107
PG_FUNCTION_INFO_V1 (mtm_recover_node );
107
108
PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
108
109
PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
@@ -181,6 +182,7 @@ int MtmKeepaliveTimeout;
181
182
int MtmReconnectAttempts ;
182
183
int MtmNodeDisableDelay ;
183
184
bool MtmUseRaftable ;
185
+ bool MtmUseDtm ;
184
186
MtmConnectionInfo * MtmConnections ;
185
187
186
188
static char * MtmConnStrs ;
@@ -339,7 +341,7 @@ TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum)
339
341
}
340
342
341
343
bool MtmXidInMVCCSnapshot (TransactionId xid ,Snapshot snapshot )
342
- {
344
+ {
343
345
#if TRACE_SLEEP_TIME
344
346
static timestamp_t firstReportTime ;
345
347
static timestamp_t prevReportTime ;
@@ -349,6 +351,10 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
349
351
timestamp_t delay = MIN_WAIT_TIMEOUT ;
350
352
Assert (xid != InvalidTransactionId );
351
353
354
+ if (!MtmUseDtm ) {
355
+ return PgXidInMVCCSnapshot (xid ,snapshot );
356
+ }
357
+
352
358
MtmLock (LW_SHARED );
353
359
354
360
#if TRACE_SLEEP_TIME
@@ -512,13 +518,19 @@ MtmAdjustOldestXid(TransactionId xid)
512
518
hash_search (MtmXid2State ,& prev -> xid ,HASH_REMOVE ,NULL );
513
519
}
514
520
}
515
- }
516
- if (prev != NULL ) {
517
- Mtm -> transListHead = prev ;
518
- Mtm -> oldestXid = xid = prev -> xid ;
519
- }else if (TransactionIdPrecedes (Mtm -> oldestXid ,xid )) {
520
- xid = Mtm -> oldestXid ;
521
- }
521
+ }
522
+ if (MtmUseDtm ) {
523
+ if (prev != NULL ) {
524
+ Mtm -> transListHead = prev ;
525
+ Mtm -> oldestXid = xid = prev -> xid ;
526
+ }else if (TransactionIdPrecedes (Mtm -> oldestXid ,xid )) {
527
+ xid = Mtm -> oldestXid ;
528
+ }
529
+ }else {
530
+ if (prev != NULL ) {
531
+ Mtm -> transListHead = prev ;
532
+ }
533
+ }
522
534
MtmUnlock ();
523
535
}
524
536
return xid ;
@@ -753,6 +765,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
753
765
754
766
}
755
767
768
+ static time_t maxWakeupTime ;
769
+
756
770
static void
757
771
MtmPostPrepareTransaction (MtmCurrentTrans * x )
758
772
{
@@ -768,18 +782,23 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
768
782
tm -> state = ts ;
769
783
ts -> votingCompleted = true;
770
784
if (Mtm -> status != MTM_RECOVERY ) {
771
- MtmSendNotificationMessage (ts ,MSG_READY );/* send notification to coordinator */
785
+ MtmSendNotificationMessage (ts ,MtmUseDtm ? MSG_READY : MSG_PREPARED );/* send notification to coordinator */
772
786
}else {
773
787
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
774
788
}
775
789
MtmUnlock ();
776
790
MtmResetTransaction (x );
777
791
}else {
792
+ time_t wakeupTime ;
778
793
/* wait votes from all nodes */
779
794
while (!ts -> votingCompleted ) {
780
795
MtmUnlock ();
781
796
WaitLatch (& MyProc -> procLatch ,WL_LATCH_SET ,-1 );
782
797
ResetLatch (& MyProc -> procLatch );
798
+ wakeupTime = MtmGetCurrentTime ()- ts -> wakeupTime ;
799
+ if (wakeupTime > maxWakeupTime ) {
800
+ maxWakeupTime = wakeupTime ;
801
+ }
783
802
MtmLock (LW_SHARED );
784
803
}
785
804
x -> status = ts -> status ;
@@ -972,6 +991,7 @@ void MtmWakeUpBackend(MtmTransState* ts)
972
991
{
973
992
MTM_LOG3 ("Wakeup backed procno=%d, pid=%d" ,ts -> procno ,ProcGlobal -> allProcs [ts -> procno ].pid );
974
993
ts -> votingCompleted = true;
994
+ ts -> wakeupTime = MtmGetCurrentTime ();
975
995
SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
976
996
}
977
997
@@ -1651,6 +1671,19 @@ _PG_init(void)
1651
1671
NULL
1652
1672
);
1653
1673
1674
+ DefineCustomBoolVariable (
1675
+ "multimaster.use_dtm" ,
1676
+ "Use distributed transaction manager" ,
1677
+ NULL ,
1678
+ & MtmUseDtm ,
1679
+ true,
1680
+ PGC_BACKEND ,
1681
+ 0 ,
1682
+ NULL ,
1683
+ NULL ,
1684
+ NULL
1685
+ );
1686
+
1654
1687
DefineCustomIntVariable (
1655
1688
"multimaster.workers" ,
1656
1689
"Number of multimaster executor workers per node" ,
@@ -2069,6 +2102,27 @@ mtm_drop_node(PG_FUNCTION_ARGS)
2069
2102
PG_RETURN_VOID ();
2070
2103
}
2071
2104
2105
+ Datum
2106
+ mtm_poll_node (PG_FUNCTION_ARGS )
2107
+ {
2108
+ int nodeId = PG_GETARG_INT32 (0 );
2109
+ bool nowait = PG_GETARG_BOOL (1 );
2110
+ bool online = true;
2111
+ while (BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 )) {
2112
+ if (nowait ) {
2113
+ online = false;
2114
+ break ;
2115
+ }else {
2116
+ MtmSleep (STATUS_POLL_DELAY );
2117
+ }
2118
+ }
2119
+ if (!nowait ) {
2120
+ /* Just wait some time until logical repication channels will be reestablished */
2121
+ MtmSleep (MtmNodeDisableDelay );
2122
+ }
2123
+ PG_RETURN_BOOL (online );
2124
+ }
2125
+
2072
2126
Datum
2073
2127
mtm_recover_node (PG_FUNCTION_ARGS )
2074
2128
{