Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1b8929d

Browse files
committed
Add mmts control file
1 parent46f63ec commit1b8929d

File tree

3 files changed

+74
-8
lines changed

3 files changed

+74
-8
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ bool MtmVolksWagenMode;
214214
TransactionIdMtmUtilityProcessedInXid;
215215

216216
staticchar*MtmConnStrs;
217+
staticchar*MtmClusterName;
217218
staticintMtmQueueSize;
218219
staticintMtmWorkers;
219220
staticintMtmVacuumDelay;
@@ -1867,6 +1868,39 @@ static void MtmRaftableInitialize()
18671868
raftable_start(MtmNodeId-1);
18681869
}
18691870

1871+
staticvoidMtmCheckControlFile(void)
1872+
{
1873+
charcontrolFilePath[MAXPGPATH];
1874+
charbuf[MULTIMASTER_MAX_CTL_STR_SIZE];
1875+
FILE*f;
1876+
snprintf(controlFilePath,MAXPGPATH,"%s/global/mmts_control",DataDir);
1877+
f=fopen(controlFilePath,"r");
1878+
if (f!=NULL&&fgets(buf,sizeofbuf,f)) {
1879+
char*sep=strchr(buf,':');
1880+
if (sep==NULL) {
1881+
elog(FATAL,"File mmts_control doesn't contain cluster name");
1882+
}
1883+
*sep='\0';
1884+
if (strcmp(buf,MtmClusterName)!=0) {
1885+
elog(FATAL,"Database belongs to some other cluster %s rather than %s",buf,MtmClusterName);
1886+
}
1887+
if (sscanf(sep+1,"%d",&Mtm->donorNodeId)!=1) {
1888+
elog(FATAL,"File mmts_control doesn't contain node id");
1889+
}
1890+
fclose(f);
1891+
}else {
1892+
if (f!=NULL) {
1893+
fclose(f);
1894+
}
1895+
f=fopen(controlFilePath,"w");
1896+
if (f==NULL) {
1897+
elog(FATAL,"Failed to create mmts_control file: %m");
1898+
}
1899+
Mtm->donorNodeId=-1;
1900+
fprintf(f,"%s:%d\n",MtmClusterName,Mtm->donorNodeId);
1901+
fclose(f);
1902+
}
1903+
}
18701904

18711905
staticvoidMtmInitialize()
18721906
{
@@ -1931,6 +1965,8 @@ static void MtmInitialize()
19311965
MtmDoReplication= true;
19321966
TM=&MtmTM;
19331967
LWLockRelease(AddinShmemInitLock);
1968+
1969+
MtmCheckControlFile();
19341970
}
19351971

19361972
staticvoid
@@ -2472,6 +2508,19 @@ _PG_init(void)
24722508
NULL/* GucShowHook show_hook */
24732509
);
24742510

2511+
DefineCustomStringVariable(
2512+
"multimaster.cluster_name",
2513+
"Name of the cluster",
2514+
NULL,
2515+
&MtmClusterName,
2516+
"mmts",
2517+
PGC_BACKEND,/* context */
2518+
0,/* flags */
2519+
NULL,/* GucStringCheckHook check_hook */
2520+
NULL,/* GucStringAssignHook assign_hook */
2521+
NULL/* GucShowHook show_hook */
2522+
);
2523+
24752524
DefineCustomIntVariable(
24762525
"multimaster.node_id",
24772526
"Multimaster node ID",
@@ -2609,8 +2658,10 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
26092658
MtmLock(LW_EXCLUSIVE);
26102659
if (Mtm->status==MTM_RECOVERY) {
26112660
recovery= true;
2612-
if (Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId) {
2613-
/* Choose for recovery first available slot */
2661+
if ((Mtm->recoverySlot==0&& (Mtm->donorNodeId<0||Mtm->donorNodeId==nodeId))
2662+
||Mtm->recoverySlot==nodeId)
2663+
{
2664+
/* Choose for recovery first available slot or slot of donor node (if any) */
26142665
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
26152666
Mtm->recoverySlot=nodeId;
26162667
Mtm->nReceivers=0;
@@ -2698,6 +2749,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
26982749
{
26992750
ListCell*param;
27002751
boolrecoveryCompleted= false;
2752+
XLogRecPtrrecoveryStartPos=InvalidXLogRecPtr;
2753+
27012754
MtmIsRecoverySession= false;
27022755
Mtm->nodes[MtmReplicationNodeId-1].senderPid=MyProcPid;
27032756
Mtm->nodes[MtmReplicationNodeId-1].senderStartTime=MtmGetSystemTime();
@@ -2717,11 +2770,21 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
27172770
elog(ERROR,"Replication mode is not specified");
27182771
}
27192772
break;
2773+
}elseif (strcmp("mtm_restart_pos",elem->defname)==0) {
2774+
if (elem->arg!=NULL&&strVal(elem->arg)!=NULL) {
2775+
recoveryStartPos=intVal(elem->arg);
2776+
}else {
2777+
elog(ERROR,"Restart position is not specified");
2778+
}
27202779
}
27212780
}
27222781
MtmLock(LW_EXCLUSIVE);
2723-
if (MtmIsRecoverySession) {
2724-
MTM_LOG1("%d: Node %d start recovery of node %d",MyProcPid,MtmNodeId,MtmReplicationNodeId);
2782+
if (MtmIsRecoverySession) {
2783+
MTM_LOG1("%d: Node %d start recovery of node %d at position %lx",MyProcPid,MtmNodeId,MtmReplicationNodeId,recoveryStartPos);
2784+
Assert(MyReplicationSlot!=NULL);
2785+
if (recoveryStartPos<MyReplicationSlot->data.restart_lsn) {
2786+
elog(ERROR,"Specified recovery start position %lx is beyond restart lsn %lx",recoveryStartPos,MyReplicationSlot->data.restart_lsn);
2787+
}
27252788
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
27262789
MtmDisableNode(MtmReplicationNodeId);
27272790
MtmCheckQuorum();

‎contrib/mmts/multimaster.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
5353
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
5454
#defineMULTIMASTER_MAX_LOCAL_TABLES 256
55+
#defineMULTIMASTER_MAX_CTL_STR_SIZE 256
5556
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
5657
#defineMULTIMASTER_ADMIN "mtm_admin"
5758

@@ -241,6 +242,7 @@ typedef struct
241242
intnActiveTransactions;/* Nunmber of active 2PC transactions */
242243
intnConfigChanges;/* Number of cluster configuration changes */
243244
intrecoveryCount;/* Number of completed recoveries */
245+
intdonorNodeId;/* Cluster node from which this node was populated */
244246
int64timeShift;/* Local time correction */
245247
csn_tcsn;/* Last obtained timestamp: used to provide unique acending CSNs based on system time */
246248
csn_tlastCsn;/* CSN of last committed transaction */

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ pglogical_receiver_main(Datum main_arg)
285285
timeline=Mtm->nodes[nodeId-1].timeline;
286286
newTimeline= true;
287287
}
288-
/* My original assumption was that we can perfrom recovery onlyfromm existed slot,
288+
/* My original assumption was that we can perfrom recovery onlyfrom existed slot,
289289
* but unfortunately looks like slots can "disapear" together with WAL-sender.
290290
* So let's try to recreate slot always. */
291291
/* if (mode != REPLMODE_REPLICATION) */
@@ -325,7 +325,7 @@ pglogical_receiver_main(Datum main_arg)
325325
* Them are either empty, either new node is synchronized using base_backup.
326326
* So we assume that LSNs are the same for local and remote node
327327
*/
328-
originStartPos=Mtm->status==MTM_RECOVERY ?GetXLogInsertRecPtr() :InvalidXLogRecPtr;
328+
originStartPos=Mtm->status==MTM_RECOVERY&&Mtm->donorNodeId==nodeId?GetXLogInsertRecPtr() :InvalidXLogRecPtr;
329329
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
330330
}else {
331331
originStartPos=replorigin_get_progress(originId, false);
@@ -335,13 +335,14 @@ pglogical_receiver_main(Datum main_arg)
335335
CommitTransactionCommand();
336336
}
337337

338-
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s')",
338+
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx')",
339339
slotName,
340340
(uint32) (originStartPos >>32),
341341
(uint32)originStartPos,
342342
MULTIMASTER_MAX_PROTO_VERSION,
343343
MULTIMASTER_MIN_PROTO_VERSION,
344-
MtmReplicationModeName[mode]
344+
MtmReplicationModeName[mode],
345+
originStartPos
345346
);
346347
res=PQexec(conn,query->data);
347348
if (PQresultStatus(res)!=PGRES_COPY_BOTH)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp