Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit923def9

Browse files
committed
Allow specifying column lists for logical replication
This allows specifying an optional column list when adding a table tological replication. The column list may be specified after the tablename, enclosed in parentheses. Columns not included in this list are notsent to the subscriber, allowing the schema on the subscriber to be asubset of the publisher schema.For UPDATE/DELETE publications, the column list needs to cover allREPLICA IDENTITY columns. For INSERT publications, the column list isarbitrary and may omit some REPLICA IDENTITY columns. Furthermore, ifthe table uses REPLICA IDENTITY FULL, column list is not allowed.The column list can contain only simple column references. Complexexpressions, function calls etc. are not allowed. This restriction couldbe relaxed in the future.During the initial table synchronization, only columns included in thecolumn list are copied to the subscriber. If the subscription hasseveral publications, containing the same table with different columnlists, columns specified in any of the lists will be copied.This means all columns are replicated if the table has no column listat all (which is treated as column list with all columns), or when ofthe publications is defined as FOR ALL TABLES (possibly IN SCHEMA thatmatches the schema of the table).For partitioned tables, publish_via_partition_root determines whetherthe column list for the root or the leaf relation will be used. If theparameter is 'false' (the default), the list defined for the leafrelation is used. Otherwise, the column list for the root partitionwill be used.Psql commands \dRp+ and \d <table-name> now display any column lists.Author: Tomas Vondra, Alvaro Herrera, Rahila SyedReviewed-by: Peter Eisentraut, Alvaro Herrera, Vignesh C, Ibrar Ahmed,Amit Kapila, Hou zj, Peter Smith, Wang wei, Tang, Shi yuDiscussion:https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com
1 parent05843b1 commit923def9

File tree

26 files changed

+2833
-92
lines changed

26 files changed

+2833
-92
lines changed

‎doc/src/sgml/catalogs.sgml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4410,7 +4410,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
44104410
</para>
44114411
<para>
44124412
This is an array of <structfield>indnatts</structfield> values that
4413-
indicate which table columns this index indexes. For example a value
4413+
indicate which table columns this index indexes. For example, a value
44144414
of <literal>1 3</literal> would mean that the first and the third table
44154415
columns make up the index entries. Key columns come before non-key
44164416
(included) columns. A zero in this array indicates that the
@@ -6291,6 +6291,19 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
62916291
Reference to schema
62926292
</para></entry>
62936293
</row>
6294+
6295+
<row>
6296+
<entry role="catalog_table_entry"><para role="column_definition">
6297+
<structfield>prattrs</structfield> <type>int2vector</type>
6298+
(references <link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.<structfield>attnum</structfield>)
6299+
</para>
6300+
<para>
6301+
This is an array of values that indicates which table columns are
6302+
part of the publication. For example, a value of <literal>1 3</literal>
6303+
would mean that the first and the third table columns are published.
6304+
A null value indicates that all columns are published.
6305+
</para></entry>
6306+
</row>
62946307
</tbody>
62956308
</tgroup>
62966309
</table>

‎doc/src/sgml/protocol.sgml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7016,7 +7016,8 @@ Relation
70167016
</listitem>
70177017
</varlistentry>
70187018
</variablelist>
7019-
Next, the following message part appears for each column (except generated columns):
7019+
Next, the following message part appears for each column included in
7020+
the publication (except generated columns):
70207021
<variablelist>
70217022
<varlistentry>
70227023
<term>

‎doc/src/sgml/ref/alter_publication.sgml

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
3030

3131
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
3232

33-
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
33+
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
3434
SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [, ... ]
3535
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
3636
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
@@ -114,6 +114,14 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
114114
specified, the table and all its descendant tables (if any) are
115115
affected. Optionally, <literal>*</literal> can be specified after the table
116116
name to explicitly indicate that descendant tables are included.
117+
</para>
118+
119+
<para>
120+
Optionally, a column list can be specified. See <xref
121+
linkend="sql-createpublication"/> for details.
122+
</para>
123+
124+
<para>
117125
If the optional <literal>WHERE</literal> clause is specified, rows for
118126
which the <replaceable class="parameter">expression</replaceable>
119127
evaluates to false or null will not be published. Note that parentheses
@@ -185,7 +193,13 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete');
185193
<para>
186194
Add some tables to the publication:
187195
<programlisting>
188-
ALTER PUBLICATION mypublication ADD TABLE users, departments;
196+
ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments;
197+
</programlisting></para>
198+
199+
<para>
200+
Change the set of columns published for a table:
201+
<programlisting>
202+
ALTER PUBLICATION mypublication SET TABLE users (user_id, firstname, lastname), TABLE departments;
189203
</programlisting></para>
190204

191205
<para>

‎doc/src/sgml/ref/create_publication.sgml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
3333

3434
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
3535

36-
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
36+
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
3737
SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ... ]
3838
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
3939
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
@@ -93,6 +93,13 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
9393
<literal>TRUNCATE</literal> commands.
9494
</para>
9595

96+
<para>
97+
When a column list is specified, only the named columns are replicated.
98+
If no column list is specified, all columns of the table are replicated
99+
through this publication, including any columns added later. If a column
100+
list is specified, it must include the replica identity columns.
101+
</para>
102+
96103
<para>
97104
Only persistent base tables and partitioned tables can be part of a
98105
publication. Temporary tables, unlogged tables, foreign tables,
@@ -348,6 +355,14 @@ CREATE PUBLICATION production_publication FOR TABLE users, departments, ALL TABL
348355
<structname>sales</structname>:
349356
<programlisting>
350357
CREATE PUBLICATION sales_publication FOR ALL TABLES IN SCHEMA marketing, sales;
358+
</programlisting></para>
359+
360+
<para>
361+
Create a publication that publishes all changes for table <structname>users</structname>,
362+
but replicates only columns <structname>user_id</structname> and
363+
<structname>firstname</structname>:
364+
<programlisting>
365+
CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname);
351366
</programlisting></para>
352367
</refsect1>
353368

‎src/backend/catalog/pg_publication.c

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
#include"utils/rel.h"
4646
#include"utils/syscache.h"
4747

48+
staticvoidpublication_translate_columns(Relationtargetrel,List*columns,
49+
int*natts,AttrNumber**attrs);
50+
4851
/*
4952
* Check if relation can be in given publication and throws appropriate
5053
* error if not.
@@ -395,6 +398,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
395398
Oidrelid=RelationGetRelid(targetrel);
396399
Oidpubreloid;
397400
Publication*pub=GetPublication(pubid);
401+
AttrNumber*attarray=NULL;
402+
intnatts=0;
398403
ObjectAddressmyself,
399404
referenced;
400405
List*relids=NIL;
@@ -422,6 +427,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
422427

423428
check_publication_add_relation(targetrel);
424429

430+
/*
431+
* Translate column names to attnums and make sure the column list contains
432+
* only allowed elements (no system or generated columns etc.). Also build
433+
* an array of attnums, for storing in the catalog.
434+
*/
435+
publication_translate_columns(pri->relation,pri->columns,
436+
&natts,&attarray);
437+
425438
/* Form a tuple. */
426439
memset(values,0,sizeof(values));
427440
memset(nulls, false,sizeof(nulls));
@@ -440,6 +453,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
440453
else
441454
nulls[Anum_pg_publication_rel_prqual-1]= true;
442455

456+
/* Add column list, if available */
457+
if (pri->columns)
458+
values[Anum_pg_publication_rel_prattrs-1]=PointerGetDatum(buildint2vector(attarray,natts));
459+
else
460+
nulls[Anum_pg_publication_rel_prattrs-1]= true;
461+
443462
tup=heap_form_tuple(RelationGetDescr(rel),values,nulls);
444463

445464
/* Insert tuple into catalog. */
@@ -463,6 +482,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
463482
DEPENDENCY_NORMAL,DEPENDENCY_NORMAL,
464483
false);
465484

485+
/* Add dependency on the columns, if any are listed */
486+
for (inti=0;i<natts;i++)
487+
{
488+
ObjectAddressSubSet(referenced,RelationRelationId,relid,attarray[i]);
489+
recordDependencyOn(&myself,&referenced,DEPENDENCY_NORMAL);
490+
}
491+
466492
/* Close the table. */
467493
table_close(rel,RowExclusiveLock);
468494

@@ -482,6 +508,125 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
482508
returnmyself;
483509
}
484510

511+
/* qsort comparator for attnums */
512+
staticint
513+
compare_int16(constvoid*a,constvoid*b)
514+
{
515+
intav=*(constint16*)a;
516+
intbv=*(constint16*)b;
517+
518+
/* this can't overflow if int is wider than int16 */
519+
return (av-bv);
520+
}
521+
522+
/*
523+
* Translate a list of column names to an array of attribute numbers
524+
* and a Bitmapset with them; verify that each attribute is appropriate
525+
* to have in a publication column list (no system or generated attributes,
526+
* no duplicates). Additional checks with replica identity are done later;
527+
* see check_publication_columns.
528+
*
529+
* Note that the attribute numbers are *not* offset by
530+
* FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
531+
* is okay.
532+
*/
533+
staticvoid
534+
publication_translate_columns(Relationtargetrel,List*columns,
535+
int*natts,AttrNumber**attrs)
536+
{
537+
AttrNumber*attarray=NULL;
538+
Bitmapset*set=NULL;
539+
ListCell*lc;
540+
intn=0;
541+
TupleDesctupdesc=RelationGetDescr(targetrel);
542+
543+
/* Bail out when no column list defined. */
544+
if (!columns)
545+
return;
546+
547+
/*
548+
* Translate list of columns to attnums. We prohibit system attributes and
549+
* make sure there are no duplicate columns.
550+
*/
551+
attarray=palloc(sizeof(AttrNumber)*list_length(columns));
552+
foreach(lc,columns)
553+
{
554+
char*colname=strVal(lfirst(lc));
555+
AttrNumberattnum=get_attnum(RelationGetRelid(targetrel),colname);
556+
557+
if (attnum==InvalidAttrNumber)
558+
ereport(ERROR,
559+
errcode(ERRCODE_UNDEFINED_COLUMN),
560+
errmsg("column \"%s\" of relation \"%s\" does not exist",
561+
colname,RelationGetRelationName(targetrel)));
562+
563+
if (!AttrNumberIsForUserDefinedAttr(attnum))
564+
ereport(ERROR,
565+
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
566+
errmsg("cannot reference system column \"%s\" in publication column list",
567+
colname));
568+
569+
if (TupleDescAttr(tupdesc,attnum-1)->attgenerated)
570+
ereport(ERROR,
571+
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
572+
errmsg("cannot reference generated column \"%s\" in publication column list",
573+
colname));
574+
575+
if (bms_is_member(attnum,set))
576+
ereport(ERROR,
577+
errcode(ERRCODE_DUPLICATE_OBJECT),
578+
errmsg("duplicate column \"%s\" in publication column list",
579+
colname));
580+
581+
set=bms_add_member(set,attnum);
582+
attarray[n++]=attnum;
583+
}
584+
585+
/* Be tidy, so that the catalog representation is always sorted */
586+
qsort(attarray,n,sizeof(AttrNumber),compare_int16);
587+
588+
*natts=n;
589+
*attrs=attarray;
590+
591+
bms_free(set);
592+
}
593+
594+
/*
595+
* Transform the column list (represented by an array) to a bitmapset.
596+
*/
597+
Bitmapset*
598+
pub_collist_to_bitmapset(Bitmapset*columns,Datumpubcols,MemoryContextmcxt)
599+
{
600+
Bitmapset*result=NULL;
601+
ArrayType*arr;
602+
intnelems;
603+
int16*elems;
604+
MemoryContextoldcxt;
605+
606+
/*
607+
* If an existing bitmap was provided, use it. Otherwise just use NULL
608+
* and build a new bitmap.
609+
*/
610+
if (columns)
611+
result=columns;
612+
613+
arr=DatumGetArrayTypeP(pubcols);
614+
nelems=ARR_DIMS(arr)[0];
615+
elems= (int16*)ARR_DATA_PTR(arr);
616+
617+
/* If a memory context was specified, switch to it. */
618+
if (mcxt)
619+
oldcxt=MemoryContextSwitchTo(mcxt);
620+
621+
for (inti=0;i<nelems;i++)
622+
result=bms_add_member(result,elems[i]);
623+
624+
if (mcxt)
625+
MemoryContextSwitchTo(oldcxt);
626+
627+
returnresult;
628+
}
629+
485630
/*
486631
* Insert new publication / schema mapping.
487632
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp