Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit83ea67b

Browse files
knizhnikkelvich
authored andcommitted
Add mmts control file
1 parent9cbd0fe commit83ea67b

File tree

3 files changed

+74
-8
lines changed

3 files changed

+74
-8
lines changed

‎multimaster.c

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ bool MtmVolksWagenMode;
213213
TransactionIdMtmUtilityProcessedInXid;
214214

215215
staticchar*MtmConnStrs;
216+
staticchar*MtmClusterName;
216217
staticintMtmQueueSize;
217218
staticintMtmWorkers;
218219
staticintMtmVacuumDelay;
@@ -1866,6 +1867,39 @@ static void MtmRaftableInitialize()
18661867
raftable_start(MtmNodeId-1);
18671868
}
18681869

1870+
staticvoidMtmCheckControlFile(void)
1871+
{
1872+
charcontrolFilePath[MAXPGPATH];
1873+
charbuf[MULTIMASTER_MAX_CTL_STR_SIZE];
1874+
FILE*f;
1875+
snprintf(controlFilePath,MAXPGPATH,"%s/global/mmts_control",DataDir);
1876+
f=fopen(controlFilePath,"r");
1877+
if (f!=NULL&&fgets(buf,sizeofbuf,f)) {
1878+
char*sep=strchr(buf,':');
1879+
if (sep==NULL) {
1880+
elog(FATAL,"File mmts_control doesn't contain cluster name");
1881+
}
1882+
*sep='\0';
1883+
if (strcmp(buf,MtmClusterName)!=0) {
1884+
elog(FATAL,"Database belongs to some other cluster %s rather than %s",buf,MtmClusterName);
1885+
}
1886+
if (sscanf(sep+1,"%d",&Mtm->donorNodeId)!=1) {
1887+
elog(FATAL,"File mmts_control doesn't contain node id");
1888+
}
1889+
fclose(f);
1890+
}else {
1891+
if (f!=NULL) {
1892+
fclose(f);
1893+
}
1894+
f=fopen(controlFilePath,"w");
1895+
if (f==NULL) {
1896+
elog(FATAL,"Failed to create mmts_control file: %m");
1897+
}
1898+
Mtm->donorNodeId=-1;
1899+
fprintf(f,"%s:%d\n",MtmClusterName,Mtm->donorNodeId);
1900+
fclose(f);
1901+
}
1902+
}
18691903

18701904
staticvoidMtmInitialize()
18711905
{
@@ -1930,6 +1964,8 @@ static void MtmInitialize()
19301964
MtmDoReplication= true;
19311965
TM=&MtmTM;
19321966
LWLockRelease(AddinShmemInitLock);
1967+
1968+
MtmCheckControlFile();
19331969
}
19341970

19351971
staticvoid
@@ -2471,6 +2507,19 @@ _PG_init(void)
24712507
NULL/* GucShowHook show_hook */
24722508
);
24732509

2510+
DefineCustomStringVariable(
2511+
"multimaster.cluster_name",
2512+
"Name of the cluster",
2513+
NULL,
2514+
&MtmClusterName,
2515+
"mmts",
2516+
PGC_BACKEND,/* context */
2517+
0,/* flags */
2518+
NULL,/* GucStringCheckHook check_hook */
2519+
NULL,/* GucStringAssignHook assign_hook */
2520+
NULL/* GucShowHook show_hook */
2521+
);
2522+
24742523
DefineCustomIntVariable(
24752524
"multimaster.node_id",
24762525
"Multimaster node ID",
@@ -2608,8 +2657,10 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
26082657
MtmLock(LW_EXCLUSIVE);
26092658
if (Mtm->status==MTM_RECOVERY) {
26102659
recovery= true;
2611-
if (Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId) {
2612-
/* Choose for recovery first available slot */
2660+
if ((Mtm->recoverySlot==0&& (Mtm->donorNodeId<0||Mtm->donorNodeId==nodeId))
2661+
||Mtm->recoverySlot==nodeId)
2662+
{
2663+
/* Choose for recovery first available slot or slot of donor node (if any) */
26132664
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
26142665
Mtm->recoverySlot=nodeId;
26152666
Mtm->nReceivers=0;
@@ -2697,6 +2748,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
26972748
{
26982749
ListCell*param;
26992750
boolrecoveryCompleted= false;
2751+
XLogRecPtrrecoveryStartPos=InvalidXLogRecPtr;
2752+
27002753
MtmIsRecoverySession= false;
27012754
Mtm->nodes[MtmReplicationNodeId-1].senderPid=MyProcPid;
27022755
Mtm->nodes[MtmReplicationNodeId-1].senderStartTime=MtmGetSystemTime();
@@ -2716,11 +2769,21 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
27162769
elog(ERROR,"Replication mode is not specified");
27172770
}
27182771
break;
2772+
}elseif (strcmp("mtm_restart_pos",elem->defname)==0) {
2773+
if (elem->arg!=NULL&&strVal(elem->arg)!=NULL) {
2774+
recoveryStartPos=intVal(elem->arg);
2775+
}else {
2776+
elog(ERROR,"Restart position is not specified");
2777+
}
27192778
}
27202779
}
27212780
MtmLock(LW_EXCLUSIVE);
2722-
if (MtmIsRecoverySession) {
2723-
MTM_LOG1("%d: Node %d start recovery of node %d",MyProcPid,MtmNodeId,MtmReplicationNodeId);
2781+
if (MtmIsRecoverySession) {
2782+
MTM_LOG1("%d: Node %d start recovery of node %d at position %lx",MyProcPid,MtmNodeId,MtmReplicationNodeId,recoveryStartPos);
2783+
Assert(MyReplicationSlot!=NULL);
2784+
if (recoveryStartPos<MyReplicationSlot->data.restart_lsn) {
2785+
elog(ERROR,"Specified recovery start position %lx is beyond restart lsn %lx",recoveryStartPos,MyReplicationSlot->data.restart_lsn);
2786+
}
27242787
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmReplicationNodeId-1)) {
27252788
MtmDisableNode(MtmReplicationNodeId);
27262789
MtmCheckQuorum();

‎multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
5353
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
5454
#defineMULTIMASTER_MAX_LOCAL_TABLES 256
55+
#defineMULTIMASTER_MAX_CTL_STR_SIZE 256
5556
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
5657
#defineMULTIMASTER_ADMIN "mtm_admin"
5758

@@ -241,6 +242,7 @@ typedef struct
241242
intnActiveTransactions;/* Nunmber of active 2PC transactions */
242243
intnConfigChanges;/* Number of cluster configuration changes */
243244
intrecoveryCount;/* Number of completed recoveries */
245+
intdonorNodeId;/* Cluster node from which this node was populated */
244246
int64timeShift;/* Local time correction */
245247
csn_tcsn;/* Last obtained timestamp: used to provide unique acending CSNs based on system time */
246248
csn_tlastCsn;/* CSN of last committed transaction */

‎pglogical_receiver.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ pglogical_receiver_main(Datum main_arg)
285285
timeline=Mtm->nodes[nodeId-1].timeline;
286286
newTimeline= true;
287287
}
288-
/* My original assumption was that we can perfrom recovery onlyfromm existed slot,
288+
/* My original assumption was that we can perfrom recovery onlyfrom existed slot,
289289
* but unfortunately looks like slots can "disapear" together with WAL-sender.
290290
* So let's try to recreate slot always. */
291291
/* if (mode != REPLMODE_REPLICATION) */
@@ -325,7 +325,7 @@ pglogical_receiver_main(Datum main_arg)
325325
* Them are either empty, either new node is synchronized using base_backup.
326326
* So we assume that LSNs are the same for local and remote node
327327
*/
328-
originStartPos=Mtm->status==MTM_RECOVERY ?GetXLogInsertRecPtr() :InvalidXLogRecPtr;
328+
originStartPos=Mtm->status==MTM_RECOVERY&&Mtm->donorNodeId==nodeId?GetXLogInsertRecPtr() :InvalidXLogRecPtr;
329329
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
330330
}else {
331331
originStartPos=replorigin_get_progress(originId, false);
@@ -335,13 +335,14 @@ pglogical_receiver_main(Datum main_arg)
335335
CommitTransactionCommand();
336336
}
337337

338-
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s')",
338+
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx')",
339339
slotName,
340340
(uint32) (originStartPos >>32),
341341
(uint32)originStartPos,
342342
MULTIMASTER_MAX_PROTO_VERSION,
343343
MULTIMASTER_MIN_PROTO_VERSION,
344-
MtmReplicationModeName[mode]
344+
MtmReplicationModeName[mode],
345+
originStartPos
345346
);
346347
res=PQexec(conn,query->data);
347348
if (PQresultStatus(res)!=PGRES_COPY_BOTH)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp