Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit17b9e7f

Browse files
committed
Support adding partitioned tables to publication
When a partitioned table is added to a publication, changes of all ofits partitions (current or future) are published via that publication.This change only affects which tables a publication considers as itsmembers. The receiving side still sees the data coming from theindividual leaf partitions. So existing restrictions that partitionhierarchies can only be replicated one-to-one are not changed by this.Author: Amit Langote <amitlangote09@gmail.com>Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>Discussion:https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
1 parent61d7c7b commit17b9e7f

File tree

11 files changed

+382
-44
lines changed

11 files changed

+382
-44
lines changed

‎doc/src/sgml/logical-replication.sgml

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -402,13 +402,16 @@
402402

403403
<listitem>
404404
<para>
405-
Replication is only possible from base tables to base tables. That is,
406-
the tables on the publication and on the subscription side must be normal
407-
tables, not views, materialized views, partition root tables, or foreign
408-
tables. In the case of partitions, you can therefore replicate a
409-
partition hierarchy one-to-one, but you cannot currently replicate to a
410-
differently partitioned setup. Attempts to replicate tables other than
411-
base tables will result in an error.
405+
Replication is only supported by tables, partitioned or not, although a
406+
given table must either be partitioned on both servers or not partitioned
407+
at all. Also, when replicating between partitioned tables, the actual
408+
replication occurs between leaf partitions, so partitions on the two
409+
servers must match one-to-one.
410+
</para>
411+
412+
<para>
413+
Attempts to replicate other types of relations such as views, materialized
414+
views, or foreign tables, will result in an error.
412415
</para>
413416
</listitem>
414417
</itemizedlist>

‎doc/src/sgml/ref/create_publication.sgml

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,23 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
6969
specified, the table and all its descendant tables (if any) are added.
7070
Optionally, <literal>*</literal> can be specified after the table name to
7171
explicitly indicate that descendant tables are included.
72+
This does not apply to a partitioned table, however. The partitions of
73+
a partitioned table are always implicitly considered part of the
74+
publication, so they are never explicitly added to the publication.
7275
</para>
7376

7477
<para>
75-
Only persistent base tables can be part of a publication. Temporary
76-
tables, unlogged tables, foreign tables, materialized views, regular
77-
views, and partitioned tables cannot be part of a publication. To
78-
replicate a partitioned table, add the individual partitions to the
79-
publication.
78+
Only persistent base tables and partitioned tables can be part of a
79+
publication. Temporary tables, unlogged tables, foreign tables,
80+
materialized views, and regular views cannot be part of a publication.
81+
</para>
82+
83+
<para>
84+
When a partitioned table is added to a publication, all of its existing
85+
and future partitions are implicitly considered to be part of the
86+
publication. So, even operations that are performed directly on a
87+
partition are also published via publications that its ancestors are
88+
part of.
8089
</para>
8190
</listitem>
8291
</varlistentry>

‎src/backend/catalog/pg_publication.c

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
#include"catalog/index.h"
2525
#include"catalog/indexing.h"
2626
#include"catalog/namespace.h"
27+
#include"catalog/partition.h"
2728
#include"catalog/objectaccess.h"
2829
#include"catalog/objectaddress.h"
30+
#include"catalog/pg_inherits.h"
2931
#include"catalog/pg_publication.h"
3032
#include"catalog/pg_publication_rel.h"
3133
#include"catalog/pg_type.h"
@@ -40,24 +42,18 @@
4042
#include"utils/rel.h"
4143
#include"utils/syscache.h"
4244

45+
staticList*get_rel_publications(Oidrelid);
46+
4347
/*
4448
* Check if relation can be in given publication and throws appropriate
4549
* error if not.
4650
*/
4751
staticvoid
4852
check_publication_add_relation(Relationtargetrel)
4953
{
50-
/* Give more specific error for partitioned tables */
51-
if (RelationGetForm(targetrel)->relkind==RELKIND_PARTITIONED_TABLE)
52-
ereport(ERROR,
53-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
54-
errmsg("\"%s\" is a partitioned table",
55-
RelationGetRelationName(targetrel)),
56-
errdetail("Adding partitioned tables to publications is not supported."),
57-
errhint("You can add the table partitions individually.")));
58-
59-
/* Must be table */
60-
if (RelationGetForm(targetrel)->relkind!=RELKIND_RELATION)
54+
/* Must be a regular or partitioned table */
55+
if (RelationGetForm(targetrel)->relkind!=RELKIND_RELATION&&
56+
RelationGetForm(targetrel)->relkind!=RELKIND_PARTITIONED_TABLE)
6157
ereport(ERROR,
6258
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6359
errmsg("\"%s\" is not a table",
@@ -103,7 +99,8 @@ check_publication_add_relation(Relation targetrel)
10399
staticbool
104100
is_publishable_class(Oidrelid,Form_pg_classreltuple)
105101
{
106-
returnreltuple->relkind==RELKIND_RELATION&&
102+
return (reltuple->relkind==RELKIND_RELATION||
103+
reltuple->relkind==RELKIND_PARTITIONED_TABLE)&&
107104
!IsCatalogRelationOid(relid)&&
108105
reltuple->relpersistence==RELPERSISTENCE_PERMANENT&&
109106
relid >=FirstNormalObjectId;
@@ -221,10 +218,35 @@ publication_add_relation(Oid pubid, Relation targetrel,
221218

222219

223220
/*
224-
* Gets list of publication oids for a relation oid.
221+
* Gets list of publication oids for a relation, plus those of ancestors,
222+
* if any, if the relation is a partition.
225223
*/
226224
List*
227225
GetRelationPublications(Oidrelid)
226+
{
227+
List*result=NIL;
228+
229+
result=get_rel_publications(relid);
230+
if (get_rel_relispartition(relid))
231+
{
232+
List*ancestors=get_partition_ancestors(relid);
233+
ListCell*lc;
234+
235+
foreach(lc,ancestors)
236+
{
237+
Oidancestor=lfirst_oid(lc);
238+
List*ancestor_pubs=get_rel_publications(ancestor);
239+
240+
result=list_concat(result,ancestor_pubs);
241+
}
242+
}
243+
244+
returnresult;
245+
}
246+
247+
/* Workhorse of GetRelationPublications() */
248+
staticList*
249+
get_rel_publications(Oidrelid)
228250
{
229251
List*result=NIL;
230252
CatCList*pubrellist;
@@ -253,7 +275,7 @@ GetRelationPublications(Oid relid)
253275
* should use GetAllTablesPublicationRelations().
254276
*/
255277
List*
256-
GetPublicationRelations(Oidpubid)
278+
GetPublicationRelations(Oidpubid,PublicationPartOptpub_partopt)
257279
{
258280
List*result;
259281
Relationpubrelsrel;
@@ -279,7 +301,31 @@ GetPublicationRelations(Oid pubid)
279301

280302
pubrel= (Form_pg_publication_rel)GETSTRUCT(tup);
281303

282-
result=lappend_oid(result,pubrel->prrelid);
304+
if (get_rel_relkind(pubrel->prrelid)==RELKIND_PARTITIONED_TABLE&&
305+
pub_partopt!=PUBLICATION_PART_ROOT)
306+
{
307+
List*all_parts=find_all_inheritors(pubrel->prrelid,NoLock,
308+
NULL);
309+
310+
if (pub_partopt==PUBLICATION_PART_ALL)
311+
result=list_concat(result,all_parts);
312+
elseif (pub_partopt==PUBLICATION_PART_LEAF)
313+
{
314+
ListCell*lc;
315+
316+
foreach(lc,all_parts)
317+
{
318+
OidpartOid=lfirst_oid(lc);
319+
320+
if (get_rel_relkind(partOid)!=RELKIND_PARTITIONED_TABLE)
321+
result=lappend_oid(result,partOid);
322+
}
323+
}
324+
else
325+
Assert(false);
326+
}
327+
else
328+
result=lappend_oid(result,pubrel->prrelid);
283329
}
284330

285331
systable_endscan(scan);
@@ -480,10 +526,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
480526
oldcontext=MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
481527

482528
publication=GetPublicationByName(pubname, false);
529+
530+
/*
531+
* Publications support partitioned tables, although all changes are
532+
* replicated using leaf partition identity and schema, so we only
533+
* need those.
534+
*/
483535
if (publication->alltables)
484536
tables=GetAllTablesPublicationRelations();
485537
else
486-
tables=GetPublicationRelations(publication->oid);
538+
tables=GetPublicationRelations(publication->oid,
539+
PUBLICATION_PART_LEAF);
487540
funcctx->user_fctx= (void*)tables;
488541

489542
MemoryContextSwitchTo(oldcontext);

‎src/backend/commands/publicationcmds.c

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
299299
}
300300
else
301301
{
302-
List*relids=GetPublicationRelations(pubform->oid);
302+
/*
303+
* For any partitioned tables contained in the publication, we must
304+
* invalidate all partitions contained in the respective partition
305+
* trees, not just those explicitly mentioned in the publication.
306+
*/
307+
List*relids=GetPublicationRelations(pubform->oid,
308+
PUBLICATION_PART_ALL);
303309

304310
/*
305311
* We don't want to send too many individual messages, at some point
@@ -356,7 +362,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
356362
PublicationDropTables(pubid,rels, false);
357363
else/* DEFELEM_SET */
358364
{
359-
List*oldrelids=GetPublicationRelations(pubid);
365+
List*oldrelids=GetPublicationRelations(pubid,
366+
PUBLICATION_PART_ROOT);
360367
List*delrels=NIL;
361368
ListCell*oldlc;
362369

@@ -498,7 +505,8 @@ RemovePublicationRelById(Oid proid)
498505

499506
/*
500507
* Open relations specified by a RangeVar list.
501-
* The returned tables are locked in ShareUpdateExclusiveLock mode.
508+
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
509+
* add them to a publication.
502510
*/
503511
staticList*
504512
OpenTableList(List*tables)
@@ -539,8 +547,13 @@ OpenTableList(List *tables)
539547
rels=lappend(rels,rel);
540548
relids=lappend_oid(relids,myrelid);
541549

542-
/* Add children of this rel, if requested */
543-
if (recurse)
550+
/*
551+
* Add children of this rel, if requested, so that they too are added
552+
* to the publication. A partitioned table can't have any inheritance
553+
* children other than its partitions, which need not be explicitly
554+
* added to the publication.
555+
*/
556+
if (recurse&&rel->rd_rel->relkind!=RELKIND_PARTITIONED_TABLE)
544557
{
545558
List*children;
546559
ListCell*child;

‎src/backend/replication/logical/tablesync.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,7 @@ copy_table(Relation rel)
761761
/* Map the publisher relation to local one. */
762762
relmapentry=logicalrep_rel_open(lrel.remoteid,NoLock);
763763
Assert(rel==relmapentry->localrel);
764+
Assert(relmapentry->localrel->rd_rel->relkind==RELKIND_RELATION);
764765

765766
/* Start copy on the publisher. */
766767
initStringInfo(&cmd);

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames);
5050
staticvoidpublication_invalidation_cb(Datumarg,intcacheid,
5151
uint32hashvalue);
5252

53-
/* Entry in the map used to remember which relation schemas we sent. */
53+
/*
54+
* Entry in the map used to remember which relation schemas we sent.
55+
*
56+
* For partitions, 'pubactions' considers not only the table's own
57+
* publications, but also those of all of its ancestors.
58+
*/
5459
typedefstructRelationSyncEntry
5560
{
5661
Oidrelid;/* relation oid */
@@ -406,6 +411,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
406411
if (!relentry->pubactions.pubtruncate)
407412
continue;
408413

414+
/*
415+
* Don't send partitioned tables, because partitions should be sent
416+
* instead.
417+
*/
418+
if (relation->rd_rel->relkind==RELKIND_PARTITIONED_TABLE)
419+
continue;
420+
409421
relids[nrelids++]=relid;
410422
maybe_send_schema(ctx,relation,relentry);
411423
}
@@ -524,6 +536,11 @@ init_rel_sync_cache(MemoryContext cachectx)
524536

525537
/*
526538
* Find or create entry in the relation schema cache.
539+
*
540+
* This looks up publications that the given relation is directly or
541+
* indirectly part of (the latter if it's really the relation's ancestor that
542+
* is part of a publication) and fills up the found entry with the information
543+
* about which operations to publish.
527544
*/
528545
staticRelationSyncEntry*
529546
get_rel_sync_entry(PGOutputData*data,Oidrelid)

‎src/bin/pg_dump/pg_dump.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3981,8 +3981,12 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
39813981
{
39823982
TableInfo *tbinfo = &tblinfo[i];
39833983

3984-
/* Only plain tables can be aded to publications. */
3985-
if (tbinfo->relkind != RELKIND_RELATION)
3984+
/*
3985+
* Only regular and partitioned tables can be added to
3986+
* publications.
3987+
*/
3988+
if (tbinfo->relkind != RELKIND_RELATION &&
3989+
tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
39863990
continue;
39873991

39883992
/*

‎src/include/catalog/pg_publication.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,24 @@ typedef struct Publication
8080
externPublication*GetPublication(Oidpubid);
8181
externPublication*GetPublicationByName(constchar*pubname,boolmissing_ok);
8282
externList*GetRelationPublications(Oidrelid);
83-
externList*GetPublicationRelations(Oidpubid);
83+
84+
/*---------
85+
* Expected values for pub_partopt parameter of GetRelationPublications(),
86+
* which allows callers to specify which partitions of partitioned tables
87+
* mentioned in the publication they expect to see.
88+
*
89+
*ROOT:only the table explicitly mentioned in the publication
90+
*LEAF:only leaf partitions in given tree
91+
*ALL:all partitions in given tree
92+
*/
93+
typedefenumPublicationPartOpt
94+
{
95+
PUBLICATION_PART_ROOT,
96+
PUBLICATION_PART_LEAF,
97+
PUBLICATION_PART_ALL,
98+
}PublicationPartOpt;
99+
100+
externList*GetPublicationRelations(Oidpubid,PublicationPartOptpub_partopt);
84101
externList*GetAllTablesPublications(void);
85102
externList*GetAllTablesPublicationRelations(void);
86103

‎src/test/regress/expected/publication.out

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,35 @@ Tables:
116116

117117
DROP TABLE testpub_tbl3, testpub_tbl3a;
118118
DROP PUBLICATION testpub3, testpub4;
119+
-- Tests for partitioned tables
120+
SET client_min_messages = 'ERROR';
121+
CREATE PUBLICATION testpub_forparted;
122+
CREATE PUBLICATION testpub_forparted1;
123+
RESET client_min_messages;
124+
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
125+
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
126+
-- works despite missing REPLICA IDENTITY, because updates are not replicated
127+
UPDATE testpub_parted1 SET a = 1;
128+
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
129+
-- only parent is listed as being in publication, not the partition
130+
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
131+
\dRp+ testpub_forparted
132+
Publication testpub_forparted
133+
Owner | All tables | Inserts | Updates | Deletes | Truncates
134+
--------------------------+------------+---------+---------+---------+-----------
135+
regress_publication_user | f | t | t | t | t
136+
Tables:
137+
"public.testpub_parted"
138+
139+
-- should now fail, because parent's publication replicates updates
140+
UPDATE testpub_parted1 SET a = 1;
141+
ERROR: cannot update table "testpub_parted1" because it does not have a replica identity and publishes updates
142+
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
143+
ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
144+
-- works again, because parent's publication is no longer considered
145+
UPDATE testpub_parted1 SET a = 1;
146+
DROP TABLE testpub_parted1;
147+
DROP PUBLICATION testpub_forparted, testpub_forparted1;
119148
-- fail - view
120149
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
121150
ERROR: "testpub_view" is not a table
@@ -142,11 +171,6 @@ Tables:
142171
ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
143172
ERROR: "testpub_view" is not a table
144173
DETAIL: Only tables can be added to publications.
145-
-- fail - partitioned table
146-
ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted;
147-
ERROR: "testpub_parted" is a partitioned table
148-
DETAIL: Adding partitioned tables to publications is not supported.
149-
HINT: You can add the table partitions individually.
150174
ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
151175
ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
152176
ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp