@@ -2128,7 +2128,7 @@ void MtmRefreshClusterStatus()
2128
2128
* connectivity graph is stabilized.
2129
2129
*/
2130
2130
oldClique = newClique ;
2131
- MtmSleep (MSEC_TO_USEC (MtmHeartbeatSendTimeout )* 2 );/* double timeout tocondider worst case when heartbeatsend interval is added with refresh cluster status interval */
2131
+ MtmSleep (MSEC_TO_USEC (MtmHeartbeatRecvTimeout )* 2 );/* double timeout toconsider the worst case when heartbeatreceive interval is added with refresh cluster status interval */
2132
2132
MtmBuildConnectivityMatrix (matrix );
2133
2133
newClique = MtmFindMaxClique (matrix ,Mtm -> nAllNodes ,& cliqueSize );
2134
2134
}while (newClique != oldClique );
@@ -2232,7 +2232,7 @@ void MtmOnNodeDisconnect(int nodeId)
2232
2232
BIT_SET (SELF_CONNECTIVITY_MASK ,nodeId - 1 );
2233
2233
BIT_SET (Mtm -> reconnectMask ,nodeId - 1 );
2234
2234
elog (LOG ,"Disconnect node %d connectivity mask %llx" ,
2235
- nodeId ,( long long ) SELF_CONNECTIVITY_MASK );
2235
+ nodeId ,SELF_CONNECTIVITY_MASK );
2236
2236
MtmUnlock ();
2237
2237
}
2238
2238
@@ -2242,7 +2242,7 @@ void MtmOnNodeDisconnect(int nodeId)
2242
2242
void MtmOnNodeConnect (int nodeId )
2243
2243
{
2244
2244
MtmLock (LW_EXCLUSIVE );
2245
- elog (LOG ,"Connect node %d connectivity mask %llx" ,nodeId ,( long long ) SELF_CONNECTIVITY_MASK );
2245
+ elog (LOG ,"Connect node %d connectivity mask %llx" ,nodeId ,SELF_CONNECTIVITY_MASK );
2246
2246
BIT_CLEAR (SELF_CONNECTIVITY_MASK ,nodeId - 1 );
2247
2247
BIT_SET (Mtm -> reconnectMask ,nodeId - 1 );/* force sender to reestablish connection and send heartbeat */
2248
2248
MtmUnlock ();
@@ -2254,6 +2254,7 @@ void MtmOnNodeConnect(int nodeId)
2254
2254
void MtmReconnectNode (int nodeId )
2255
2255
{
2256
2256
MtmLock (LW_EXCLUSIVE );
2257
+ elog (LOG ,"Reconnect node %d connectivity mask %llx" ,nodeId ,SELF_CONNECTIVITY_MASK );
2257
2258
BIT_SET (Mtm -> reconnectMask ,nodeId - 1 );
2258
2259
MtmUnlock ();
2259
2260
}
@@ -3289,7 +3290,9 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
3289
3290
MtmSetCurrentTransactionGID (ts -> gid );
3290
3291
MtmTx .isActive = true;
3291
3292
FinishPreparedTransaction (ts -> gid ,commit );
3292
-
3293
+ if (commit ) {
3294
+ MTM_LOG1 ("Distributed transaction %s is committed" ,ts -> gid );
3295
+ }
3293
3296
if (!insideTransaction ) {
3294
3297
CommitTransactionCommand ();
3295
3298
Assert (!MtmTx .isActive );
@@ -3326,15 +3329,21 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
3326
3329
MtmUnlock ();
3327
3330
return REPLMODE_EXIT ;
3328
3331
}
3329
- /* We are not interested in receiving any deteriorated logical messages from recovered node,do recreate slot */
3332
+ /* We are not interested in receiving any deteriorated logical messages from recovered node,so recreate slot */
3330
3333
if (BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 )) {
3331
3334
mode = REPLMODE_CREATE_NEW ;
3332
3335
}
3333
3336
MTM_LOG2 ("%d: receiver slot mode %s" ,MyProcPid ,MtmNodeStatusMnem [Mtm -> status ]);
3334
3337
if (Mtm -> status == MTM_RECOVERY ) {
3335
3338
mode = REPLMODE_RECOVERED ;
3336
- if ((Mtm -> recoverySlot == 0 && (Mtm -> donorNodeId == MtmNodeId || Mtm -> donorNodeId == nodeId ))
3337
- || Mtm -> recoverySlot == nodeId )
3339
+ /* Choose node for recovery if
3340
+ * 1. It is not chosen yet or the same node was chosen before
3341
+ * 2. It is donor node or there is no donor node
3342
+ * 3. Connections with all other live nodes were established
3343
+ */
3344
+ if ((Mtm -> recoverySlot == 0 || Mtm -> recoverySlot == nodeId )
3345
+ && (Mtm -> donorNodeId == MtmNodeId || Mtm -> donorNodeId == nodeId )
3346
+ && (SELF_CONNECTIVITY_MASK & ~Mtm -> disabledNodeMask )== 0 )
3338
3347
{
3339
3348
/* Choose for recovery first available slot or slot of donor node (if any) */
3340
3349
if (Mtm -> nAllNodes >=3 ) {
@@ -3354,6 +3363,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
3354
3363
return REPLMODE_RECOVERY ;
3355
3364
}
3356
3365
}
3366
+ MTM_LOG1 ("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx" ,
3367
+ nodeId ,Mtm -> recoverySlot ,Mtm -> donorNodeId ,SELF_CONNECTIVITY_MASK ,Mtm -> disabledNodeMask );
3357
3368
MtmUnlock ();
3358
3369
/* delay opening of other slots until recovery is completed */
3359
3370
MtmSleep (STATUS_POLL_DELAY );
@@ -3492,6 +3503,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
3492
3503
}
3493
3504
}
3494
3505
}
3506
+ MTM_LOG1 ("Startup of logical replication to node %d" ,MtmReplicationNodeId );
3495
3507
MtmLock (LW_EXCLUSIVE );
3496
3508
if (BIT_CHECK (Mtm -> stoppedNodeMask ,MtmReplicationNodeId - 1 )) {
3497
3509
elog (WARNING ,"Stopped node %d tries to initiate recovery" ,MtmReplicationNodeId );
@@ -4074,6 +4086,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
4074
4086
values [15 ]= Int32GetDatum (Mtm -> nConfigChanges );
4075
4087
values [16 ]= Int64GetDatum (Mtm -> stalledNodeMask );
4076
4088
values [17 ]= Int64GetDatum (Mtm -> stoppedNodeMask );
4089
+ values [18 ]= TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime /USECS_PER_SEC ));
4077
4090
4078
4091
PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc ,values ,nulls )));
4079
4092
}
@@ -4456,6 +4469,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
4456
4469
elog (ERROR ,"Transaction %s (%llu) is aborted by DTM" ,x -> gid , (long64 )x -> xid );
4457
4470
}else {
4458
4471
FinishPreparedTransaction (x -> gid , true);
4472
+ MTM_LOG1 ("Distributed transaction %s is committed" ,x -> gid );
4459
4473
}
4460
4474
}
4461
4475
}