Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb98bc97

Browse files
committed
Uncompleted attempt to merge with pglogical plugin
1 parentb7d599e commitb98bc97

9 files changed

+701
-64
lines changed

‎contrib/mmts/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o ddd.o bkb.o
2+
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.opglogical_relmetacache.oddd.o bkb.o
33

44
overrideCPPFLAGS += -I../raftable
55

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,18 +577,44 @@ process_remote_commit(StringInfo in)
577577
}
578578

579579
staticvoid
580-
process_remote_insert(StringInfos,Relationrel)
580+
process_remote_insert(StringInfos)
581581
{
582582
EState*estate;
583583
TupleDatanew_tuple;
584584
TupleTableSlot*newslot;
585585
TupleTableSlot*oldslot;
586586
ResultRelInfo*relinfo;
587587
ScanKey*index_keys;
588+
uint32relid;
589+
uint8flags;
590+
PGLogicalRelation*lr;
591+
Relationrel;
588592
char*relname=RelationGetRelationName(rel);
589593
inti;
590594

595+
/* read the flags */
596+
flags=pq_getmsgbyte(in);
597+
Assert(flags==0);
598+
599+
/* read the relation id */
600+
relid=pq_getmsgint(in,4);
601+
602+
action=pq_getmsgbyte(in);
603+
if (action!='N')
604+
elog(ERROR,"expected new tuple but got %d",
605+
action);
606+
607+
rl=pglogical_relation_open(relid,RowExclusiveLock);
608+
rel=rl->rel;
609+
591610
estate=create_rel_estate(rel);
611+
econtext=GetPerTupleExprContext(estate);
612+
613+
PushActiveSnapshot(GetTransactionSnapshot());
614+
615+
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
616+
fill_tuple_defaults(rel,econtext,&new_tup);
617+
592618
newslot=ExecInitExtraTupleSlot(estate);
593619
oldslot=ExecInitExtraTupleSlot(estate);
594620
ExecSetSlotDescriptor(newslot,RelationGetDescr(rel));
@@ -664,6 +690,7 @@ process_remote_insert(StringInfo s, Relation rel)
664690
ExecCloseIndices(estate->es_result_relation_info);
665691

666692
heap_close(rel,NoLock);
693+
pglogical_relation_close(rl,NoLock);
667694
ExecResetTupleTable(estate->es_tupleTable, true);
668695
FreeExecutorState(estate);
669696

‎contrib/mmts/pglogical_output.c‎

Lines changed: 89 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
*/
1313
#include"postgres.h"
1414

15+
#include"pglogical_output/compat.h"
1516
#include"pglogical_config.h"
1617
#include"pglogical_output.h"
1718
#include"pglogical_proto.h"
1819
#include"pglogical_hooks.h"
20+
#include"pglogical_relmetacache.h"
1921

2022
#include"access/hash.h"
2123
#include"access/sysattr.h"
@@ -33,7 +35,9 @@
3335

3436
#include"replication/output_plugin.h"
3537
#include"replication/logical.h"
38+
#ifdefHAVE_REPLICATION_ORIGINS
3639
#include"replication/origin.h"
40+
#endif
3741

3842
#include"utils/builtins.h"
3943
#include"utils/catcache.h"
@@ -47,6 +51,8 @@
4751
#include"utils/syscache.h"
4852
#include"utils/typcache.h"
4953

54+
PG_MODULE_MAGIC;
55+
5056
externvoid_PG_output_plugin_init(OutputPluginCallbacks*cb);
5157

5258
/* These must be available to pg_dlsym() */
@@ -61,8 +67,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
6167
ReorderBufferTXN*txn,Relationrel,
6268
ReorderBufferChange*change);
6369

70+
#ifdefHAVE_REPLICATION_ORIGINS
6471
staticboolpg_decode_origin_filter(LogicalDecodingContext*ctx,
6572
RepOriginIdorigin_id);
73+
#endif
6674

6775
staticvoidsend_startup_message(LogicalDecodingContext*ctx,
6876
PGLogicalOutputData*data,boollast_message);
@@ -79,7 +87,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7987
cb->begin_cb=pg_decode_begin_txn;
8088
cb->change_cb=pg_decode_change;
8189
cb->commit_cb=pg_decode_commit_txn;
90+
#ifdefHAVE_REPLICATION_ORIGINS
8291
cb->filter_by_origin_cb=pg_decode_origin_filter;
92+
#endif
8393
cb->shutdown_cb=pg_decode_shutdown;
8494
}
8595

@@ -99,42 +109,42 @@ check_binary_compatibility(PGLogicalOutputData *data)
99109
if (data->client_binary_sizeofdatum!=0
100110
&&data->client_binary_sizeofdatum!=sizeof(Datum))
101111
{
102-
elog(DEBUG1,"Binary mode rejected: Server and clientendiansizeof(Datum) mismatch");
112+
elog(DEBUG1,"Binary mode rejected: Server and client sizeof(Datum) mismatch");
103113
return false;
104114
}
105115

106116
if (data->client_binary_sizeofint!=0
107117
&&data->client_binary_sizeofint!=sizeof(int))
108118
{
109-
elog(DEBUG1,"Binary mode rejected: Server and clientendiansizeof(int) mismatch");
119+
elog(DEBUG1,"Binary mode rejected: Server and client sizeof(int) mismatch");
110120
return false;
111121
}
112122

113123
if (data->client_binary_sizeoflong!=0
114124
&&data->client_binary_sizeoflong!=sizeof(long))
115125
{
116-
elog(DEBUG1,"Binary mode rejected: Server and clientendiansizeof(long) mismatch");
126+
elog(DEBUG1,"Binary mode rejected: Server and client sizeof(long) mismatch");
117127
return false;
118128
}
119129

120130
if (data->client_binary_float4byval_set
121131
&&data->client_binary_float4byval!=server_float4_byval())
122132
{
123-
elog(DEBUG1,"Binary mode rejected: Server and clientendianfloat4byval mismatch");
133+
elog(DEBUG1,"Binary mode rejected: Server and client float4byval mismatch");
124134
return false;
125135
}
126136

127137
if (data->client_binary_float8byval_set
128138
&&data->client_binary_float8byval!=server_float8_byval())
129139
{
130-
elog(DEBUG1,"Binary mode rejected: Server and clientendianfloat8byval mismatch");
140+
elog(DEBUG1,"Binary mode rejected: Server and client float8byval mismatch");
131141
return false;
132142
}
133143

134144
if (data->client_binary_intdatetimes_set
135145
&&data->client_binary_intdatetimes!=server_integer_datetimes())
136146
{
137-
elog(DEBUG1,"Binary mode rejected: Server and clientendianinteger datetimes mismatch");
147+
elog(DEBUG1,"Binary mode rejected: Server and client integer datetimes mismatch");
138148
return false;
139149
}
140150

@@ -148,7 +158,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
148158
{
149159
PGLogicalOutputData*data=palloc0(sizeof(PGLogicalOutputData));
150160

151-
data->context=AllocSetContextCreate(TopMemoryContext,
161+
data->context=AllocSetContextCreate(ctx->context,
152162
"pglogical conversion context",
153163
ALLOCSET_DEFAULT_MINSIZE,
154164
ALLOCSET_DEFAULT_INITSIZE,
@@ -202,17 +212,17 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
202212
errmsg("client sent startup parameters in format %d but we only support format 1",
203213
params_format)));
204214

205-
if (data->client_min_proto_version>PG_LOGICAL_PROTO_VERSION_NUM)
215+
if (data->client_min_proto_version>PGLOGICAL_PROTO_VERSION_NUM)
206216
ereport(ERROR,
207217
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
208218
errmsg("client sent min_proto_version=%d but we only support protocol %d or lower",
209-
data->client_min_proto_version,PG_LOGICAL_PROTO_VERSION_NUM)));
219+
data->client_min_proto_version,PGLOGICAL_PROTO_VERSION_NUM)));
210220

211-
if (data->client_max_proto_version<PG_LOGICAL_PROTO_MIN_VERSION_NUM)
221+
if (data->client_max_proto_version<PGLOGICAL_PROTO_MIN_VERSION_NUM)
212222
ereport(ERROR,
213223
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
214224
errmsg("client sent max_proto_version=%d but we only support protocol %d or higher",
215-
data->client_max_proto_version,PG_LOGICAL_PROTO_MIN_VERSION_NUM)));
225+
data->client_max_proto_version,PGLOGICAL_PROTO_MIN_VERSION_NUM)));
216226

217227
/*
218228
* Set correct protocol format.
@@ -308,42 +318,17 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
308318
}
309319

310320
/*
311-
* Will we forward changesets? We have to if we're on 9.4;
312-
* otherwise honour the client's request.
321+
* 9.4 lacks origins info so don't forward it.
322+
*
323+
* There's currently no knob for clients to use to suppress
324+
* this info and it's sent if it's supported and available.
313325
*/
314326
if (PG_VERSION_NUM/100==904)
315-
{
316-
/*
317-
* 9.4 unconditionally forwards changesets due to lack of
318-
* replication origins, and it can't ever send origin info
319-
* for the same reason.
320-
*/
321-
data->forward_changesets= true;
322327
data->forward_changeset_origins= false;
323-
324-
if (data->client_forward_changesets_set
325-
&& !data->client_forward_changesets)
326-
{
327-
ereport(DEBUG1,
328-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
329-
errmsg("Cannot disable changeset forwarding on PostgreSQL 9.4")));
330-
}
331-
}
332-
elseif (data->client_forward_changesets_set
333-
&&data->client_forward_changesets)
334-
{
335-
/* Client explicitly asked for forwarding; forward csets and origins */
336-
data->forward_changesets= true;
337-
data->forward_changeset_origins= true;
338-
}
339328
else
340-
{
341-
/* Default to not forwarding or honour client's request not to fwd */
342-
data->forward_changesets= false;
343-
data->forward_changeset_origins= false;
344-
}
329+
data->forward_changeset_origins= true;
345330

346-
if (data->hooks_setup_funcname!=NIL||data->api->setup_hooks)
331+
if (data->hooks_setup_funcname!=NIL)
347332
{
348333

349334
data->hooks_mctxt=AllocSetContextCreate(ctx->context,
@@ -355,6 +340,43 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
355340
load_hooks(data);
356341
call_startup_hook(data,ctx->output_plugin_options);
357342
}
343+
344+
if (data->client_relmeta_cache_size<-1)
345+
{
346+
ereport(ERROR,
347+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
348+
errmsg("relmeta_cache_size must be -1, 0, or positive")));
349+
}
350+
351+
/*
352+
* Relation metadata cache configuration.
353+
*
354+
* TODO: support fixed size cache
355+
*
356+
* Need a LRU for eviction, and need to implement a new message type for
357+
* cache purge notifications for clients. In the mean time force it to 0
358+
* (off). The client will be told via a startup param and must respect
359+
* that.
360+
*/
361+
if (data->client_relmeta_cache_size!=0
362+
&&data->client_relmeta_cache_size!=-1)
363+
{
364+
ereport(INFO,
365+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
366+
errmsg("fixed size cache not supported, forced to off"),
367+
errdetail("only relmeta_cache_size=0 (off) or relmeta_cache_size=-1 (unlimited) supported")));
368+
369+
data->relmeta_cache_size=0;
370+
}
371+
else
372+
{
373+
/* ack client request */
374+
data->relmeta_cache_size=data->client_relmeta_cache_size;
375+
}
376+
377+
/* if cache enabled, init it */
378+
if (data->relmeta_cache_size!=0)
379+
pglogical_init_relmetacache(ctx->context);
358380
}
359381
}
360382

@@ -370,12 +392,15 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
370392
if (!startup_message_sent)
371393
send_startup_message(ctx,data, false/* can't be last message */);
372394

395+
#ifdefHAVE_REPLICATION_ORIGINS
373396
/* If the record didn't originate locally, send origin info */
374397
send_replication_origin &=txn->origin_id!=InvalidRepOriginId;
398+
#endif
375399

376400
OutputPluginPrepareWrite(ctx, !send_replication_origin);
377401
data->api->write_begin(ctx->out,data,txn);
378402

403+
#ifdefHAVE_REPLICATION_ORIGINS
379404
if (send_replication_origin)
380405
{
381406
char*origin;
@@ -397,6 +422,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
397422
replorigin_by_oid(txn->origin_id, true,&origin))
398423
data->api->write_origin(ctx->out,origin,txn->origin_lsn);
399424
}
425+
#endif
400426

401427
OutputPluginWrite(ctx, true);
402428
}
@@ -421,6 +447,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
421447
{
422448
PGLogicalOutputData*data=ctx->output_plugin_private;
423449
MemoryContextold;
450+
structPGLRelMetaCacheEntry*cached_relmeta=NULL;
451+
424452

425453
/* First check the table filter */
426454
if (!call_row_filter_hook(data,txn,relation,change))
@@ -429,11 +457,18 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
429457
/* Avoid leaking memory by using and resetting our own context */
430458
old=MemoryContextSwitchTo(data->context);
431459

432-
/* TODO: add caching (send only if changed) */
433-
if (data->api->write_rel)
460+
/*
461+
* If the protocol wants to write relation information and the client
462+
* isn't known to have metadata cached for this relation already,
463+
* send relation metadata.
464+
*
465+
* TODO: track hit/miss stats
466+
*/
467+
if (data->api->write_rel!=NULL&&
468+
!pglogical_cache_relmeta(data,relation,&cached_relmeta))
434469
{
435470
OutputPluginPrepareWrite(ctx, false);
436-
data->api->write_rel(ctx->out,data,relation);
471+
data->api->write_rel(ctx->out,data,relation,cached_relmeta);
437472
OutputPluginWrite(ctx, false);
438473
}
439474

@@ -477,28 +512,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
477512
MemoryContextReset(data->context);
478513
}
479514

515+
#ifdefHAVE_REPLICATION_ORIGINS
480516
/*
481517
* Decide if the whole transaction with specific origin should be filtered out.
482518
*/
483-
externintMtmReplicationNodeId;
484-
485519
staticbool
486520
pg_decode_origin_filter(LogicalDecodingContext*ctx,
487521
RepOriginIdorigin_id)
488522
{
489523
PGLogicalOutputData*data=ctx->output_plugin_private;
490524

491-
if (!call_txn_filter_hook(data,origin_id)) {
492-
return true;
493-
}
494-
495-
if (!data->forward_changesets&&origin_id!=InvalidRepOriginId) {
496-
*(int*)0=0;
525+
if (!call_txn_filter_hook(data,origin_id))
497526
return true;
498-
}
499527

500528
return false;
501529
}
530+
#endif
502531

503532
staticvoid
504533
send_startup_message(LogicalDecodingContext*ctx,
@@ -532,9 +561,10 @@ static void pg_decode_shutdown(LogicalDecodingContext * ctx)
532561

533562
call_shutdown_hook(data);
534563

535-
if (data->hooks_mctxt!=NULL)
536-
{
537-
MemoryContextDelete(data->hooks_mctxt);
538-
data->hooks_mctxt=NULL;
539-
}
564+
pglogical_destroy_relmetacache();
565+
566+
/*
567+
* no need to delete data->context or data->hooks_mctxt as they're children
568+
* of ctx->context which will expire on return.
569+
*/
540570
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp