2323#define MTM_ERRMSG (fmt ,...) errmsg(MTM_TAG fmt, ## __VA_ARGS__)
2424
2525#if DEBUG_LEVEL == 0
26- #define MTM_LOG1 (fmt , ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27- #define MTM_LOG2 (fmt , ...)
28- #define MTM_LOG3(fmt, ...)
29- #define MTM_LOG4 (fmt , ...)
26+ #define MTM_LOG1 (fmt , ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27+ #define MTM_LOG2 (fmt , ...)
28+ #define MTM_LOG3 (fmt , ...)
29+ #define MTM_LOG4 (fmt , ...)
3030#elif DEBUG_LEVEL == 1
31- #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32- #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33- #define MTM_LOG3 (fmt , ...)
34- #define MTM_LOG4(fmt, ...)
31+ #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32+ #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33+ #define MTM_LOG3 (fmt , ...)
34+ #define MTM_LOG4 (fmt , ...)
3535#elif DEBUG_LEVEL == 2
36- #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37- #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38- #define MTM_LOG3 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39- #define MTM_LOG4 (fmt , ...)
36+ #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37+ #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38+ #define MTM_LOG3 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39+ #define MTM_LOG4 (fmt , ...)
4040#elif DEBUG_LEVEL >=3
41- #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42- #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43- #define MTM_LOG3 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44- #define MTM_LOG4 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
41+ #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42+ #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43+ #define MTM_LOG3 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44+ #define MTM_LOG4 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4545#endif
4646
4747#if MTM_TRACE == 0
@@ -98,7 +98,7 @@ typedef char pgid_t[MULTIMASTER_MAX_GID_SIZE];
9898#define SELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
9999
100100typedef enum
101- {
101+ {
102102PGLOGICAL_COMMIT ,
103103PGLOGICAL_PREPARE ,
104104PGLOGICAL_COMMIT_PREPARED ,
@@ -107,7 +107,7 @@ typedef enum
107107}PGLOGICAL_EVENT ;
108108
109109/* Identifier of global transaction */
110- typedef struct
110+ typedef struct
111111{
112112int node ;/* Zero based index of node initiating transaction */
113113TransactionId xid ;/* Transaction ID at this node */
@@ -116,7 +116,7 @@ typedef struct
116116#define EQUAL_GTID (x ,y ) ((x).node == (y).node && (x).xid == (y).xid)
117117
118118typedef enum
119- {
119+ {
120120MSG_INVALID ,
121121MSG_HANDSHAKE ,
122122MSG_PREPARED ,
@@ -153,12 +153,12 @@ typedef enum
153153typedef struct
154154{
155155MtmMessageCode code ;/* Message code: MSG_PREPARE, MSG_PRECOMMIT, MSG_COMMIT, MSG_ABORT,... */
156- int node ;/* Sender node ID */
156+ int node ;/* Sender node ID */
157157bool lockReq ;/* Whether sender node needs to lock cluster to let wal-sender caught-up and complete recovery */
158158bool locked ;/* Whether sender node is locked */
159159TransactionId dxid ;/* Transaction ID at destination node */
160- TransactionId sxid ;/* Transaction ID at sender node */
161- XidStatus status ;/* Transaction status */
160+ TransactionId sxid ;/* Transaction ID at sender node */
161+ XidStatus status ;/* Transaction status */
162162csn_t csn ;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
163163csn_t oldestSnapshot ;/* Oldest snapshot used by active transactions at this node */
164164nodemask_t disabledNodeMask ;/* Bitmask of disabled nodes at the sender of message */
@@ -185,13 +185,13 @@ typedef struct MtmMessageQueue
185185struct MtmMessageQueue * next ;
186186}MtmMessageQueue ;
187187
188- typedef struct
188+ typedef struct
189189{
190190MtmArbiterMessage hdr ;
191191char connStr [MULTIMASTER_MAX_CONN_STR_SIZE ];
192192}MtmHandshakeMessage ;
193193
194- typedef struct
194+ typedef struct
195195{
196196int used ;
197197int size ;
@@ -227,7 +227,7 @@ typedef struct
227227int senderPid ;
228228int receiverPid ;
229229lsn_t flushPos ;
230- csn_t oldestSnapshot ;/* Oldest snapshot used by active transactions at this node */
230+ csn_t oldestSnapshot ;/* Oldest snapshot used by active transactions at this node */
231231lsn_t restartLSN ;
232232RepOriginId originId ;
233233int timeline ;
@@ -246,12 +246,12 @@ typedef struct MtmL2List
246246typedef struct MtmTransState
247247{
248248TransactionId xid ;
249- XidStatus status ;
249+ XidStatus status ;
250250pgid_t gid ;/* Global transaction ID (used for 2PC) */
251251GlobalTransactionId gtid ;/* Transaction id at coordinator */
252252csn_t csn ;/* commit serial number */
253253csn_t snapshot ;/* transaction snapshot, or INVALID_CSN for local transactions */
254- int procno ;/* pgprocno of transaction coordinator waiting for responses from replicas,
254+ int procno ;/* pgprocno of transaction coordinator waiting for responses from replicas,
255255 used to notify coordinator by arbiter */
256256int nSubxids ;/* Number of subtransanctions */
257257struct MtmTransState * next ;/* Next element in L1 list of all finished transaction present in xid2state hash */
@@ -293,7 +293,7 @@ typedef struct
293293nodemask_t pglogicalSenderMask ;/* bitmask of started pglogic senders */
294294nodemask_t currentLockNodeMask ;/* Mask of nodes IDs which are locking the cluster */
295295nodemask_t inducedLockNodeMask ;/* Mask of node IDs which requested cluster-wide lock */
296- nodemask_t originLockNodeMask ;/* Mask of node IDs which WAL-senders are locking the cluster.
296+ nodemask_t originLockNodeMask ;/* Mask of node IDs which WAL-senders are locking the cluster.
297297* MtmNodeId bit is used by recovered node to complete recovery and by MtmLockCluster method */
298298nodemask_t reconnectMask ;/* Mask of nodes connection to which has to be reestablished by sender */
299299int lastLockHolder ;/* PID of process last obtaining the node lock */
@@ -319,13 +319,13 @@ typedef struct
319319MtmTransState * * transListTail ;/* Tail of L1 list of all finished transactions, used to append new elements.
320320 This list is expected to be in CSN ascending order, by strict order may be violated */
321321MtmL2List activeTransList ;/* List of active transactions */
322- ulong64 transCount ;/* Counter of transactions performed by this node */
322+ ulong64 transCount ;/* Counter of transactions performed by this node */
323323ulong64 gcCount ;/* Number of global transactions performed since last GC */
324324MtmMessageQueue * sendQueue ;/* Messages to be sent by arbiter sender */
325325MtmMessageQueue * freeQueue ;/* Free messages */
326326lsn_t recoveredLSN ;/* LSN at the moment of recovery completion */
327327BgwPool pool ;/* Pool of background workers for applying logical replication patches */
328- MtmNodeInfo nodes [1 ];/* [Mtm->nAllNodes]: per-node data */
328+ MtmNodeInfo nodes [1 ];/* [Mtm->nAllNodes]: per-node data */
329329}MtmState ;
330330
331331typedef struct MtmFlushPosition
@@ -342,7 +342,7 @@ typedef struct MtmSeqPosition
342342Oid seqid ;
343343int64 next ;
344344}MtmSeqPosition ;
345-
345+
346346#define MtmIsCoordinator (ts ) (ts->gtid.node == MtmNodeId)
347347
348348extern char const * const MtmNodeStatusMnem [];
@@ -393,14 +393,15 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
393393extern void MtmLock (LWLockMode mode );
394394extern void MtmUnlock (void );
395395extern void MtmLockNode (int nodeId ,LWLockMode mode );
396+ extern bool MtmTryLockNode (int nodeId ,LWLockMode mode );
396397extern void MtmUnlockNode (int nodeId );
397398extern void MtmStopNode (int nodeId ,bool dropSlot );
398399extern void MtmReconnectNode (int nodeId );
399400extern void MtmRecoverNode (int nodeId );
400401extern void MtmOnNodeDisconnect (int nodeId );
401402extern void MtmOnNodeConnect (int nodeId );
402403extern void MtmWakeUpBackend (MtmTransState * ts );
403- extern void MtmSleep (timestamp_t interval );
404+ extern void MtmSleep (timestamp_t interval );
404405extern void MtmAbortTransaction (MtmTransState * ts );
405406extern void MtmSetCurrentTransactionGID (char const * gid );
406407extern csn_t MtmGetTransactionCSN (TransactionId xid );