Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit325f2ec

Browse files
committed
Handle heap rewrites even better in logical decoding
Logical decoding should not publish anything about tables created aspart of a heap rewrite during DDL. Those tables don't exist externally,so consumers of logical decoding cannot do anything sensible with thatinformation. Inab28fea, we workedaround this for built-in logical replication, but that was hack.This is a more proper fix: We mark such transient heaps using the newfield pg_class.relwrite, linking to the original relation OID. Bydefault, we ignore them in logical decoding before they get to theoutput plugin. Optionally, a plugin can register their interest ingetting such changes, if they handle DDL specially, in which case thenew field will help them get information about the actual table.Reviewed-by: Craig Ringer <craig@2ndquadrant.com>
1 parentbe8a7a6 commit325f2ec

File tree

20 files changed

+113
-103
lines changed

20 files changed

+113
-103
lines changed

‎contrib/test_decoding/expected/concurrent_ddl_dml.out

Lines changed: 28 additions & 54 deletions
Large diffs are not rendered by default.

‎contrib/test_decoding/expected/ddl.out

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
117117
(22 rows)
118118

119119
ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
120-
--throw awaychanges, they contain oids
120+
--check that this doesn't produce anychanges from the heap rewrite
121121
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
122122
count
123123
-------
124-
12
124+
0
125125
(1 row)
126126

127127
INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
@@ -192,16 +192,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
192192
COMMIT
193193
(33 rows)
194194

195-
-- hide changes bc of oid visible in full table rewrites
196195
CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
197196
INSERT INTO tr_unique(data) VALUES(10);
198197
ALTER TABLE tr_unique RENAME TO tr_pkey;
199198
ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
200-
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
201-
count
202-
-------
203-
6
204-
(1 row)
199+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
200+
data
201+
-----------------------------------------------------------------------------
202+
BEGIN
203+
table public.tr_unique: INSERT: id2[integer]:1 data[integer]:10
204+
COMMIT
205+
BEGIN
206+
table public.tr_pkey: INSERT: id2[integer]:1 data[integer]:10 id[integer]:1
207+
COMMIT
208+
(6 rows)
205209

206210
INSERT INTO tr_pkey(data) VALUES(1);
207211
--show deletion with primary key

‎contrib/test_decoding/specs/concurrent_ddl_dml.spec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE characte
5353
step"s2_alter_tbl2_3rd_text" {ALTERTABLEtbl2ALTERCOLUMNval3TYPEtext; }
5454
step"s2_alter_tbl2_3rd_int" {ALTERTABLEtbl2ALTERCOLUMNval3TYPEintUSINGval3::integer; }
5555

56-
step"s2_get_changes" {SELECTregexp_replace(data,'temp_\d+','temp')ASdataFROMpg_logical_slot_get_changes('isolation_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1'); }
56+
step"s2_get_changes" {SELECTdataFROMpg_logical_slot_get_changes('isolation_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1'); }
5757

5858

5959

‎contrib/test_decoding/sql/ddl.sql

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
6767
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
6868

6969
ALTERTABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
70-
--throw awaychanges, they contain oids
70+
--check that this doesn't produce anychanges from the heap rewrite
7171
SELECTcount(data)FROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
7272

7373
INSERT INTO replication_example(somedata, somenum)VALUES (5,1);
@@ -93,12 +93,11 @@ COMMIT;
9393
/* display results*/
9494
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
9595

96-
-- hide changes bc of oid visible in full table rewrites
9796
CREATETABLEtr_unique(id2serial uniqueNOT NULL, dataint);
9897
INSERT INTO tr_unique(data)VALUES(10);
9998
ALTERTABLE tr_unique RENAME TO tr_pkey;
10099
ALTERTABLE tr_pkey ADD COLUMN idserialprimary key;
101-
SELECTcount(data)FROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
100+
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1','include-rewrites','1');
102101

103102
INSERT INTO tr_pkey(data)VALUES(1);
104103
--show deletion with primary key

‎contrib/test_decoding/test_decoding.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
101101
ctx->output_plugin_private=data;
102102

103103
opt->output_type=OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
104+
opt->receive_rewrites= false;
104105

105106
foreach(option,ctx->output_plugin_options)
106107
{
@@ -166,6 +167,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
166167
errmsg("could not parse value \"%s\" for parameter \"%s\"",
167168
strVal(elem->arg),elem->defname)));
168169
}
170+
elseif (strcmp(elem->defname,"include-rewrites")==0)
171+
{
172+
173+
if (elem->arg==NULL)
174+
continue;
175+
elseif (!parse_bool(strVal(elem->arg),&opt->receive_rewrites))
176+
ereport(ERROR,
177+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
178+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
179+
strVal(elem->arg),elem->defname)));
180+
}
169181
else
170182
{
171183
ereport(ERROR,
@@ -412,6 +424,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
412424
quote_qualified_identifier(
413425
get_namespace_name(
414426
get_rel_namespace(RelationGetRelid(relation))),
427+
class_form->relrewrite ?
428+
get_rel_name(class_form->relrewrite) :
415429
NameStr(class_form->relname)));
416430
appendStringInfoChar(ctx->out,':');
417431

‎doc/src/sgml/catalogs.sgml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1923,6 +1923,18 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
19231923
<entry>True if table is a partition</entry>
19241924
</row>
19251925

1926+
<row>
1927+
<entry><structfield>relrewrite</structfield></entry>
1928+
<entry><type>oid</type></entry>
1929+
<entry><literal><link linkend="catalog-pg-class"><structname>pg_class</structname></link>.oid</literal></entry>
1930+
<entry>
1931+
For new relations being written during a DDL operation that requires a
1932+
table rewrite, this contains the OID of the original relation;
1933+
otherwise 0. That state is only visible internally; this field should
1934+
never contain anything other than 0 for a user-visible relation.
1935+
</entry>
1936+
</row>
1937+
19261938
<row>
19271939
<entry><structfield>relfrozenxid</structfield></entry>
19281940
<entry><type>xid</type></entry>

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,12 +486,17 @@ typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
486486
typedef struct OutputPluginOptions
487487
{
488488
OutputPluginOutputType output_type;
489+
bool receive_rewrites;
489490
} OutputPluginOptions;
490491
</programlisting>
491492
<literal>output_type</literal> has to either be set to
492493
<literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
493494
or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
494495
<xref linkend="logicaldecoding-output-mode"/>.
496+
If <literal>receive_rewrites</literal> is true, the output plugin will
497+
also be called for changes made by heap rewrites during certain DDL
498+
operations. These are of interest to plugins that handle DDL
499+
replication, but they require special handling.
495500
</para>
496501

497502
<para>

‎src/backend/bootstrap/bootparse.y

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ Boot_CreateStmt:
257257
false,
258258
true,
259259
false,
260+
InvalidOid,
260261
NULL);
261262
elog(DEBUG4,"relation created with OID %u", id);
262263
}

‎src/backend/catalog/heap.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,7 @@ InsertPgClassTuple(Relation pg_class_desc,
806806
values[Anum_pg_class_relispopulated-1]=BoolGetDatum(rd_rel->relispopulated);
807807
values[Anum_pg_class_relreplident-1]=CharGetDatum(rd_rel->relreplident);
808808
values[Anum_pg_class_relispartition-1]=BoolGetDatum(rd_rel->relispartition);
809+
values[Anum_pg_class_relrewrite-1]=ObjectIdGetDatum(rd_rel->relrewrite);
809810
values[Anum_pg_class_relfrozenxid-1]=TransactionIdGetDatum(rd_rel->relfrozenxid);
810811
values[Anum_pg_class_relminmxid-1]=MultiXactIdGetDatum(rd_rel->relminmxid);
811812
if (relacl!= (Datum)0)
@@ -1038,6 +1039,7 @@ heap_create_with_catalog(const char *relname,
10381039
booluse_user_acl,
10391040
boolallow_system_table_mods,
10401041
boolis_internal,
1042+
Oidrelrewrite,
10411043
ObjectAddress*typaddress)
10421044
{
10431045
Relationpg_class_desc;
@@ -1176,6 +1178,8 @@ heap_create_with_catalog(const char *relname,
11761178

11771179
Assert(relid==RelationGetRelid(new_rel_desc));
11781180

1181+
new_rel_desc->rd_rel->relrewrite=relrewrite;
1182+
11791183
/*
11801184
* Decide whether to create an array type over the relation's rowtype. We
11811185
* do not create any array types for system catalogs (ie, those made

‎src/backend/catalog/toasting.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
279279
false,
280280
true,
281281
true,
282+
InvalidOid,
282283
NULL);
283284
Assert(toast_relid!=InvalidOid);
284285

‎src/backend/commands/cluster.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence,
692692
false,
693693
true,
694694
true,
695+
OIDOldHeap,
695696
NULL);
696697
Assert(OIDNewHeap!=InvalidOid);
697698

‎src/backend/commands/tablecmds.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
764764
true,
765765
allowSystemTableMods,
766766
false,
767+
InvalidOid,
767768
typaddress);
768769

769770
/* Store inheritance information for new rel. */

‎src/backend/replication/logical/logical.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,8 @@ CreateInitDecodingContext(char *plugin,
317317
startup_cb_wrapper(ctx,&ctx->options, true);
318318
MemoryContextSwitchTo(old_context);
319319

320+
ctx->reorder->output_rewrites=ctx->options.receive_rewrites;
321+
320322
returnctx;
321323
}
322324

@@ -410,6 +412,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
410412
startup_cb_wrapper(ctx,&ctx->options, false);
411413
MemoryContextSwitchTo(old_context);
412414

415+
ctx->reorder->output_rewrites=ctx->options.receive_rewrites;
416+
413417
ereport(LOG,
414418
(errmsg("starting logical decoding for slot \"%s\"",
415419
NameStr(slot->data.name)),

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,6 +1402,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
14021402
if (!RelationIsLogicallyLogged(relation))
14031403
gotochange_done;
14041404

1405+
/*
1406+
* Ignore temporary heaps created during DDL unless the
1407+
* plugin has asked for them.
1408+
*/
1409+
if (relation->rd_rel->relrewrite&& !rb->output_rewrites)
1410+
gotochange_done;
1411+
14051412
/*
14061413
* For now ignore sequence changes entirely. Most of the
14071414
* time they don't log changes using records we

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
#include"utils/inval.h"
2323
#include"utils/int8.h"
24-
#include"utils/lsyscache.h"
2524
#include"utils/memutils.h"
2625
#include"utils/syscache.h"
2726
#include"utils/varlena.h"
@@ -511,31 +510,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
511510
{
512511
Publication*pub=lfirst(lc);
513512

514-
/*
515-
* Skip tables that look like they are from a heap rewrite (see
516-
* make_new_heap()). We need to skip them because the subscriber
517-
* won't have a table by that name to receive the data. That
518-
* means we won't ship the new data in, say, an added column with
519-
* a DEFAULT, but if the user applies the same DDL manually on the
520-
* subscriber, then this will work out for them.
521-
*
522-
* We only need to consider the alltables case, because such a
523-
* transient heap won't be an explicit member of a publication.
524-
*/
525-
if (pub->alltables)
526-
{
527-
char*relname=get_rel_name(relid);
528-
unsignedintu;
529-
intn;
530-
531-
if (sscanf(relname,"pg_temp_%u%n",&u,&n)==1&&
532-
relname[n]=='\0')
533-
{
534-
if (get_rel_relkind(u)==RELKIND_RELATION)
535-
break;
536-
}
537-
}
538-
539513
if (pub->alltables||list_member_oid(pubids,pub->oid))
540514
{
541515
entry->pubactions.pubinsert |=pub->pubactions.pubinsert;

‎src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/*yyyymmddN */
56-
#defineCATALOG_VERSION_NO201803141
56+
#defineCATALOG_VERSION_NO201803211
5757

5858
#endif

‎src/include/catalog/heap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ extern Oid heap_create_with_catalog(const char *relname,
7171
booluse_user_acl,
7272
boolallow_system_table_mods,
7373
boolis_internal,
74+
Oidrelrewrite,
7475
ObjectAddress*typaddress);
7576

7677
externvoidheap_create_init_fork(Relationrel);

‎src/include/catalog/pg_class.h

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
7070
boolrelispopulated;/* matview currently holds query results */
7171
charrelreplident;/* see REPLICA_IDENTITY_xxx constants */
7272
boolrelispartition;/* is relation a partition? */
73+
Oidrelrewrite;/* heap for rewrite during DDL, link to original rel */
7374
TransactionIdrelfrozenxid;/* all Xids < this are frozen in this rel */
7475
TransactionIdrelminmxid;/* all multixacts in this rel are >= this.
7576
* this is really a MultiXactId */
@@ -98,7 +99,7 @@ typedef FormData_pg_class *Form_pg_class;
9899
* ----------------
99100
*/
100101

101-
#defineNatts_pg_class32
102+
#defineNatts_pg_class33
102103
#defineAnum_pg_class_relname1
103104
#defineAnum_pg_class_relnamespace2
104105
#defineAnum_pg_class_reltype3
@@ -126,11 +127,12 @@ typedef FormData_pg_class *Form_pg_class;
126127
#defineAnum_pg_class_relispopulated25
127128
#defineAnum_pg_class_relreplident26
128129
#defineAnum_pg_class_relispartition27
129-
#defineAnum_pg_class_relfrozenxid28
130-
#defineAnum_pg_class_relminmxid29
131-
#defineAnum_pg_class_relacl30
132-
#defineAnum_pg_class_reloptions31
133-
#defineAnum_pg_class_relpartbound32
130+
#defineAnum_pg_class_relrewrite28
131+
#defineAnum_pg_class_relfrozenxid29
132+
#defineAnum_pg_class_relminmxid30
133+
#defineAnum_pg_class_relacl31
134+
#defineAnum_pg_class_reloptions32
135+
#defineAnum_pg_class_relpartbound33
134136

135137
/* ----------------
136138
*initial contents of pg_class
@@ -145,13 +147,13 @@ typedef FormData_pg_class *Form_pg_class;
145147
* Note: "3" in the relfrozenxid column stands for FirstNormalTransactionId;
146148
* similarly, "1" in relminmxid stands for FirstMultiXactId
147149
*/
148-
DATA(insertOID=1247 (pg_typePGNSP710PGUID0000000ffpr300tffffftnf31_null__null__null_));
150+
DATA(insertOID=1247 (pg_typePGNSP710PGUID0000000ffpr300tffffftnf031_null__null__null_));
149151
DESCR("");
150-
DATA(insertOID=1249 (pg_attributePGNSP750PGUID0000000ffpr220fffffftnf31_null__null__null_));
152+
DATA(insertOID=1249 (pg_attributePGNSP750PGUID0000000ffpr220fffffftnf031_null__null__null_));
151153
DESCR("");
152-
DATA(insertOID=1255 (pg_procPGNSP810PGUID0000000ffpr280tffffftnf31_null__null__null_));
154+
DATA(insertOID=1255 (pg_procPGNSP810PGUID0000000ffpr280tffffftnf031_null__null__null_));
153155
DESCR("");
154-
DATA(insertOID=1259 (pg_classPGNSP830PGUID0000000ffpr320tffffftnf31_null__null__null_));
156+
DATA(insertOID=1259 (pg_classPGNSP830PGUID0000000ffpr330tffffftnf031_null__null__null_));
155157
DESCR("");
156158

157159

‎src/include/replication/output_plugin.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ typedef enum OutputPluginOutputType
2626
typedefstructOutputPluginOptions
2727
{
2828
OutputPluginOutputTypeoutput_type;
29+
boolreceive_rewrites;
2930
}OutputPluginOptions;
3031

3132
/*

‎src/include/replication/reorderbuffer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,11 @@ struct ReorderBuffer
336336
*/
337337
void*private_data;
338338

339+
/*
340+
* Saved output plugin option
341+
*/
342+
booloutput_rewrites;
343+
339344
/*
340345
* Private memory context.
341346
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp