Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit99e1722

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' into PGPROEE9_6
2 parents11d9458 +6ae8d72 commit99e1722

File tree

2 files changed

+59
-47
lines changed

2 files changed

+59
-47
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,11 @@ void MtmLockNode(int nodeId, LWLockMode mode)
370370
LWLockAcquire((LWLockId)&Mtm->locks[nodeId],mode);
371371
}
372372

373+
boolMtmTryLockNode(intnodeId,LWLockModemode)
374+
{
375+
returnLWLockConditionalAcquire((LWLockId)&Mtm->locks[nodeId],mode);
376+
}
377+
373378
voidMtmUnlockNode(intnodeId)
374379
{
375380
Assert(nodeId>0&&nodeId <=MtmMaxNodes*2);
@@ -1683,7 +1688,14 @@ static void MtmStartRecovery()
16831688

16841689
staticvoidMtmDropSlot(intnodeId)
16851690
{
1686-
ReplicationSlotDrop(psprintf(MULTIMASTER_SLOT_PATTERN,nodeId));
1691+
if (MtmTryLockNode(nodeId,LW_EXCLUSIVE))
1692+
{
1693+
MTM_ELOG(INFO,"Drop replication slot for node %d",nodeId);
1694+
ReplicationSlotDrop(psprintf(MULTIMASTER_SLOT_PATTERN,nodeId));
1695+
MtmUnlockNode(nodeId);
1696+
}else {
1697+
MTM_ELOG(WARNING,"Failed to drop replication slot for node %d",nodeId);
1698+
}
16871699
MtmLock(LW_EXCLUSIVE);
16881700
BIT_SET(Mtm->stalledNodeMask,nodeId-1);
16891701
BIT_SET(Mtm->stoppedNodeMask,nodeId-1);/* stalled node can not be automatically recovered */
@@ -2765,7 +2777,7 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
27652777
}else {
27662778
conn->arbiterPort=MULTIMASTER_DEFAULT_ARBITER_PORT;
27672779
}
2768-
MTM_ELOG(WARNING,"Using arbiter port: %d",conn->arbiterPort);
2780+
MTM_ELOG(INFO,"Using arbiter port: %d",conn->arbiterPort);
27692781

27702782
port=strstr(connStr," port=");
27712783
if (port==NULL&&strncmp(connStr,"port=",5)==0) {
@@ -3033,7 +3045,7 @@ _PG_init(void)
30333045
1,
30343046
INT_MAX,
30353047
PGC_BACKEND,
3036-
0,
3048+
GUC_NO_SHOW_ALL,
30373049
NULL,
30383050
NULL,
30393051
NULL
@@ -3077,7 +3089,7 @@ _PG_init(void)
30773089
1,
30783090
INT_MAX,
30793091
PGC_BACKEND,
3080-
0,
3092+
GUC_NO_SHOW_ALL,
30813093
NULL,
30823094
NULL,
30833095
NULL
@@ -3173,7 +3185,7 @@ _PG_init(void)
31733185
&MtmUseDtm,
31743186
true,
31753187
PGC_BACKEND,
3176-
0,
3188+
GUC_NO_SHOW_ALL,
31773189
NULL,
31783190
NULL,
31793191
NULL
@@ -3199,7 +3211,7 @@ _PG_init(void)
31993211
&MtmPreserveCommitOrder,
32003212
true,
32013213
PGC_BACKEND,
3202-
0,
3214+
GUC_NO_SHOW_ALL,
32033215
NULL,
32043216
NULL,
32053217
NULL
@@ -3212,7 +3224,7 @@ _PG_init(void)
32123224
&MtmVolksWagenMode,
32133225
false,
32143226
PGC_BACKEND,
3215-
0,
3227+
GUC_NO_SHOW_ALL,
32163228
NULL,
32173229
NULL,
32183230
NULL
@@ -3227,7 +3239,7 @@ _PG_init(void)
32273239
1,
32283240
INT_MAX,
32293241
PGC_BACKEND,
3230-
0,
3242+
GUC_NO_SHOW_ALL,
32313243
NULL,
32323244
NULL,
32333245
NULL
@@ -3257,7 +3269,7 @@ _PG_init(void)
32573269
1,
32583270
INT_MAX,
32593271
PGC_BACKEND,
3260-
0,
3272+
GUC_NO_SHOW_ALL,
32613273
NULL,
32623274
NULL,
32633275
NULL
@@ -3272,7 +3284,7 @@ _PG_init(void)
32723284
0,
32733285
INT_MAX,
32743286
PGC_BACKEND,
3275-
0,
3287+
GUC_NO_SHOW_ALL,
32763288
NULL,
32773289
NULL,
32783290
NULL
@@ -3288,7 +3300,7 @@ _PG_init(void)
32883300
1,
32893301
INT_MAX,
32903302
PGC_BACKEND,
3291-
0,
3303+
GUC_NO_SHOW_ALL,
32923304
NULL,
32933305
NULL,
32943306
NULL
@@ -3303,7 +3315,7 @@ _PG_init(void)
33033315
1024*1024,
33043316
INT_MAX,
33053317
PGC_BACKEND,
3306-
0,
3318+
GUC_NO_SHOW_ALL,
33073319
NULL,
33083320
NULL,
33093321
NULL
@@ -3408,7 +3420,7 @@ _PG_init(void)
34083420
PreviousProcessUtilityHook=ProcessUtility_hook;
34093421
ProcessUtility_hook=MtmProcessUtility;
34103422

3411-
PreviousSeqNextvalHook=SeqNextvalHook;
3423+
PreviousSeqNextvalHook=SeqNextvalHook;
34123424
SeqNextvalHook=MtmSeqNextvalHook;
34133425
}
34143426

@@ -5366,7 +5378,7 @@ static void MtmSeqNextvalHook(Oid seqid, int64 next)
53665378
MtmSeqPositionpos;
53675379
pos.seqid=seqid;
53685380
pos.next=next;
5369-
LogLogicalMessage("N", (char*)&pos,sizeof(pos), true);
5381+
LogLogicalMessage("N", (char*)&pos,sizeof(pos), true);
53705382
}
53715383
}
53725384

@@ -5553,4 +5565,3 @@ Datum mtm_referee_poll(PG_FUNCTION_ARGS)
55535565

55545566
PG_RETURN_INT64(recoveredNodeMask);
55555567
}
5556-

‎contrib/mmts/multimaster.h‎

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,25 @@
2323
#defineMTM_ERRMSG(fmt,...) errmsg(MTM_TAG fmt, ## __VA_ARGS__)
2424

2525
#ifDEBUG_LEVEL==0
26-
#defineMTM_LOG1(fmt, ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27-
#defineMTM_LOG2(fmt, ...)
28-
#define MTM_LOG3(fmt, ...)
29-
#defineMTM_LOG4(fmt, ...)
26+
#defineMTM_LOG1(fmt, ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27+
#defineMTM_LOG2(fmt, ...)
28+
#defineMTM_LOG3(fmt, ...)
29+
#defineMTM_LOG4(fmt, ...)
3030
#elifDEBUG_LEVEL==1
31-
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32-
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33-
#defineMTM_LOG3(fmt, ...)
34-
#define MTM_LOG4(fmt, ...)
31+
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32+
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33+
#defineMTM_LOG3(fmt, ...)
34+
#defineMTM_LOG4(fmt, ...)
3535
#elifDEBUG_LEVEL==2
36-
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37-
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38-
#defineMTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39-
#defineMTM_LOG4(fmt, ...)
36+
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37+
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38+
#defineMTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39+
#defineMTM_LOG4(fmt, ...)
4040
#elifDEBUG_LEVEL >=3
41-
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42-
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43-
#defineMTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44-
#defineMTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
41+
#defineMTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42+
#defineMTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43+
#defineMTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44+
#defineMTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4545
#endif
4646

4747
#ifMTM_TRACE==0
@@ -98,7 +98,7 @@ typedef char pgid_t[MULTIMASTER_MAX_GID_SIZE];
9898
#defineSELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
9999

100100
typedefenum
101-
{
101+
{
102102
PGLOGICAL_COMMIT,
103103
PGLOGICAL_PREPARE,
104104
PGLOGICAL_COMMIT_PREPARED,
@@ -107,7 +107,7 @@ typedef enum
107107
}PGLOGICAL_EVENT;
108108

109109
/* Identifier of global transaction */
110-
typedefstruct
110+
typedefstruct
111111
{
112112
intnode;/* Zero based index of node initiating transaction */
113113
TransactionIdxid;/* Transaction ID at this node */
@@ -116,7 +116,7 @@ typedef struct
116116
#defineEQUAL_GTID(x,y) ((x).node == (y).node && (x).xid == (y).xid)
117117

118118
typedefenum
119-
{
119+
{
120120
MSG_INVALID,
121121
MSG_HANDSHAKE,
122122
MSG_PREPARED,
@@ -153,12 +153,12 @@ typedef enum
153153
typedefstruct
154154
{
155155
MtmMessageCodecode;/* Message code: MSG_PREPARE, MSG_PRECOMMIT, MSG_COMMIT, MSG_ABORT,... */
156-
intnode;/* Sender node ID */
156+
intnode;/* Sender node ID */
157157
boollockReq;/* Whether sender node needs to lock cluster to let wal-sender caught-up and complete recovery */
158158
boollocked;/* Whether sender node is locked */
159159
TransactionIddxid;/* Transaction ID at destination node */
160-
TransactionIdsxid;/* Transaction ID at sender node */
161-
XidStatusstatus;/* Transaction status */
160+
TransactionIdsxid;/* Transaction ID at sender node */
161+
XidStatusstatus;/* Transaction status */
162162
csn_tcsn;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
163163
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
164164
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes at the sender of message */
@@ -185,13 +185,13 @@ typedef struct MtmMessageQueue
185185
structMtmMessageQueue*next;
186186
}MtmMessageQueue;
187187

188-
typedefstruct
188+
typedefstruct
189189
{
190190
MtmArbiterMessagehdr;
191191
charconnStr[MULTIMASTER_MAX_CONN_STR_SIZE];
192192
}MtmHandshakeMessage;
193193

194-
typedefstruct
194+
typedefstruct
195195
{
196196
intused;
197197
intsize;
@@ -227,7 +227,7 @@ typedef struct
227227
intsenderPid;
228228
intreceiverPid;
229229
lsn_tflushPos;
230-
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
230+
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
231231
lsn_trestartLSN;
232232
RepOriginIdoriginId;
233233
inttimeline;
@@ -246,12 +246,12 @@ typedef struct MtmL2List
246246
typedefstructMtmTransState
247247
{
248248
TransactionIdxid;
249-
XidStatusstatus;
249+
XidStatusstatus;
250250
pgid_tgid;/* Global transaction ID (used for 2PC) */
251251
GlobalTransactionIdgtid;/* Transaction id at coordinator */
252252
csn_tcsn;/* commit serial number */
253253
csn_tsnapshot;/* transaction snapshot, or INVALID_CSN for local transactions */
254-
intprocno;/* pgprocno of transaction coordinator waiting for responses from replicas,
254+
intprocno;/* pgprocno of transaction coordinator waiting for responses from replicas,
255255
used to notify coordinator by arbiter */
256256
intnSubxids;/* Number of subtransanctions */
257257
structMtmTransState*next;/* Next element in L1 list of all finished transaction present in xid2state hash */
@@ -293,7 +293,7 @@ typedef struct
293293
nodemask_tpglogicalSenderMask;/* bitmask of started pglogic senders */
294294
nodemask_tcurrentLockNodeMask;/* Mask of nodes IDs which are locking the cluster */
295295
nodemask_tinducedLockNodeMask;/* Mask of node IDs which requested cluster-wide lock */
296-
nodemask_toriginLockNodeMask;/* Mask of node IDs which WAL-senders are locking the cluster.
296+
nodemask_toriginLockNodeMask;/* Mask of node IDs which WAL-senders are locking the cluster.
297297
* MtmNodeId bit is used by recovered node to complete recovery and by MtmLockCluster method */
298298
nodemask_treconnectMask;/* Mask of nodes connection to which has to be reestablished by sender */
299299
intlastLockHolder;/* PID of process last obtaining the node lock */
@@ -319,13 +319,13 @@ typedef struct
319319
MtmTransState**transListTail;/* Tail of L1 list of all finished transactions, used to append new elements.
320320
This list is expected to be in CSN ascending order, by strict order may be violated */
321321
MtmL2ListactiveTransList;/* List of active transactions */
322-
ulong64transCount;/* Counter of transactions performed by this node */
322+
ulong64transCount;/* Counter of transactions performed by this node */
323323
ulong64gcCount;/* Number of global transactions performed since last GC */
324324
MtmMessageQueue*sendQueue;/* Messages to be sent by arbiter sender */
325325
MtmMessageQueue*freeQueue;/* Free messages */
326326
lsn_trecoveredLSN;/* LSN at the moment of recovery completion */
327327
BgwPoolpool;/* Pool of background workers for applying logical replication patches */
328-
MtmNodeInfonodes[1];/* [Mtm->nAllNodes]: per-node data */
328+
MtmNodeInfonodes[1];/* [Mtm->nAllNodes]: per-node data */
329329
}MtmState;
330330

331331
typedefstructMtmFlushPosition
@@ -342,7 +342,7 @@ typedef struct MtmSeqPosition
342342
Oidseqid;
343343
int64next;
344344
}MtmSeqPosition;
345-
345+
346346
#defineMtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
347347

348348
externcharconst*constMtmNodeStatusMnem[];
@@ -393,14 +393,15 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
393393
externvoidMtmLock(LWLockModemode);
394394
externvoidMtmUnlock(void);
395395
externvoidMtmLockNode(intnodeId,LWLockModemode);
396+
externboolMtmTryLockNode(intnodeId,LWLockModemode);
396397
externvoidMtmUnlockNode(intnodeId);
397398
externvoidMtmStopNode(intnodeId,booldropSlot);
398399
externvoidMtmReconnectNode(intnodeId);
399400
externvoidMtmRecoverNode(intnodeId);
400401
externvoidMtmOnNodeDisconnect(intnodeId);
401402
externvoidMtmOnNodeConnect(intnodeId);
402403
externvoidMtmWakeUpBackend(MtmTransState*ts);
403-
externvoidMtmSleep(timestamp_tinterval);
404+
externvoidMtmSleep(timestamp_tinterval);
404405
externvoidMtmAbortTransaction(MtmTransState*ts);
405406
externvoidMtmSetCurrentTransactionGID(charconst*gid);
406407
externcsn_tMtmGetTransactionCSN(TransactionIdxid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp