@@ -170,7 +170,7 @@ static void MtmShmemStartup(void);
170170
171171static BgwPool * MtmPoolConstructor (void );
172172static bool MtmRunUtilityStmt (PGconn * conn ,char const * sql ,char * * errmsg );
173- static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError );
173+ static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError , int forceOnNode );
174174static void MtmProcessDDLCommand (char const * queryString ,bool transactional );
175175
176176static void MtmLockCluster (void );
@@ -978,7 +978,9 @@ MtmBeginTransaction(MtmCurrentTrans* x)
978978x -> isTwoPhase = false;
979979x -> isTransactionBlock = IsTransactionBlock ();
980980/* Application name can be changed using PGAPPNAME environment variable */
981- if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name ,MULTIMASTER_ADMIN )!= 0 && !MtmBypass ) {
981+ if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name ,MULTIMASTER_ADMIN )!= 0
982+ && strcmp (application_name ,MULTIMASTER_BROADCAST_SERVICE )!= 0
983+ && !MtmBypass ) {
982984/* Reject all user's transactions at offline cluster.
983985 * Allow execution of transaction by bg-workers to make it possible to perform recovery.
984986 */
@@ -2410,7 +2412,7 @@ static void MtmInitialize()
24102412for (i = 0 ;i < MtmNodes ;i ++ ) {
24112413Mtm -> nodes [i ].oldestSnapshot = 0 ;
24122414Mtm -> nodes [i ].disabledNodeMask = 0 ;
2413- Mtm -> nodes [i ].connectivityMask = 7 ; // XXXX
2415+ Mtm -> nodes [i ].connectivityMask = ((( nodemask_t ) 1 << MtmNodes ) - 1 );
24142416Mtm -> nodes [i ].lockGraphUsed = 0 ;
24152417Mtm -> nodes [i ].lockGraphAllocated = 0 ;
24162418Mtm -> nodes [i ].lockGraphData = NULL ;
@@ -2423,6 +2425,7 @@ static void MtmInitialize()
24232425Mtm -> nodes [i ].originId = InvalidRepOriginId ;
24242426Mtm -> nodes [i ].timeline = 0 ;
24252427Mtm -> nodes [i ].nHeartbeats = 0 ;
2428+ Mtm -> nodes [i ].manualRecovery = false;
24262429Mtm -> nodes [i ].slotDeleted = false;
24272430}
24282431Mtm -> nodes [MtmNodeId - 1 ].originId = DoNotReplicateId ;
@@ -3345,9 +3348,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33453348}
33463349
33473350/* Await until node is connected and both receiver and sender are in clique */
3348- while (BIT_CHECK (SELF_CONNECTIVITY_MASK ,nodeId - 1 )||
3349- !BIT_CHECK (Mtm -> clique ,nodeId - 1 )||
3350- !BIT_CHECK (Mtm -> clique ,MtmNodeId - 1 ) )
3351+ while (BIT_CHECK (EFFECTIVE_CONNECTIVITY_MASK ,nodeId - 1 )||
3352+ BIT_CHECK (EFFECTIVE_CONNECTIVITY_MASK ,MtmNodeId - 1 ))
33513353{
33523354MtmUnlock ();
33533355if (* shutdown )
@@ -3402,6 +3404,7 @@ void MtmRecoverNode(int nodeId)
34023404MTM_ELOG (ERROR ,"NodeID %d is out of range [1,%d]" ,nodeId ,Mtm -> nAllNodes );
34033405}
34043406MtmLock (LW_EXCLUSIVE );
3407+ Mtm -> nodes [nodeId - 1 ].manualRecovery = true;
34053408if (BIT_CHECK (Mtm -> stoppedNodeMask ,nodeId - 1 ))
34063409{
34073410Assert (BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 ));
@@ -3412,8 +3415,8 @@ void MtmRecoverNode(int nodeId)
34123415
34133416if (!MtmIsBroadcast ())
34143417{
3415- MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" ,nodeId ), true);
3416- MtmBroadcastUtilityStmt (psprintf ("select mtm.recover_node(%d)" ,nodeId ), true);
3418+ MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" ,nodeId ), true, 0 );
3419+ MtmBroadcastUtilityStmt (psprintf ("select mtm.recover_node(%d)" ,nodeId ), true, 0 );
34173420}
34183421}
34193422
@@ -3443,7 +3446,7 @@ void MtmResumeNode(int nodeId)
34433446
34443447if (!MtmIsBroadcast ())
34453448{
3446- MtmBroadcastUtilityStmt (psprintf ("select mtm.resume_node(%d)" ,nodeId ), true);
3449+ MtmBroadcastUtilityStmt (psprintf ("select mtm.resume_node(%d)" ,nodeId ), true, nodeId );
34473450}
34483451}
34493452
@@ -3458,20 +3461,19 @@ void MtmStopNode(int nodeId, bool dropSlot)
34583461MTM_ELOG (ERROR ,"NodeID %d is out of range [1,%d]" ,nodeId ,Mtm -> nAllNodes );
34593462}
34603463
3461- MtmLock (LW_EXCLUSIVE );
3464+ if (!MtmIsBroadcast ())
3465+ {
3466+ MtmBroadcastUtilityStmt (psprintf ("select mtm.stop_node(%d,%s)" ,nodeId ,dropSlot ?"true" :"false" ), true,nodeId );
3467+ }
34623468
3469+ MtmLock (LW_EXCLUSIVE );
34633470BIT_SET (Mtm -> stoppedNodeMask ,nodeId - 1 );
3464-
34653471if (!BIT_CHECK (Mtm -> disabledNodeMask ,nodeId - 1 ))
34663472{
34673473MtmDisableNode (nodeId );
34683474}
34693475MtmUnlock ();
34703476
3471- if (!MtmIsBroadcast ())
3472- {
3473- MtmBroadcastUtilityStmt (psprintf ("select mtm.stop_node(%d,%s)" ,nodeId ,dropSlot ?"true" :"false" ), true);
3474- }
34753477if (dropSlot )
34763478{
34773479MtmDropSlot (nodeId );
@@ -3545,12 +3547,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
35453547}
35463548
35473549if (BIT_CHECK (Mtm -> stoppedNodeMask ,MtmReplicationNodeId - 1 )) {
3548- MTM_ELOG (WARNING ,"Stopped node %d tries to initiate recovery" ,MtmReplicationNodeId );
3549- do {
3550- MtmUnlock ();
3551- MtmSleep (STATUS_POLL_DELAY );
3552- MtmLock (LW_EXCLUSIVE );
3553- }while (BIT_CHECK (Mtm -> stoppedNodeMask ,MtmReplicationNodeId - 1 ));
3550+ MtmUnlock ();
3551+ MTM_ELOG (ERROR ,"Stopped node %d tries to connect" ,MtmReplicationNodeId );
35543552}
35553553
35563554if (MtmIsRecoverySession ) {
@@ -3857,8 +3855,8 @@ mtm_add_node(PG_FUNCTION_ARGS)
38573855}
38583856if (!MtmIsBroadcast ())
38593857{
3860- MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" ,Mtm -> nAllNodes + 1 ), true);
3861- MtmBroadcastUtilityStmt (psprintf ("select mtm.add_node('%s')" ,connStr ), true);
3858+ MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" ,Mtm -> nAllNodes + 1 ), true, 0 );
3859+ MtmBroadcastUtilityStmt (psprintf ("select mtm.add_node('%s')" ,connStr ), true, 0 );
38623860}
38633861else
38643862{
@@ -4403,7 +4401,7 @@ MtmNoticeReceiver(void *i, const PGresult *res)
44034401pfree (stripped_notice );
44044402}
44054403
4406- static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError )
4404+ static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError , int forceOnNode )
44074405{
44084406int i = 0 ;
44094407nodemask_t disabledNodeMask = Mtm -> disabledNodeMask ;
@@ -4415,7 +4413,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
44154413
44164414for (i = 0 ;i < nNodes ;i ++ )
44174415{
4418- if (!BIT_CHECK (disabledNodeMask ,i ))
4416+ if (!BIT_CHECK (disabledNodeMask ,i )|| ( i + 1 == forceOnNode ) )
44194417{
44204418conns [i ]= PQconnectdb_safe (psprintf ("%s application_name=%s" ,Mtm -> nodes [i ].con .connStr ,MULTIMASTER_BROADCAST_SERVICE ));
44214419if (PQstatus (conns [i ])!= CONNECTION_OK )