Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit74f28a2

Browse files
knizhnikkelvich
authored andcommitted
Perform drop node under lock
1 parentbbfb53b commit74f28a2

File tree

2 files changed

+49
-37
lines changed

2 files changed

+49
-37
lines changed

‎multimaster.c

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,11 @@ void MtmLockNode(int nodeId, LWLockMode mode)
371371
LWLockAcquire((LWLockId)&Mtm->locks[nodeId],mode);
372372
}
373373

374+
boolMtmTryLockNode(intnodeId,LWLockModemode)
375+
{
376+
returnLWLockConditionalAcquire((LWLockId)&Mtm->locks[nodeId],mode);
377+
}
378+
374379
voidMtmUnlockNode(intnodeId)
375380
{
376381
Assert(nodeId>0&&nodeId <=MtmMaxNodes*2);
@@ -1684,7 +1689,14 @@ static void MtmStartRecovery()
16841689

16851690
staticvoidMtmDropSlot(intnodeId)
16861691
{
1687-
ReplicationSlotDrop(psprintf(MULTIMASTER_SLOT_PATTERN,nodeId));
1692+
if (MtmTryLockNode(nodeId,LW_EXCLUSIVE))
1693+
{
1694+
MTM_ELOG(INFO,"Drop replication slot for node %d",nodeId);
1695+
ReplicationSlotDrop(psprintf(MULTIMASTER_SLOT_PATTERN,nodeId));
1696+
MtmUnlockNode(nodeId);
1697+
}else {
1698+
MTM_ELOG(WARNING,"Failed to drop replication slot for node %d",nodeId);
1699+
}
16881700
MtmLock(LW_EXCLUSIVE);
16891701
BIT_SET(Mtm->stalledNodeMask,nodeId-1);
16901702
BIT_SET(Mtm->stoppedNodeMask,nodeId-1);/* stalled node can not be automatically recovered */
@@ -2766,7 +2778,7 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
27662778
}else {
27672779
conn->arbiterPort=MULTIMASTER_DEFAULT_ARBITER_PORT;
27682780
}
2769-
MTM_ELOG(WARNING,"Using arbiter port: %d",conn->arbiterPort);
2781+
MTM_ELOG(INFO,"Using arbiter port: %d",conn->arbiterPort);
27702782

27712783
port=strstr(connStr," port=");
27722784
if (port==NULL&&strncmp(connStr,"port=",5)==0) {
@@ -3422,7 +3434,7 @@ _PG_init(void)
34223434
PreviousProcessUtilityHook=ProcessUtility_hook;
34233435
ProcessUtility_hook=MtmProcessUtility;
34243436

3425-
PreviousSeqNextvalHook=SeqNextvalHook;
3437+
PreviousSeqNextvalHook=SeqNextvalHook;
34263438
SeqNextvalHook=MtmSeqNextvalHook;
34273439
}
34283440

@@ -5380,7 +5392,7 @@ static void MtmSeqNextvalHook(Oid seqid, int64 next)
53805392
MtmSeqPositionpos;
53815393
pos.seqid=seqid;
53825394
pos.next=next;
5383-
LogLogicalMessage("N", (char*)&pos,sizeof(pos), true);
5395+
LogLogicalMessage("N", (char*)&pos,sizeof(pos), true);
53845396
}
53855397
}
53865398

@@ -5567,4 +5579,3 @@ Datum mtm_referee_poll(PG_FUNCTION_ARGS)
55675579

55685580
PG_RETURN_INT64(recoveredNodeMask);
55695581
}
5570-

‎multimaster.h

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,25 @@
2323
#defineMTM_ERRMSG(fmt,...) errmsg(MTM_TAG fmt, ## __VA_ARGS__)
2424

2525
#ifDEBUG_LEVEL==0
26-
#defineMTM_LOG1(fmt, ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27-
#defineMTM_LOG2(fmt, ...)
28-
#define MTM_LOG3(fmt, ...)
29-
#defineMTM_LOG4(fmt, ...)
26+
#defineMTM_LOG1(fmt, ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27+
#defineMTM_LOG2(fmt, ...)
28+
#defineMTM_LOG3(fmt, ...)
29+
#defineMTM_LOG4(fmt, ...)
3030
#elifDEBUG_LEVEL==1
31-
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32-
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33-
#defineMTM_LOG3(fmt, ...)
34-
#define MTM_LOG4(fmt, ...)
31+
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32+
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33+
#defineMTM_LOG3(fmt, ...)
34+
#defineMTM_LOG4(fmt, ...)
3535
#elifDEBUG_LEVEL==2
36-
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37-
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38-
#defineMTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39-
#defineMTM_LOG4(fmt, ...)
36+
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37+
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38+
#defineMTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39+
#defineMTM_LOG4(fmt, ...)
4040
#elifDEBUG_LEVEL >=3
41-
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42-
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43-
#defineMTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44-
#defineMTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
41+
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42+
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43+
#defineMTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44+
#defineMTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4545
#endif
4646

4747
#ifMTM_TRACE==0
@@ -98,7 +98,7 @@ typedef char pgid_t[MULTIMASTER_MAX_GID_SIZE];
9898
#defineSELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
9999

100100
typedefenum
101-
{
101+
{
102102
PGLOGICAL_COMMIT,
103103
PGLOGICAL_PREPARE,
104104
PGLOGICAL_COMMIT_PREPARED,
@@ -107,7 +107,7 @@ typedef enum
107107
}PGLOGICAL_EVENT;
108108

109109
/* Identifier of global transaction */
110-
typedefstruct
110+
typedefstruct
111111
{
112112
intnode;/* Zero based index of node initiating transaction */
113113
TransactionIdxid;/* Transaction ID at this node */
@@ -116,7 +116,7 @@ typedef struct
116116
#defineEQUAL_GTID(x,y) ((x).node == (y).node && (x).xid == (y).xid)
117117

118118
typedefenum
119-
{
119+
{
120120
MSG_INVALID,
121121
MSG_HANDSHAKE,
122122
MSG_PREPARED,
@@ -153,12 +153,12 @@ typedef enum
153153
typedefstruct
154154
{
155155
MtmMessageCodecode;/* Message code: MSG_PREPARE, MSG_PRECOMMIT, MSG_COMMIT, MSG_ABORT,... */
156-
intnode;/* Sender node ID */
156+
intnode;/* Sender node ID */
157157
boollockReq;/* Whether sender node needs to lock cluster to let wal-sender caught-up and complete recovery */
158158
boollocked;/* Whether sender node is locked */
159159
TransactionIddxid;/* Transaction ID at destination node */
160-
TransactionIdsxid;/* Transaction ID at sender node */
161-
XidStatusstatus;/* Transaction status */
160+
TransactionIdsxid;/* Transaction ID at sender node */
161+
XidStatusstatus;/* Transaction status */
162162
csn_tcsn;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
163163
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
164164
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes at the sender of message */
@@ -185,13 +185,13 @@ typedef struct MtmMessageQueue
185185
structMtmMessageQueue*next;
186186
}MtmMessageQueue;
187187

188-
typedefstruct
188+
typedefstruct
189189
{
190190
MtmArbiterMessagehdr;
191191
charconnStr[MULTIMASTER_MAX_CONN_STR_SIZE];
192192
}MtmHandshakeMessage;
193193

194-
typedefstruct
194+
typedefstruct
195195
{
196196
intused;
197197
intsize;
@@ -227,7 +227,7 @@ typedef struct
227227
intsenderPid;
228228
intreceiverPid;
229229
lsn_tflushPos;
230-
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
230+
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
231231
lsn_trestartLSN;
232232
RepOriginIdoriginId;
233233
inttimeline;
@@ -246,12 +246,12 @@ typedef struct MtmL2List
246246
typedefstructMtmTransState
247247
{
248248
TransactionIdxid;
249-
XidStatusstatus;
249+
XidStatusstatus;
250250
pgid_tgid;/* Global transaction ID (used for 2PC) */
251251
GlobalTransactionIdgtid;/* Transaction id at coordinator */
252252
csn_tcsn;/* commit serial number */
253253
csn_tsnapshot;/* transaction snapshot, or INVALID_CSN for local transactions */
254-
intprocno;/* pgprocno of transaction coordinator waiting for responses from replicas,
254+
intprocno;/* pgprocno of transaction coordinator waiting for responses from replicas,
255255
used to notify coordinator by arbiter */
256256
intnSubxids;/* Number of subtransanctions */
257257
structMtmTransState*next;/* Next element in L1 list of all finished transaction present in xid2state hash */
@@ -293,7 +293,7 @@ typedef struct
293293
nodemask_tpglogicalSenderMask;/* bitmask of started pglogic senders */
294294
nodemask_tcurrentLockNodeMask;/* Mask of nodes IDs which are locking the cluster */
295295
nodemask_tinducedLockNodeMask;/* Mask of node IDs which requested cluster-wide lock */
296-
nodemask_toriginLockNodeMask;/* Mask of node IDs which WAL-senders are locking the cluster.
296+
nodemask_toriginLockNodeMask;/* Mask of node IDs which WAL-senders are locking the cluster.
297297
* MtmNodeId bit is used by recovered node to complete recovery and by MtmLockCluster method */
298298
nodemask_treconnectMask;/* Mask of nodes connection to which has to be reestablished by sender */
299299
intlastLockHolder;/* PID of process last obtaining the node lock */
@@ -319,13 +319,13 @@ typedef struct
319319
MtmTransState**transListTail;/* Tail of L1 list of all finished transactions, used to append new elements.
320320
This list is expected to be in CSN ascending order, by strict order may be violated */
321321
MtmL2ListactiveTransList;/* List of active transactions */
322-
ulong64transCount;/* Counter of transactions performed by this node */
322+
ulong64transCount;/* Counter of transactions performed by this node */
323323
ulong64gcCount;/* Number of global transactions performed since last GC */
324324
MtmMessageQueue*sendQueue;/* Messages to be sent by arbiter sender */
325325
MtmMessageQueue*freeQueue;/* Free messages */
326326
lsn_trecoveredLSN;/* LSN at the moment of recovery completion */
327327
BgwPoolpool;/* Pool of background workers for applying logical replication patches */
328-
MtmNodeInfonodes[1];/* [Mtm->nAllNodes]: per-node data */
328+
MtmNodeInfonodes[1];/* [Mtm->nAllNodes]: per-node data */
329329
}MtmState;
330330

331331
typedefstructMtmFlushPosition
@@ -342,7 +342,7 @@ typedef struct MtmSeqPosition
342342
Oidseqid;
343343
int64next;
344344
}MtmSeqPosition;
345-
345+
346346
#defineMtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
347347

348348
externcharconst*constMtmNodeStatusMnem[];
@@ -394,14 +394,15 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
394394
externvoidMtmLock(LWLockModemode);
395395
externvoidMtmUnlock(void);
396396
externvoidMtmLockNode(intnodeId,LWLockModemode);
397+
externboolMtmTryLockNode(intnodeId,LWLockModemode);
397398
externvoidMtmUnlockNode(intnodeId);
398399
externvoidMtmStopNode(intnodeId,booldropSlot);
399400
externvoidMtmReconnectNode(intnodeId);
400401
externvoidMtmRecoverNode(intnodeId);
401402
externvoidMtmOnNodeDisconnect(intnodeId);
402403
externvoidMtmOnNodeConnect(intnodeId);
403404
externvoidMtmWakeUpBackend(MtmTransState*ts);
404-
externvoidMtmSleep(timestamp_tinterval);
405+
externvoidMtmSleep(timestamp_tinterval);
405406
externvoidMtmAbortTransaction(MtmTransState*ts);
406407
externvoidMtmSetCurrentTransactionGID(charconst*gid);
407408
externcsn_tMtmGetTransactionCSN(TransactionIdxid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp