@@ -155,7 +155,8 @@ char const* const MtmNodeStatusMnem[] =
155155"Offline" ,
156156"Connected" ,
157157"Online" ,
158- "Recovery"
158+ "Recovery" ,
159+ "InMinor"
159160};
160161
161162bool MtmDoReplication ;
@@ -631,10 +632,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
631632x -> isReplicated = false;
632633x -> isDistributed = MtmIsUserTransaction ();
633634x -> isPrepared = false;
634- if (x -> isDistributed && Mtm -> status != MTM_ONLINE ) {
635+ x -> isTransactionBlock = IsTransactionBlock ();
636+ /* Application name can be changed using PGAPPNAME environment variable */
637+ if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name ,MULTIMASTER_ADMIN )!= 0 ) {
635638/* reject all user's transactions at offline cluster */
636639MtmUnlock ();
637- Assert (Mtm -> status == MTM_ONLINE );
638640elog (ERROR ,"Multimaster node is not online: current status %s" ,MtmNodeStatusMnem [Mtm -> status ]);
639641}
640642x -> containsDML = false;
@@ -981,11 +983,14 @@ bool MtmIsRecoveredNode(int nodeId)
981983 * We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
982984 * Is there some better way to establish mapping between nodes ad WAL-seconder?
983985 */
986+ elog (WARNING ,"Node %d is catching up" ,nodeId );
984987MtmLock (LW_EXCLUSIVE );
985988BIT_SET (Mtm -> nodeLockerMask ,nodeId - 1 );
986989BIT_SET (Mtm -> walSenderLockerMask ,MyWalSnd - WalSndCtl -> walsnds );
987990Mtm -> nLockers += 1 ;
988991MtmUnlock ();
992+ }else {
993+ MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n" ,nodeId ,MyWalSnd -> sentPtr ,GetXLogInsertRecPtr (),Mtm -> nLockers );
989994}
990995return true;
991996}
@@ -1022,7 +1027,7 @@ MtmCheckClusterLock()
10221027break ;
10231028}else {
10241029/* recovered replica catched up with master */
1025- elog (WARNING ,"WAL-sender %d completereceovery " ,i );
1030+ elog (WARNING ,"WAL-sender %d completerecovery " ,i );
10261031BIT_CLEAR (Mtm -> walSenderLockerMask ,i );
10271032}
10281033}
@@ -1608,8 +1613,9 @@ void MtmReceiverStarted(int nodeId)
16081613if (!BIT_CHECK (Mtm -> pglogicalNodeMask ,nodeId - 1 )) {
16091614BIT_SET (Mtm -> pglogicalNodeMask ,nodeId - 1 );
16101615if (++ Mtm -> nReceivers == Mtm -> nNodes - 1 ) {
1611- Assert (Mtm -> status == MTM_CONNECTED );
1612- MtmSwitchClusterMode (MTM_ONLINE );
1616+ if (Mtm -> status == MTM_CONNECTED ) {
1617+ MtmSwitchClusterMode (MTM_ONLINE );
1618+ }
16131619}
16141620 }
16151621SpinLockRelease (& Mtm -> spinlock );
@@ -1622,19 +1628,28 @@ void MtmReceiverStarted(int nodeId)
16221628 */
16231629MtmSlotMode MtmReceiverSlotMode (int nodeId )
16241630{
1631+ bool recovery = false;
16251632while (Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE ) {
1633+ MTM_INFO ("%d: receiver slot mode %s\n" ,MyProcPid ,MtmNodeStatusMnem [Mtm -> status ]);
16261634if (Mtm -> status == MTM_RECOVERY ) {
1635+ recovery = true;
16271636if (Mtm -> recoverySlot == 0 || Mtm -> recoverySlot == nodeId ) {
16281637/* Choose for recovery first available slot */
1638+ elog (WARNING ,"Start recovery from node %d" ,nodeId );
16291639Mtm -> recoverySlot = nodeId ;
16301640return SLOT_OPEN_EXISTED ;
16311641}
16321642}
16331643/* delay opening of other slots until recovery is completed */
16341644MtmSleep (STATUS_POLL_DELAY );
16351645}
1646+ if (recovery ) {
1647+ elog (WARNING ,"Recreate replication slot for node %d after end of recovery" ,nodeId );
1648+ }else {
1649+ MTM_INFO ("%d: Reuse replication slot for node %d\n" ,MyProcPid ,nodeId );
1650+ }
16361651/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
1637- return Mtm -> recoverySlot ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS ;
1652+ return recovery ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS ;
16381653}
16391654
16401655static bool MtmIsBroadcast ()
@@ -1690,7 +1705,11 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16901705static bool
16911706MtmReplicationTxnFilterHook (struct PGLogicalTxnFilterArgs * args )
16921707{
1693- return args -> origin_id == InvalidRepOriginId || MtmIsRecoveredNode (MtmReplicationNodeId );
1708+ bool res = Mtm -> status != MTM_RECOVERY
1709+ && (args -> origin_id == InvalidRepOriginId
1710+ || MtmIsRecoveredNode (MtmReplicationNodeId ));
1711+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" ,MyProcPid ,res );
1712+ return res ;
16941713}
16951714
16961715void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )