Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit83fd453

Browse files
committed
Allow publishing partition changes via ancestors
To control whether partition changes are replicated using their ownidentity and schema or an ancestor's, add a new parameter that can beset per publication named 'publish_via_partition_root'.This allows replicating a partitioned table into a different partitionstructure on the subscriber.Author: Amit Langote <amitlangote09@gmail.com>Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>Discussion:https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
1 parent1aac32d commit83fd453

File tree

15 files changed

+724
-174
lines changed

15 files changed

+724
-174
lines changed

‎doc/src/sgml/catalogs.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5437,6 +5437,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
54375437
<entry>If true, <command>TRUNCATE</command> operations are replicated for
54385438
tables in the publication.</entry>
54395439
</row>
5440+
5441+
<row>
5442+
<entry><structfield>pubviaroot</structfield></entry>
5443+
<entry><type>bool</type></entry>
5444+
<entry></entry>
5445+
<entry>If true, operations on a leaf partition are replicated using the
5446+
identity and schema of its topmost partitioned ancestor mentioned in the
5447+
publication instead of its own.
5448+
</entry>
5449+
</row>
54405450
</tbody>
54415451
</tgroup>
54425452
</table>

‎doc/src/sgml/logical-replication.sgml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -411,10 +411,14 @@
411411
<listitem>
412412
<para>
413413
When replicating between partitioned tables, the actual replication
414-
originates from the leaf partitions on the publisher, so partitions on
415-
the publisher must also exist on the subscriber as valid target tables.
416-
(They could either be leaf partitions themselves, or they could be
417-
further subpartitioned, or they could even be independent tables.)
414+
originates, by default, from the leaf partitions on the publisher, so
415+
partitions on the publisher must also exist on the subscriber as valid
416+
target tables. (They could either be leaf partitions themselves, or they
417+
could be further subpartitioned, or they could even be independent
418+
tables.) Publications can also specify that changes are to be replicated
419+
using the identity and schema of the partitioned root table instead of
420+
that of the individual leaf partitions in which the changes actually
421+
originate (see <xref linkend="sql-createpublication"/>).
418422
</para>
419423
</listitem>
420424
</itemizedlist>

‎doc/src/sgml/ref/create_publication.sgml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,26 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
123123
</para>
124124
</listitem>
125125
</varlistentry>
126+
127+
<varlistentry>
128+
<term><literal>publish_via_partition_root</literal> (<type>boolean</type>)</term>
129+
<listitem>
130+
<para>
131+
This parameter determines whether changes in a partitioned table (or
132+
on its partitions) contained in the publication will be published
133+
using the identity and schema of the partitioned table rather than
134+
that of the individual partitions that are actually changed; the
135+
latter is the default. Enablings this allows the changes to be
136+
replicated into a non-partitioned table or a partitioned table
137+
consisting of a different set of partitions.
138+
</para>
139+
140+
<para>
141+
If this is enabled, <literal>TRUNCATE</literal> operations performed
142+
directly on partitions are not replicated.
143+
</para>
144+
</listitem>
145+
</varlistentry>
126146
</variablelist>
127147

128148
</para>

‎src/backend/catalog/pg_publication.c

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
#include"utils/rel.h"
4343
#include"utils/syscache.h"
4444

45-
staticList*get_rel_publications(Oidrelid);
46-
4745
/*
4846
* Check if relation can be in given publication and throws appropriate
4947
* error if not.
@@ -216,37 +214,9 @@ publication_add_relation(Oid pubid, Relation targetrel,
216214
returnmyself;
217215
}
218216

219-
220-
/*
221-
* Gets list of publication oids for a relation, plus those of ancestors,
222-
* if any, if the relation is a partition.
223-
*/
217+
/* Gets list of publication oids for a relation */
224218
List*
225219
GetRelationPublications(Oidrelid)
226-
{
227-
List*result=NIL;
228-
229-
result=get_rel_publications(relid);
230-
if (get_rel_relispartition(relid))
231-
{
232-
List*ancestors=get_partition_ancestors(relid);
233-
ListCell*lc;
234-
235-
foreach(lc,ancestors)
236-
{
237-
Oidancestor=lfirst_oid(lc);
238-
List*ancestor_pubs=get_rel_publications(ancestor);
239-
240-
result=list_concat(result,ancestor_pubs);
241-
}
242-
}
243-
244-
returnresult;
245-
}
246-
247-
/* Workhorse of GetRelationPublications() */
248-
staticList*
249-
get_rel_publications(Oidrelid)
250220
{
251221
List*result=NIL;
252222
CatCList*pubrellist;
@@ -373,9 +343,13 @@ GetAllTablesPublications(void)
373343

374344
/*
375345
* Gets list of all relation published by FOR ALL TABLES publication(s).
346+
*
347+
* If the publication publishes partition changes via their respective root
348+
* partitioned tables, we must exclude partitions in favor of including the
349+
* root partitioned tables.
376350
*/
377351
List*
378-
GetAllTablesPublicationRelations(void)
352+
GetAllTablesPublicationRelations(boolpubviaroot)
379353
{
380354
RelationclassRel;
381355
ScanKeyDatakey[1];
@@ -397,12 +371,35 @@ GetAllTablesPublicationRelations(void)
397371
Form_pg_classrelForm= (Form_pg_class)GETSTRUCT(tuple);
398372
Oidrelid=relForm->oid;
399373

400-
if (is_publishable_class(relid,relForm))
374+
if (is_publishable_class(relid,relForm)&&
375+
!(relForm->relispartition&&pubviaroot))
401376
result=lappend_oid(result,relid);
402377
}
403378

404379
table_endscan(scan);
405-
table_close(classRel,AccessShareLock);
380+
381+
if (pubviaroot)
382+
{
383+
ScanKeyInit(&key[0],
384+
Anum_pg_class_relkind,
385+
BTEqualStrategyNumber,F_CHAREQ,
386+
CharGetDatum(RELKIND_PARTITIONED_TABLE));
387+
388+
scan=table_beginscan_catalog(classRel,1,key);
389+
390+
while ((tuple=heap_getnext(scan,ForwardScanDirection))!=NULL)
391+
{
392+
Form_pg_classrelForm= (Form_pg_class)GETSTRUCT(tuple);
393+
Oidrelid=relForm->oid;
394+
395+
if (is_publishable_class(relid,relForm)&&
396+
!relForm->relispartition)
397+
result=lappend_oid(result,relid);
398+
}
399+
400+
table_endscan(scan);
401+
table_close(classRel,AccessShareLock);
402+
}
406403

407404
returnresult;
408405
}
@@ -433,6 +430,7 @@ GetPublication(Oid pubid)
433430
pub->pubactions.pubupdate=pubform->pubupdate;
434431
pub->pubactions.pubdelete=pubform->pubdelete;
435432
pub->pubactions.pubtruncate=pubform->pubtruncate;
433+
pub->pubviaroot=pubform->pubviaroot;
436434

437435
ReleaseSysCache(tup);
438436

@@ -533,9 +531,11 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
533531
* need those.
534532
*/
535533
if (publication->alltables)
536-
tables=GetAllTablesPublicationRelations();
534+
tables=GetAllTablesPublicationRelations(publication->pubviaroot);
537535
else
538536
tables=GetPublicationRelations(publication->oid,
537+
publication->pubviaroot ?
538+
PUBLICATION_PART_ROOT :
539539
PUBLICATION_PART_LEAF);
540540
funcctx->user_fctx= (void*)tables;
541541

‎src/backend/commands/publicationcmds.c

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include"catalog/namespace.h"
2424
#include"catalog/objectaccess.h"
2525
#include"catalog/objectaddress.h"
26+
#include"catalog/partition.h"
2627
#include"catalog/pg_inherits.h"
2728
#include"catalog/pg_publication.h"
2829
#include"catalog/pg_publication_rel.h"
@@ -56,20 +57,21 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
5657
staticvoid
5758
parse_publication_options(List*options,
5859
bool*publish_given,
59-
bool*publish_insert,
60-
bool*publish_update,
61-
bool*publish_delete,
62-
bool*publish_truncate)
60+
PublicationActions*pubactions,
61+
bool*publish_via_partition_root_given,
62+
bool*publish_via_partition_root)
6363
{
6464
ListCell*lc;
6565

6666
*publish_given= false;
67+
*publish_via_partition_root_given= false;
6768

68-
/* Defaults are true */
69-
*publish_insert= true;
70-
*publish_update= true;
71-
*publish_delete= true;
72-
*publish_truncate= true;
69+
/* defaults */
70+
pubactions->pubinsert= true;
71+
pubactions->pubupdate= true;
72+
pubactions->pubdelete= true;
73+
pubactions->pubtruncate= true;
74+
*publish_via_partition_root= false;
7375

7476
/* Parse options */
7577
foreach(lc,options)
@@ -91,10 +93,10 @@ parse_publication_options(List *options,
9193
* If publish option was given only the explicitly listed actions
9294
* should be published.
9395
*/
94-
*publish_insert= false;
95-
*publish_update= false;
96-
*publish_delete= false;
97-
*publish_truncate= false;
96+
pubactions->pubinsert= false;
97+
pubactions->pubupdate= false;
98+
pubactions->pubdelete= false;
99+
pubactions->pubtruncate= false;
98100

99101
*publish_given= true;
100102
publish=defGetString(defel);
@@ -110,19 +112,28 @@ parse_publication_options(List *options,
110112
char*publish_opt= (char*)lfirst(lc);
111113

112114
if (strcmp(publish_opt,"insert")==0)
113-
*publish_insert= true;
115+
pubactions->pubinsert= true;
114116
elseif (strcmp(publish_opt,"update")==0)
115-
*publish_update= true;
117+
pubactions->pubupdate= true;
116118
elseif (strcmp(publish_opt,"delete")==0)
117-
*publish_delete= true;
119+
pubactions->pubdelete= true;
118120
elseif (strcmp(publish_opt,"truncate")==0)
119-
*publish_truncate= true;
121+
pubactions->pubtruncate= true;
120122
else
121123
ereport(ERROR,
122124
(errcode(ERRCODE_SYNTAX_ERROR),
123125
errmsg("unrecognized \"publish\" value: \"%s\"",publish_opt)));
124126
}
125127
}
128+
elseif (strcmp(defel->defname,"publish_via_partition_root")==0)
129+
{
130+
if (*publish_via_partition_root_given)
131+
ereport(ERROR,
132+
(errcode(ERRCODE_SYNTAX_ERROR),
133+
errmsg("conflicting or redundant options")));
134+
*publish_via_partition_root_given= true;
135+
*publish_via_partition_root=defGetBoolean(defel);
136+
}
126137
else
127138
ereport(ERROR,
128139
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -143,10 +154,9 @@ CreatePublication(CreatePublicationStmt *stmt)
143154
Datumvalues[Natts_pg_publication];
144155
HeapTupletup;
145156
boolpublish_given;
146-
boolpublish_insert;
147-
boolpublish_update;
148-
boolpublish_delete;
149-
boolpublish_truncate;
157+
PublicationActionspubactions;
158+
boolpublish_via_partition_root_given;
159+
boolpublish_via_partition_root;
150160
AclResultaclresult;
151161

152162
/* must have CREATE privilege on database */
@@ -183,23 +193,25 @@ CreatePublication(CreatePublicationStmt *stmt)
183193
values[Anum_pg_publication_pubowner-1]=ObjectIdGetDatum(GetUserId());
184194

185195
parse_publication_options(stmt->options,
186-
&publish_given,&publish_insert,
187-
&publish_update,&publish_delete,
188-
&publish_truncate);
196+
&publish_given,&pubactions,
197+
&publish_via_partition_root_given,
198+
&publish_via_partition_root);
189199

190200
puboid=GetNewOidWithIndex(rel,PublicationObjectIndexId,
191201
Anum_pg_publication_oid);
192202
values[Anum_pg_publication_oid-1]=ObjectIdGetDatum(puboid);
193203
values[Anum_pg_publication_puballtables-1]=
194204
BoolGetDatum(stmt->for_all_tables);
195205
values[Anum_pg_publication_pubinsert-1]=
196-
BoolGetDatum(publish_insert);
206+
BoolGetDatum(pubactions.pubinsert);
197207
values[Anum_pg_publication_pubupdate-1]=
198-
BoolGetDatum(publish_update);
208+
BoolGetDatum(pubactions.pubupdate);
199209
values[Anum_pg_publication_pubdelete-1]=
200-
BoolGetDatum(publish_delete);
210+
BoolGetDatum(pubactions.pubdelete);
201211
values[Anum_pg_publication_pubtruncate-1]=
202-
BoolGetDatum(publish_truncate);
212+
BoolGetDatum(pubactions.pubtruncate);
213+
values[Anum_pg_publication_pubviaroot-1]=
214+
BoolGetDatum(publish_via_partition_root);
203215

204216
tup=heap_form_tuple(RelationGetDescr(rel),values,nulls);
205217

@@ -251,17 +263,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
251263
boolreplaces[Natts_pg_publication];
252264
Datumvalues[Natts_pg_publication];
253265
boolpublish_given;
254-
boolpublish_insert;
255-
boolpublish_update;
256-
boolpublish_delete;
257-
boolpublish_truncate;
266+
PublicationActionspubactions;
267+
boolpublish_via_partition_root_given;
268+
boolpublish_via_partition_root;
258269
ObjectAddressobj;
259270
Form_pg_publicationpubform;
260271

261272
parse_publication_options(stmt->options,
262-
&publish_given,&publish_insert,
263-
&publish_update,&publish_delete,
264-
&publish_truncate);
273+
&publish_given,&pubactions,
274+
&publish_via_partition_root_given,
275+
&publish_via_partition_root);
265276

266277
/* Everything ok, form a new tuple. */
267278
memset(values,0,sizeof(values));
@@ -270,19 +281,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
270281

271282
if (publish_given)
272283
{
273-
values[Anum_pg_publication_pubinsert-1]=BoolGetDatum(publish_insert);
284+
values[Anum_pg_publication_pubinsert-1]=BoolGetDatum(pubactions.pubinsert);
274285
replaces[Anum_pg_publication_pubinsert-1]= true;
275286

276-
values[Anum_pg_publication_pubupdate-1]=BoolGetDatum(publish_update);
287+
values[Anum_pg_publication_pubupdate-1]=BoolGetDatum(pubactions.pubupdate);
277288
replaces[Anum_pg_publication_pubupdate-1]= true;
278289

279-
values[Anum_pg_publication_pubdelete-1]=BoolGetDatum(publish_delete);
290+
values[Anum_pg_publication_pubdelete-1]=BoolGetDatum(pubactions.pubdelete);
280291
replaces[Anum_pg_publication_pubdelete-1]= true;
281292

282-
values[Anum_pg_publication_pubtruncate-1]=BoolGetDatum(publish_truncate);
293+
values[Anum_pg_publication_pubtruncate-1]=BoolGetDatum(pubactions.pubtruncate);
283294
replaces[Anum_pg_publication_pubtruncate-1]= true;
284295
}
285296

297+
if (publish_via_partition_root_given)
298+
{
299+
values[Anum_pg_publication_pubviaroot-1]=BoolGetDatum(publish_via_partition_root);
300+
replaces[Anum_pg_publication_pubviaroot-1]= true;
301+
}
302+
286303
tup=heap_modify_tuple(tup,RelationGetDescr(rel),values,nulls,
287304
replaces);
288305

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp