@@ -255,13 +255,18 @@ void MtmUnlockNode(int nodeId)
255255 */
256256
257257
258- timestamp_t MtmGetCurrentTime (void )
258+ timestamp_t MtmGetSystemTime (void )
259259{
260260struct timeval tv ;
261261gettimeofday (& tv ,NULL );
262262return (timestamp_t )tv .tv_sec * USEC + tv .tv_usec + Mtm -> timeShift ;
263263}
264264
265+ timestamp_t MtmGetCurrentTime (void )
266+ {
267+ return MtmGetSystemTime ()+ Mtm -> timeShift ;
268+ }
269+
265270void MtmSleep (timestamp_t interval )
266271{
267272struct timespec ts ;
@@ -1045,7 +1050,7 @@ void MtmRecoveryCompleted(void)
10451050MtmLock (LW_EXCLUSIVE );
10461051Mtm -> recoverySlot = 0 ;
10471052BIT_CLEAR (Mtm -> disabledNodeMask ,MtmNodeId - 1 );
1048- Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime = time ( NULL );
1053+ Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
10491054/* Mode will be changed to online once all locagical reciever are connected */
10501055MtmSwitchClusterMode (MTM_CONNECTED );
10511056MtmUnlock ();
@@ -1134,7 +1139,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
11341139/* We are lucky: caugth-up without locking cluster! */
11351140}
11361141BIT_CLEAR (Mtm -> disabledNodeMask ,nodeId - 1 );
1137- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
1142+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
11381143Mtm -> nNodes += 1 ;
11391144caughtUp = true;
11401145}else if (!BIT_CHECK (Mtm -> nodeLockerMask ,nodeId - 1 )
@@ -1279,15 +1284,15 @@ bool MtmRefreshClusterStatus(bool nowait)
12791284if (mask & 1 ) {
12801285Mtm -> nNodes -= 1 ;
12811286BIT_SET (Mtm -> disabledNodeMask ,i );
1282- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1287+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
12831288}
12841289}
12851290mask = clique & Mtm -> disabledNodeMask ;/* new enabled nodes mask */
12861291for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
12871292if (mask & 1 ) {
12881293Mtm -> nNodes += 1 ;
12891294BIT_CLEAR (Mtm -> disabledNodeMask ,i );
1290- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1295+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
12911296}
12921297}
12931298MtmCheckQuorum ();
@@ -1327,7 +1332,7 @@ void MtmOnNodeDisconnect(int nodeId)
13271332{
13281333MtmTransState * ts ;
13291334
1330- if (Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime + MtmNodeDisableDelay > time ( NULL )) {
1335+ if (Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC ( MtmNodeDisableDelay ) > MtmGetSystemTime ( )) {
13311336/* Avoid false detection of node failure and prevent node status blinking */
13321337return ;
13331338}
@@ -1342,7 +1347,7 @@ void MtmOnNodeDisconnect(int nodeId)
13421347if (!MtmRefreshClusterStatus (false)) {
13431348MtmLock (LW_EXCLUSIVE );
13441349if (!BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 )) {
1345- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
1350+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
13461351BIT_SET (Mtm -> disabledNodeMask ,nodeId - 1 );
13471352Mtm -> nNodes -= 1 ;
13481353MtmCheckQuorum ();
@@ -1510,14 +1515,14 @@ static void MtmInitialize()
15101515for (i = 0 ;i < MtmNodes ;i ++ ) {
15111516Mtm -> nodes [i ].oldestSnapshot = 0 ;
15121517Mtm -> nodes [i ].transDelay = 0 ;
1513- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1518+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
15141519Mtm -> nodes [i ].con = MtmConnections [i ];
15151520Mtm -> nodes [i ].flushPos = 0 ;
15161521}
15171522PGSemaphoreCreate (& Mtm -> votingSemaphore );
15181523PGSemaphoreReset (& Mtm -> votingSemaphore );
15191524SpinLockInit (& Mtm -> spinlock );
1520- BgwPoolInit (& Mtm -> pool ,MtmExecutor ,MtmDatabaseName ,MtmQueueSize );
1525+ BgwPoolInit (& Mtm -> pool ,MtmExecutor ,MtmDatabaseName ,MtmQueueSize , MtmWorkers );
15211526RegisterXactCallback (MtmXactCallback ,NULL );
15221527MtmTx .snapshot = INVALID_CSN ;
15231528MtmTx .xid = InvalidTransactionId ;
@@ -1681,10 +1686,10 @@ _PG_init(void)
16811686
16821687DefineCustomIntVariable (
16831688"multimaster.node_disable_delay" ,
1684- "Minamal amount of time (sec ) between node status change" ,
1689+ "Minamal amount of time (msec ) between node status change" ,
16851690"This delay is used to avoid false detection of node failure and to prevent blinking of node status node" ,
16861691& MtmNodeDisableDelay ,
1687- 1 ,
1692+ 1000 ,
168816931 ,
16891694INT_MAX ,
16901695PGC_BACKEND ,
@@ -2032,7 +2037,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
20322037{
20332038elog (ERROR ,"NodeID %d is out of range [1,%d]" ,nodeId ,Mtm -> nNodes );
20342039}
2035- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
2040+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
20362041BIT_SET (Mtm -> disabledNodeMask ,nodeId - 1 );
20372042Mtm -> nNodes -= 1 ;
20382043MtmCheckQuorum ();
@@ -2083,15 +2088,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20832088if (MtmIsRecoverySession ) {
20842089MTM_LOG1 ("%d: Node %d start recovery of node %d" ,MyProcPid ,MtmNodeId ,MtmReplicationNodeId );
20852090if (!BIT_CHECK (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 )) {
2086- Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time ( NULL );
2091+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
20872092BIT_SET (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 );
20882093Mtm -> nNodes -= 1 ;
20892094MtmCheckQuorum ();
20902095}
20912096}else if (BIT_CHECK (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 )) {
20922097if (recoveryCompleted ) {
20932098MTM_LOG1 ("Node %d consider that recovery of node %d is completed: start normal replication" ,MtmNodeId ,MtmReplicationNodeId );
2094- Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time ( NULL );
2099+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
20952100BIT_CLEAR (Mtm -> disabledNodeMask ,MtmReplicationNodeId - 1 );
20962101Mtm -> nNodes += 1 ;
20972102MtmCheckQuorum ();
@@ -2238,7 +2243,7 @@ mtm_poll_node(PG_FUNCTION_ARGS)
22382243}
22392244if (!nowait ) {
22402245/* Just wait some time until logical repication channels will be reestablished */
2241- MtmSleep (MtmNodeDisableDelay );
2246+ MtmSleep (MSEC_TO_USEC ( MtmNodeDisableDelay ) );
22422247}
22432248PG_RETURN_BOOL (online );
22442249}
@@ -2297,7 +2302,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22972302usrfctx -> values [4 ]= Int64GetDatum (lag );
22982303usrfctx -> nulls [4 ]= lag < 0 ;
22992304usrfctx -> values [5 ]= Int64GetDatum (Mtm -> transCount ?Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount :0 );
2300- usrfctx -> values [6 ]= TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2305+ usrfctx -> values [6 ]= TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime / USEC ));
23012306usrfctx -> values [7 ]= CStringGetTextDatum (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
23022307usrfctx -> nodeId += 1 ;
23032308
@@ -3058,6 +3063,18 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
30583063MtmGetGtid (pgxact -> xid ,& gtid );
30593064hasDeadlock = MtmGraphFindLoop (& graph ,& gtid );
30603065elog (WARNING ,"Distributed deadlock check for %u:%u = %d" ,gtid .node ,gtid .xid ,hasDeadlock );
3066+ if (!hasDeadlock ) {
3067+ /* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
3068+ * can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
3069+ * refelected in lock graph
3070+ */
3071+ timestamp_t lastPeekTime = BgwGetLastPeekTime (& Mtm -> pool );
3072+ if (lastPeekTime != 0 && MtmGetSystemTime ()- lastPeekTime >=MSEC_TO_USEC (DeadlockTimeout )) {
3073+ hasDeadlock = true;
3074+ elog (WARNING ,"Apply workers were blocked more than %d msec" ,
3075+ (int )USEC_TO_MSEC (MtmGetSystemTime ()- lastPeekTime ));
3076+ }
3077+ }
30613078}
30623079return hasDeadlock ;
30633080}