@@ -635,9 +635,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
635635x -> isDistributed = MtmIsUserTransaction ();
636636x -> isPrepared = false;
637637x -> isTransactionBlock = IsTransactionBlock ();
638- /* Application name can becahnged usnig PGAPPNAME environment variable */
638+ /* Application name can bechanged usnig PGAPPNAME environment variable */
639639if (!IsBackgroundWorker && x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name ,MULTIMASTER_ADMIN )!= 0 ) {
640- /* reject all user's transactions at offline cluster */
640+ /* Reject all user's transactions at offline cluster.
641+ * Allow execution of transaction by bg-workers to make it possible to perform recovery.
642+ */
641643MtmUnlock ();
642644elog (ERROR ,"Multimaster node is not online: current status %s" ,MtmNodeStatusMnem [Mtm -> status ]);
643645}
@@ -673,14 +675,17 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
673675if (Mtm -> disabledNodeMask != 0 ) {
674676MtmRefreshClusterStatus (true);
675677if (!IsBackgroundWorker && Mtm -> status != MTM_ONLINE ) {
676- elog (ERROR ,"Abort current transaction because this cluster node is not online" );
678+ /* Do not take in accoutn bg-workers which are performing recovery */
679+ elog (ERROR ,"Abort current transaction because this cluster node is in %s status" ,MtmNodeStatusMnem [Mtm -> status ]);
677680}
678681}
679682
680683MtmLock (LW_EXCLUSIVE );
681684
682685/*
683- * Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up
686+ * Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
687+ * Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
688+ * and should not cause cluster status change.
684689 */
685690if (!x -> isReplicated ) {
686691MtmCheckClusterLock ();
@@ -716,7 +721,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
716721}
717722MtmTransactionListAppend (ts );
718723MtmAddSubtransactions (ts ,subxids ,ts -> nSubxids );
719- MTM_TRACE ("%d: MtmPrePrepareTransaction prepare commit of %d CSN=%ld\n" ,MyProcPid ,x -> xid ,ts -> csn );
724+ MTM_TRACE ("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n" ,
725+ MyProcPid ,x -> xid ,ts -> gtid .xid ,ts -> gtid .node ,ts -> csn );
720726MtmUnlock ();
721727
722728}
@@ -842,14 +848,6 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
842848}
843849}
844850
845- void MtmRecoveryCompleted (void )
846- {
847- elog (WARNING ,"Recovery of node %d is completed" ,MtmNodeId );
848- Mtm -> recoverySlot = 0 ;
849- BIT_CLEAR (Mtm -> disabledNodeMask ,MtmNodeId - 1 );
850- MtmSwitchClusterMode (MTM_ONLINE );
851- }
852-
853851void MtmJoinTransaction (GlobalTransactionId * gtid ,csn_t globalSnapshot )
854852{
855853MtmLock (LW_EXCLUSIVE );
@@ -933,6 +931,18 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
933931 * -------------------------------------------
934932 */
935933
934+ void MtmRecoveryCompleted (void )
935+ {
936+ elog (WARNING ,"Recovery of node %d is completed" ,MtmNodeId );
937+ MtmLock (LW_EXCLUSIVE );
938+ Mtm -> recoverySlot = 0 ;
939+ BIT_CLEAR (Mtm -> disabledNodeMask ,MtmNodeId - 1 );
940+ /* Mode will be changed to online once all locagical reciever are connected */
941+ MtmSwitchClusterMode (MTM_CONNECTED );
942+ MtmUnlock ();
943+ }
944+
945+
936946
937947/**
938948 * Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
@@ -993,10 +1003,10 @@ bool MtmIsRecoveredNode(int nodeId)
9931003bool MtmRecoveryCaughtUp (int nodeId ,XLogRecPtr slotLSN )
9941004{
9951005bool caughtUp = false;
1006+ MtmLock (LW_EXCLUSIVE );
9961007if (MtmIsRecoveredNode (nodeId )) {
9971008XLogRecPtr walLSN = GetXLogInsertRecPtr ();
998- MtmLock (LW_EXCLUSIVE );
999- if (slotLSN == walLSN ) {
1009+ if (slotLSN == walLSN && Mtm -> nActiveTransactions == 0 ) {
10001010if (BIT_CHECK (Mtm -> nodeLockerMask ,nodeId - 1 )) {
10011011elog (WARNING ,"Node %d is caught-up" ,nodeId );
10021012BIT_CLEAR (Mtm -> walSenderLockerMask ,MyWalSnd - WalSndCtl -> walsnds );
@@ -1018,18 +1028,17 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
10181028 * We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
10191029 * Is there some better way to establish mapping between nodes ad WAL-seconder?
10201030 */
1021- elog (WARNING ,"Node %d is almost caught-up: lock cluster" ,nodeId );
1031+ elog (WARNING ,"Node %d is almost caught-up: slot position %lx, WAL position %lx, active transactions %d" ,
1032+ nodeId ,slotLSN ,walLSN ,Mtm -> nActiveTransactions );
10221033Assert (MyWalSnd != NULL );/* This function is called by WAL-sender, so it should not be NULL */
10231034BIT_SET (Mtm -> nodeLockerMask ,nodeId - 1 );
10241035BIT_SET (Mtm -> walSenderLockerMask ,MyWalSnd - WalSndCtl -> walsnds );
10251036Mtm -> nLockers += 1 ;
10261037}else {
10271038MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d\n" ,nodeId ,slotLSN ,walLSN ,MyWalSnd -> sentPtr ,Mtm -> nLockers ,Mtm -> nActiveTransactions );
10281039}
1029- MtmUnlock ();
1030- }else {
1031- MTM_INFO ("Node %d is not in recovery mode\n" ,nodeId );
10321040}
1041+ MtmUnlock ();
10331042return caughtUp ;
10341043}
10351044
@@ -1044,7 +1053,7 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10441053/*
10451054 * If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
10461055 * WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
1047- * This function is calledat transactionstart with multimaster lock set
1056+ * This function is calledbefore transactionprepare with multimaster lock set.
10481057 */
10491058static void
10501059MtmCheckClusterLock ()
@@ -1071,8 +1080,8 @@ MtmCheckClusterLock()
10711080}
10721081}
10731082if (mask != 0 ) {
1074- /* some "almost catch-up" wal-senders are still working */
1075- /* Do not start new transactions until themcomplete */
1083+ /* some "almost catch-up" wal-senders are still working. */
1084+ /* Do not start new transactions until themare completed. */
10761085MtmUnlock ();
10771086MtmSleep (delay );
10781087if (delay * 2 <=MAX_WAIT_TIMEOUT ) {
@@ -1215,6 +1224,7 @@ void MtmOnNodeDisconnect(int nodeId)
12151224void MtmOnNodeConnect (int nodeId )
12161225{
12171226BIT_CLEAR (Mtm -> connectivityMask ,nodeId - 1 );
1227+ elog (NOTICE ,"Reconnect node %d" ,nodeId );
12181228RaftableSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
12191229}
12201230
@@ -1645,19 +1655,23 @@ _PG_fini(void)
16451655}
16461656
16471657
1648-
1658+ /*
1659+ * This functions is called by pglogical receiver main function when receiver background worker is started.
1660+ * We switch to ONLINE mode when all receviers are connected.
1661+ * As far as background worker can be restarted multiple times, use node bitmask.
1662+ */
16491663void MtmReceiverStarted (int nodeId )
16501664{
1651- SpinLockAcquire ( & Mtm -> spinlock );
1665+ MtmLock ( LW_EXCLUSIVE );
16521666if (!BIT_CHECK (Mtm -> pglogicalNodeMask ,nodeId - 1 )) {
16531667BIT_SET (Mtm -> pglogicalNodeMask ,nodeId - 1 );
16541668if (++ Mtm -> nReceivers == Mtm -> nNodes - 1 ) {
16551669if (Mtm -> status == MTM_CONNECTED ) {
16561670MtmSwitchClusterMode (MTM_ONLINE );
16571671}
16581672}
1659- }
1660- SpinLockRelease ( & Mtm -> spinlock );
1673+ }
1674+ MtmUnlock ();
16611675}
16621676
16631677/*