@@ -2128,7 +2128,7 @@ void MtmRefreshClusterStatus()
21282128 * connectivity graph is stabilized.
21292129 */
21302130oldClique = newClique ;
2131- MtmSleep (MSEC_TO_USEC (MtmHeartbeatSendTimeout )* 2 );/* double timeout tocondider worst case when heartbeatsend interval is added with refresh cluster status interval */
2131+ MtmSleep (MSEC_TO_USEC (MtmHeartbeatRecvTimeout )* 2 );/* double timeout toconsider the worst case when heartbeatreceive interval is added with refresh cluster status interval */
21322132MtmBuildConnectivityMatrix (matrix );
21332133newClique = MtmFindMaxClique (matrix ,Mtm -> nAllNodes ,& cliqueSize );
21342134}while (newClique != oldClique );
@@ -2232,7 +2232,7 @@ void MtmOnNodeDisconnect(int nodeId)
22322232BIT_SET (SELF_CONNECTIVITY_MASK ,nodeId - 1 );
22332233BIT_SET (Mtm -> reconnectMask ,nodeId - 1 );
22342234elog (LOG ,"Disconnect node %d connectivity mask %llx" ,
2235- nodeId ,( long long ) SELF_CONNECTIVITY_MASK );
2235+ nodeId ,SELF_CONNECTIVITY_MASK );
22362236MtmUnlock ();
22372237}
22382238
@@ -2242,7 +2242,7 @@ void MtmOnNodeDisconnect(int nodeId)
22422242void MtmOnNodeConnect (int nodeId )
22432243{
22442244MtmLock (LW_EXCLUSIVE );
2245- elog (LOG ,"Connect node %d connectivity mask %llx" ,nodeId ,( long long ) SELF_CONNECTIVITY_MASK );
2245+ elog (LOG ,"Connect node %d connectivity mask %llx" ,nodeId ,SELF_CONNECTIVITY_MASK );
22462246BIT_CLEAR (SELF_CONNECTIVITY_MASK ,nodeId - 1 );
22472247BIT_SET (Mtm -> reconnectMask ,nodeId - 1 );/* force sender to reestablish connection and send heartbeat */
22482248MtmUnlock ();
@@ -2254,6 +2254,7 @@ void MtmOnNodeConnect(int nodeId)
22542254void MtmReconnectNode (int nodeId )
22552255{
22562256MtmLock (LW_EXCLUSIVE );
2257+ elog (LOG ,"Reconnect node %d connectivity mask %llx" ,nodeId ,SELF_CONNECTIVITY_MASK );
22572258BIT_SET (Mtm -> reconnectMask ,nodeId - 1 );
22582259MtmUnlock ();
22592260}
@@ -3289,7 +3290,9 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
32893290MtmSetCurrentTransactionGID (ts -> gid );
32903291MtmTx .isActive = true;
32913292FinishPreparedTransaction (ts -> gid ,commit );
3292-
3293+ if (commit ) {
3294+ MTM_LOG1 ("Distributed transaction %s is committed" ,ts -> gid );
3295+ }
32933296if (!insideTransaction ) {
32943297CommitTransactionCommand ();
32953298Assert (!MtmTx .isActive );
@@ -3326,15 +3329,21 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33263329MtmUnlock ();
33273330return REPLMODE_EXIT ;
33283331}
3329- /* We are not interested in receiving any deteriorated logical messages from recovered node,do recreate slot */
3332+ /* We are not interested in receiving any deteriorated logical messages from recovered node,so recreate slot */
33303333if (BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 )) {
33313334mode = REPLMODE_CREATE_NEW ;
33323335}
33333336MTM_LOG2 ("%d: receiver slot mode %s" ,MyProcPid ,MtmNodeStatusMnem [Mtm -> status ]);
33343337if (Mtm -> status == MTM_RECOVERY ) {
33353338mode = REPLMODE_RECOVERED ;
3336- if ((Mtm -> recoverySlot == 0 && (Mtm -> donorNodeId == MtmNodeId || Mtm -> donorNodeId == nodeId ))
3337- || Mtm -> recoverySlot == nodeId )
3339+ /* Choose node for recovery if
3340+ * 1. It is not chosen yet or the same node was chosen before
3341+ * 2. It is donor node or there is no donor node
3342+ * 3. Connections with all other live nodes were established
3343+ */
3344+ if ((Mtm -> recoverySlot == 0 || Mtm -> recoverySlot == nodeId )
3345+ && (Mtm -> donorNodeId == MtmNodeId || Mtm -> donorNodeId == nodeId )
3346+ && (SELF_CONNECTIVITY_MASK & ~Mtm -> disabledNodeMask )== 0 )
33383347{
33393348/* Choose for recovery first available slot or slot of donor node (if any) */
33403349if (Mtm -> nAllNodes >=3 ) {
@@ -3354,6 +3363,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33543363return REPLMODE_RECOVERY ;
33553364}
33563365}
3366+ MTM_LOG1 ("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx" ,
3367+ nodeId ,Mtm -> recoverySlot ,Mtm -> donorNodeId ,SELF_CONNECTIVITY_MASK ,Mtm -> disabledNodeMask );
33573368MtmUnlock ();
33583369/* delay opening of other slots until recovery is completed */
33593370MtmSleep (STATUS_POLL_DELAY );
@@ -3492,6 +3503,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
34923503}
34933504}
34943505}
3506+ MTM_LOG1 ("Startup of logical replication to node %d" ,MtmReplicationNodeId );
34953507MtmLock (LW_EXCLUSIVE );
34963508if (BIT_CHECK (Mtm -> stoppedNodeMask ,MtmReplicationNodeId - 1 )) {
34973509elog (WARNING ,"Stopped node %d tries to initiate recovery" ,MtmReplicationNodeId );
@@ -4074,6 +4086,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
40744086values [15 ]= Int32GetDatum (Mtm -> nConfigChanges );
40754087values [16 ]= Int64GetDatum (Mtm -> stalledNodeMask );
40764088values [17 ]= Int64GetDatum (Mtm -> stoppedNodeMask );
4089+ values [18 ]= TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime /USECS_PER_SEC ));
40774090
40784091PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc ,values ,nulls )));
40794092}
@@ -4456,6 +4469,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
44564469elog (ERROR ,"Transaction %s (%llu) is aborted by DTM" ,x -> gid , (long64 )x -> xid );
44574470}else {
44584471FinishPreparedTransaction (x -> gid , true);
4472+ MTM_LOG1 ("Distributed transaction %s is committed" ,x -> gid );
44594473}
44604474}
44614475}