@@ -245,7 +245,6 @@ static int MtmMaxRecoveryLag;
245
245
static int MtmGcPeriod ;
246
246
static bool MtmIgnoreTablesWithoutPk ;
247
247
static int MtmLockCount ;
248
- static int MtmSenderStarted ;
249
248
250
249
static ExecutorStart_hook_type PreviousExecutorStartHook ;
251
250
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -1668,8 +1667,8 @@ void MtmRecoveryCompleted(void)
1668
1667
Mtm -> nodes [i ].lastHeartbeat = 0 ;/* defuse watchdog until first heartbeat is received */
1669
1668
}
1670
1669
/* Mode will be changed to online once all logical receiver are connected */
1671
- elog (LOG ,"Recovery completed with %d active receiversand %d started senders from %d" ,Mtm -> nReceivers , Mtm -> nSenders ,Mtm -> nLiveNodes - 1 );
1672
- MtmSwitchClusterMode (Mtm -> nReceivers == Mtm -> nLiveNodes - 1 && Mtm -> nSenders == Mtm -> nLiveNodes - 1 ?MTM_ONLINE :MTM_CONNECTED );
1670
+ elog (LOG ,"Recovery completed with %d active receivers from %d" ,Mtm -> nReceivers ,Mtm -> nLiveNodes - 1 );
1671
+ MtmSwitchClusterMode (Mtm -> nReceivers == Mtm -> nLiveNodes - 1 ?MTM_ONLINE :MTM_CONNECTED );
1673
1672
MtmUnlock ();
1674
1673
}
1675
1674
@@ -2199,7 +2198,6 @@ static void MtmInitialize()
2199
2198
Mtm -> transListHead = NULL ;
2200
2199
Mtm -> transListTail = & Mtm -> transListHead ;
2201
2200
Mtm -> nReceivers = 0 ;
2202
- Mtm -> nSenders = 0 ;
2203
2201
Mtm -> timeShift = 0 ;
2204
2202
Mtm -> transCount = 0 ;
2205
2203
Mtm -> gcCount = 0 ;
@@ -2908,9 +2906,11 @@ void MtmReceiverStarted(int nodeId)
2908
2906
MtmEnableNode (nodeId );
2909
2907
MtmCheckQuorum ();
2910
2908
}
2911
- elog (LOG ,"Start %d receivers and %d senders from %d cluster status %s" ,Mtm -> nReceivers + 1 ,Mtm -> nSenders ,Mtm -> nLiveNodes - 1 ,MtmNodeStatusMnem [Mtm -> status ]);
2912
- if (++ Mtm -> nReceivers == Mtm -> nLiveNodes - 1 && Mtm -> nSenders == Mtm -> nLiveNodes - 1 && Mtm -> status == MTM_CONNECTED ) {
2913
- MtmSwitchClusterMode (MTM_ONLINE );
2909
+ elog (LOG ,"Start %d receivers from %d cluster status %s" ,Mtm -> nReceivers + 1 ,Mtm -> nLiveNodes - 1 ,MtmNodeStatusMnem [Mtm -> status ]);
2910
+ if (++ Mtm -> nReceivers == Mtm -> nLiveNodes - 1 ) {
2911
+ if (Mtm -> status == MTM_CONNECTED ) {
2912
+ MtmSwitchClusterMode (MTM_ONLINE );
2913
+ }
2914
2914
}
2915
2915
}
2916
2916
MtmUnlock ();
@@ -2997,7 +2997,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2997
2997
elog (WARNING ,"Process %d starts recovery from node %d" ,MyProcPid ,nodeId );
2998
2998
Mtm -> recoverySlot = nodeId ;
2999
2999
Mtm -> nReceivers = 0 ;
3000
- Mtm -> nSenders = 0 ;
3001
3000
Mtm -> recoveryCount += 1 ;
3002
3001
Mtm -> pglogicalNodeMask = 0 ;
3003
3002
MtmUnlock ();
@@ -3077,19 +3076,6 @@ MtmOnProcExit(int code, Datum arg)
3077
3076
}
3078
3077
}
3079
3078
3080
- static void
3081
- MtmReplicationStartedHook (struct PGLogicalStartedHookArgs * args )
3082
- {
3083
- MtmLock (LW_EXCLUSIVE );
3084
- MtmSenderStarted = 1 ;
3085
- elog (LOG ,"Start %d senders and %d receivers from %d cluster status %s" ,Mtm -> nSenders + 1 ,Mtm -> nReceivers ,Mtm -> nLiveNodes - 1 ,MtmNodeStatusMnem [Mtm -> status ]);
3086
- if (++ Mtm -> nSenders == Mtm -> nLiveNodes - 1 && Mtm -> nReceivers == Mtm -> nLiveNodes - 1 && Mtm -> status == MTM_CONNECTED ) {
3087
- MtmSwitchClusterMode (MTM_ONLINE );
3088
- }
3089
- MtmUnlock ();
3090
- }
3091
-
3092
-
3093
3079
static void
3094
3080
MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
3095
3081
{
@@ -3206,9 +3192,6 @@ static void
3206
3192
MtmReplicationShutdownHook (struct PGLogicalShutdownHookArgs * args )
3207
3193
{
3208
3194
if (MtmReplicationNodeId >=0 ) {
3209
- MtmLock (LW_EXCLUSIVE );
3210
- Mtm -> nSenders -= MtmSenderStarted ;
3211
- MtmUnlock ();
3212
3195
MTM_LOG1 ("Logical replication to node %d is stopped" ,MtmReplicationNodeId );
3213
3196
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
3214
3197
MtmReplicationNodeId = -1 ;/* defuse on_proc_exit hook */
@@ -3320,7 +3303,6 @@ bool MtmFilterTransaction(char* record, int size)
3320
3303
void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
3321
3304
{
3322
3305
hooks -> startup_hook = MtmReplicationStartupHook ;
3323
- hooks -> started_hook = MtmReplicationStartedHook ;
3324
3306
hooks -> shutdown_hook = MtmReplicationShutdownHook ;
3325
3307
hooks -> txn_filter_hook = MtmReplicationTxnFilterHook ;
3326
3308
hooks -> row_filter_hook = MtmReplicationRowFilterHook ;