Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit76741a1

Browse files
knizhnikkelvich
authored andcommitted
Support monotonic sequences
1 parent181d689 commit76741a1

File tree

4 files changed

+64
-0
lines changed

4 files changed

+64
-0
lines changed

‎multimaster.c‎

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include"utils/memutils.h"
4848
#include"commands/dbcommands.h"
4949
#include"commands/extension.h"
50+
#include"commands/sequence.h"
5051
#include"postmaster/autovacuum.h"
5152
#include"storage/pmsignal.h"
5253
#include"storage/proc.h"
@@ -267,11 +268,13 @@ static bool MtmBreakConnection;
267268
staticboolMtmClusterLocked;
268269
staticboolMtmInsideTransaction;
269270
staticboolMtmReferee;
271+
staticboolMtmMonotonicSequences;
270272

271273
staticExecutorStart_hook_typePreviousExecutorStartHook;
272274
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
273275
staticProcessUtility_hook_typePreviousProcessUtilityHook;
274276
staticshmem_startup_hook_typePreviousShmemStartupHook;
277+
staticseq_nextval_hook_tPreviousSeqNextvalHook;
275278

276279
staticnodemask_tlastKnownMatrix[MAX_NODES];
277280

@@ -280,6 +283,7 @@ static void MtmExecutorFinish(QueryDesc *queryDesc);
280283
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
281284
ProcessUtilityContextcontext,ParamListInfoparams,
282285
DestReceiver*dest,char*completionTag);
286+
staticvoidMtmSeqNextvalHook(Oidseqid,int64next);
283287

284288
staticboolMtmAtExitHookRegistered= false;
285289

@@ -3137,6 +3141,19 @@ _PG_init(void)
31373141
NULL
31383142
);
31393143

3144+
DefineCustomBoolVariable(
3145+
"multimaster.monotonic_sequences",
3146+
"Enforce monotinic behaviour of sequence values obtained from different nodes",
3147+
NULL,
3148+
&MtmMonotonicSequences,
3149+
false,
3150+
PGC_BACKEND,
3151+
0,
3152+
NULL,
3153+
NULL,
3154+
NULL
3155+
);
3156+
31403157
DefineCustomBoolVariable(
31413158
"multimaster.ignore_tables_without_pk",
31423159
"Do not replicate tables without primary key",
@@ -3404,6 +3421,9 @@ _PG_init(void)
34043421

34053422
PreviousProcessUtilityHook=ProcessUtility_hook;
34063423
ProcessUtility_hook=MtmProcessUtility;
3424+
3425+
PreviousSeqNextvalHook=SeqNextvalHook;
3426+
SeqNextvalHook=MtmSeqNextvalHook;
34073427
}
34083428

34093429
/*
@@ -3415,6 +3435,7 @@ _PG_fini(void)
34153435
shmem_startup_hook=PreviousShmemStartupHook;
34163436
ExecutorFinish_hook=PreviousExecutorFinishHook;
34173437
ProcessUtility_hook=PreviousProcessUtilityHook;
3438+
SeqNextvalHook=PreviousSeqNextvalHook;
34183439
}
34193440

34203441

@@ -5352,6 +5373,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
53525373
}
53535374
}
53545375

5376+
staticvoidMtmSeqNextvalHook(Oidseqid,int64next)
5377+
{
5378+
if (MtmMonotonicSequences)
5379+
{
5380+
MtmSeqPositionpos;
5381+
pos.seqid=seqid;
5382+
pos.next=next;
5383+
LogLogicalMessage("N", (char*)&pos,sizeof(pos), true);
5384+
}
5385+
}
5386+
53555387
/*
53565388
* -------------------------------------------
53575389
* Executor pool interface

‎multimaster.h‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,12 @@ typedef struct MtmFlushPosition
337337
}MtmFlushPosition;
338338

339339

340+
typedefstructMtmSeqPosition
341+
{
342+
Oidseqid;
343+
int64next;
344+
}MtmSeqPosition;
345+
340346
#defineMtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
341347

342348
externcharconst*constMtmNodeStatusMnem[];

‎pglogical_apply.c‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include"commands/vacuum.h"
2323
#include"commands/tablespace.h"
2424
#include"commands/defrem.h"
25+
#include"commands/sequence.h"
2526
#include"parser/parse_utilcmd.h"
2627

2728
#include"libpq/pqformat.h"
@@ -1157,7 +1158,18 @@ void MtmExecutor(void* work, size_t size)
11571158
s.len=save_len;
11581159
break;
11591160
}
1161+
case'N':
1162+
{
1163+
int64next;
1164+
Assert(rel!=NULL);
1165+
next=pq_getmsgint64(&s);
1166+
AdjustSequence(RelationGetRelid(rel),next);
1167+
close_rel(rel);
1168+
rel=NULL;
1169+
break;
1170+
}
11601171
case'0':
1172+
Assert(rel!=NULL);
11611173
heap_truncate_one_rel(rel);
11621174
break;
11631175
case'M':

‎pglogical_proto.c‎

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,17 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
168168
}
169169
}
170170

171+
172+
staticvoidpglogical_seq_nextval(StringInfoout,LogicalDecodingContext*ctx,MtmSeqPosition*pos)
173+
{
174+
Relationrel=heap_open(pos->seqid,NoLock);
175+
pglogical_write_rel(out,ctx->output_plugin_private,rel);
176+
heap_close(rel,NoLock);
177+
pq_sendbyte(out,'N');
178+
pq_sendint64(out,pos->next);
179+
}
180+
181+
171182
staticvoidpglogical_broadcast_table(StringInfoout,LogicalDecodingContext*ctx,MtmCopyRequest*copy)
172183
{
173184
if (BIT_CHECK(copy->targetNodes,MtmReplicationNodeId-1)) {
@@ -229,6 +240,9 @@ pglogical_write_message(StringInfo out, LogicalDecodingContext *ctx,
229240
case'B':
230241
pglogical_broadcast_table(out,ctx, (MtmCopyRequest*)message);
231242
return;
243+
case'N':
244+
pglogical_seq_nextval(out,ctx, (MtmSeqPosition*)message);
245+
return;
232246
}
233247
pq_sendbyte(out,'M');
234248
pq_sendbyte(out,*prefix);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp