Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit990ccfc

Browse files
knizhnikkelvich
authored and committed
More recovery fixes
1 parent 839d890 commit 990ccfc

File tree

6 files changed

+50
-36
lines changed

6 files changed

+50
-36
lines changed

‎arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ static void MtmTransReceiver(Datum arg)
10031003
}
10041004
}
10051005
}
1006-
if (Mtm->status!=MTM_RECOVERY) {
1006+
if (Mtm->status==MTM_ONLINE) {
10071007
now=MtmGetSystemTime();
10081008
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
10091009
if (!MtmWatchdog(stopPolling)) {

‎multimaster.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
#include"utils/builtins.h"
4141
#include"utils/memutils.h"
4242
#include"commands/dbcommands.h"
43-
#include"miscadmin.h"
4443
#include"postmaster/autovacuum.h"
4544
#include"storage/pmsignal.h"
4645
#include"storage/proc.h"
@@ -1214,12 +1213,16 @@ static void MtmEnableNode(int nodeId)
12141213

12151214
voidMtmRecoveryCompleted(void)
12161215
{
1216+
inti;
12171217
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, reconnect mask=%lx, live nodes=%d",
12181218
MtmNodeId,Mtm->disabledNodeMask,Mtm->reconnectMask,Mtm->nLiveNodes);
12191219
MtmLock(LW_EXCLUSIVE);
12201220
Mtm->recoverySlot=0;
12211221
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
12221222
BIT_CLEAR(Mtm->disabledNodeMask,MtmNodeId-1);
1223+
for (i=0;i<Mtm->nAllNodes;i++) {
1224+
Mtm->nodes[i].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
1225+
}
12231226
/* Mode will be changed to online once all logical receivers are connected */
12241227
MtmSwitchClusterMode(MTM_CONNECTED);
12251228
MtmUnlock();
@@ -2297,7 +2300,7 @@ void MtmReceiverStarted(int nodeId)
22972300
* During recovery we need to open only one replication slot from which node should receive all transactions.
22982301
* Slots at other nodes should be removed
22992302
*/
2300-
MtmSlotModeMtmReceiverSlotMode(intnodeId)
2303+
MtmReplicationModeMtmGetReplicationMode(intnodeId)
23012304
{
23022305
boolrecovery= false;
23032306
while (Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE) {
@@ -2312,7 +2315,7 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
23122315
Mtm->recoveryCount+=1;
23132316
Mtm->pglogicalNodeMask=0;
23142317
FinishAllPreparedTransactions(false);
2315-
returnSLOT_OPEN_EXISTED;
2318+
returnREPLMODE_RECOVERY;
23162319
}
23172320
}
23182321
/* delay opening of other slots until recovery is completed */
@@ -2324,7 +2327,7 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
23242327
MTM_LOG2("%d: Reuse replication slot for node %d",MyProcPid,nodeId);
23252328
}
23262329
/* After recovery completion we need to drop all other slots to avoid receiving redundant data */
2327-
returnrecovery ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS;
2330+
returnrecovery ?REPLMODE_RECOVERED :REPLMODE_NORMAL;
23282331
}
23292332

23302333
staticboolMtmIsBroadcast()

‎multimaster.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,10 @@ typedef enum
117117

118118
typedefenum
119119
{
120-
SLOT_CREATE_NEW,/*create new slot (dropexisted) */
121-
SLOT_OPEN_EXISTED,/*open existedslot */
122-
SLOT_OPEN_ALWAYS,/*openexisted slot or create newif not exists */
123-
}MtmSlotMode;
120+
REPLMODE_RECOVERED, /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
121+
REPLMODE_RECOVERY, /* perform recovery of the node by applying all data from the slot from specified point */
122+
REPLMODE_NORMAL /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
123+
}MtmReplicationMode;
124124

125125
typedefstruct
126126
{
@@ -244,7 +244,7 @@ extern csn_t MtmAssignCSN(void);
244244
externcsn_tMtmSyncClock(csn_tcsn);
245245
externvoidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tsnapshot);
246246
externvoidMtmReceiverStarted(intnodeId);
247-
externMtmSlotModeMtmReceiverSlotMode(intnodeId);
247+
externMtmReplicationModeMtmGetReplicationMode(intnodeId);
248248
externvoidMtmExecute(void*work,intsize);
249249
externvoidMtmExecutor(intid,void*work,size_tsize);
250250
externvoidMtmSendNotificationMessage(MtmTransState*ts,MtmMessageCodecmd);

‎pglogical_output.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include"utils/relcache.h"
4848
#include"utils/syscache.h"
4949
#include"utils/typcache.h"
50+
#include"miscadmin.h"
5051

5152
externvoid_PG_output_plugin_init(OutputPluginCallbacks*cb);
5253

@@ -155,6 +156,8 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
155156
{
156157
PGLogicalOutputData*data=palloc0(sizeof(PGLogicalOutputData));
157158

159+
elog(LOG,"%d: pg_decode_startup is_init=%d",MyProcPid,is_init);
160+
158161
data->context=AllocSetContextCreate(TopMemoryContext,
159162
"pglogical conversion context",
160163
ALLOCSET_DEFAULT_MINSIZE,

‎pglogical_proto.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,8 +433,7 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
433433
PGLogicalProtoAPI*
434434
pglogical_init_api(PGLogicalProtoTypetyp)
435435
{
436-
PGLogicalProtoAPI*res=malloc(sizeof(PGLogicalProtoAPI));
437-
MemSet(res,0,sizeof(PGLogicalProtoAPI));
436+
PGLogicalProtoAPI*res=palloc0(sizeof(PGLogicalProtoAPI));
438437
sscanf(MyReplicationSlot->data.name.data,MULTIMASTER_SLOT_PATTERN,&MtmReplicationNodeId);
439438
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d",MyProcPid,MyReplicationSlot->data.name.data,MtmReplicationNodeId);
440439
res->write_rel=pglogical_write_rel;

‎pglogical_receiver.c

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,9 @@ feTimestampDifference(int64 start_time, int64 stop_time,
193193

194194
staticcharconst*constMtmReplicationModeName[]=
195195
{
196-
"recovered",/*SLOT_CREATE_NEW:recovery of node is completed so drop old slot and restart replication from the current position in WAL */
197-
"recovery",/*SLOT_OPEN_EXISTED:perform recorvery of the node by applying all data from theslot from specified point */
198-
"normal"/*SLOT_OPEN_ALWAYS:normal mode: use existed slot or create new one and start receiving data from it from the specified position */
196+
"recovered",/* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
197+
"recovery",/* perform recovery of the node by applying all data from the slot from specified point */
198+
"normal"/* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
199199
};
200200

201201
staticvoid
@@ -206,7 +206,7 @@ pglogical_receiver_main(Datum main_arg)
206206
PQExpBufferquery;
207207
PGconn*conn;
208208
PGresult*res;
209-
MtmSlotModemode;
209+
MtmReplicationModemode;
210210

211211
ByteBufferbuf;
212212
XLogRecPtroriginStartPos=0;
@@ -251,7 +251,7 @@ pglogical_receiver_main(Datum main_arg)
251251
* During recovery we need to open only one replication slot from which node should receive all transactions.
252252
* Slots at other nodes should be removed
253253
*/
254-
mode=MtmReceiverSlotMode(nodeId);
254+
mode=MtmGetReplicationMode(nodeId);
255255
count=Mtm->recoveryCount;
256256

257257
/* Establish connection to remote server */
@@ -264,14 +264,19 @@ pglogical_receiver_main(Datum main_arg)
264264
}
265265

266266
query=createPQExpBuffer();
267-
268-
if (mode==SLOT_CREATE_NEW) {
267+
#if 1 /* Do we need to recreate slot ? */
268+
if (mode==REPLMODE_RECOVERED) {/* recreate slot */
269269
appendPQExpBuffer(query,"DROP_REPLICATION_SLOT \"%s\"",slotName);
270270
res=PQexec(conn,query->data);
271271
PQclear(res);
272272
resetPQExpBuffer(query);
273273
}
274-
if (mode!=SLOT_OPEN_EXISTED) {
274+
#endif
275+
/* My original assumption was that we can perform recovery only from existed slot,
276+
* but unfortunately looks like slots can "disappear" together with WAL-sender.
277+
* So let's try to recreate slot always. */
278+
/* if (mode != REPLMODE_RECOVERY) */
279+
{
275280
appendPQExpBuffer(query,"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",slotName,MULTIMASTER_NAME);
276281
res=PQexec(conn,query->data);
277282
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
@@ -291,24 +296,28 @@ pglogical_receiver_main(Datum main_arg)
291296
}
292297

293298
/* Start logical replication at specified position */
294-
StartTransactionCommand();
295-
originName=psprintf(MULTIMASTER_SLOT_PATTERN,nodeId);
296-
originId=replorigin_by_name(originName, true);
297-
if (originId==InvalidRepOriginId) {
298-
originId=replorigin_create(originName);
299-
/*
300-
* We are just creating new replication slot.
301-
* It is assumed that state of local and remote nodes is the same at this moment.
302-
* Them are either empty, either new node is synchronized using base_backup.
303-
* So we assume that LSNs are the same for local and remote node
304-
*/
305-
originStartPos=Mtm->status==MTM_RECOVERY ?GetXLogInsertRecPtr() :0;
306-
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
299+
if (mode==REPLMODE_RECOVERED) {
300+
originStartPos=0;
307301
}else {
308-
originStartPos=replorigin_get_progress(originId, false);
309-
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
302+
StartTransactionCommand();
303+
originName=psprintf(MULTIMASTER_SLOT_PATTERN,nodeId);
304+
originId=replorigin_by_name(originName, true);
305+
if (originId==InvalidRepOriginId) {
306+
originId=replorigin_create(originName);
307+
/*
308+
* We are just creating new replication slot.
309+
* It is assumed that state of local and remote nodes is the same at this moment.
310+
* They are either empty, or the new node is synchronized using base_backup.
311+
* So we assume that LSNs are the same for local and remote node
312+
*/
313+
originStartPos=Mtm->status==MTM_RECOVERY ?GetXLogInsertRecPtr() :0;
314+
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
315+
}else {
316+
originStartPos=replorigin_get_progress(originId, false);
317+
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
318+
}
319+
CommitTransactionCommand();
310320
}
311-
CommitTransactionCommand();
312321

313322
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s')",
314323
slotName,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp