Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit039eb6e

Browse files
committed
Logical replication support for TRUNCATE
Update the built-in logical replication system to make use of thepreviously added logical decoding for TRUNCATE support. Add therequired truncate callback to pgoutput and a new logical replicationprotocol message.Publications get a new attribute to determine whether to replicatetruncate actions. When updating a publication via pg_dump from an olderversion, this is not set, thus preserving the previous behavior.Author: Simon Riggs <simon@2ndquadrant.com>Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>Reviewed-by: Andres Freund <andres@anarazel.de>Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
1 parent5dfd1e5 commit039eb6e

File tree

19 files changed

+572
-111
lines changed

19 files changed

+572
-111
lines changed

‎doc/src/sgml/catalogs.sgml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5518,6 +5518,14 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
55185518
<entry>If true, <command>DELETE</command> operations are replicated for
55195519
tables in the publication.</entry>
55205520
</row>
5521+
5522+
<row>
5523+
<entry><structfield>pubtruncate</structfield></entry>
5524+
<entry><type>bool</type></entry>
5525+
<entry></entry>
5526+
<entry>If true, <command>TRUNCATE</command> operations are replicated for
5527+
tables in the publication.</entry>
5528+
</row>
55215529
</tbody>
55225530
</tgroup>
55235531
</table>

‎doc/src/sgml/logical-replication.sgml

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@
108108

109109
<para>
110110
Publications can choose to limit the changes they produce to
111-
any combination of <command>INSERT</command>, <command>UPDATE</command>, and
112-
<command>DELETE</command>, similar to how triggers are fired by
111+
any combination of <command>INSERT</command>, <command>UPDATE</command>,
112+
<command>DELETE</command>,and <command>TRUNCATE</command>,similar to how triggers are fired by
113113
particular event types. By default, all operation types are replicated.
114114
</para>
115115

@@ -364,15 +364,6 @@
364364
</para>
365365
</listitem>
366366

367-
<listitem>
368-
<para>
369-
<command>TRUNCATE</command> commands are not replicated. This can, of
370-
course, be worked around by using <command>DELETE</command> instead. To
371-
avoid accidental <command>TRUNCATE</command> invocations, you can revoke
372-
the <literal>TRUNCATE</literal> privilege from tables.
373-
</para>
374-
</listitem>
375-
376367
<listitem>
377368
<para>
378369
Large objects (see <xref linkend="largeobjects"/>) are not replicated.

‎doc/src/sgml/protocol.sgml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6774,6 +6774,62 @@ Delete
67746774
</listitem>
67756775
</varlistentry>
67766776

6777+
<varlistentry>
6778+
<term>
6779+
Truncate
6780+
</term>
6781+
<listitem>
6782+
<para>
6783+
6784+
<variablelist>
6785+
<varlistentry>
6786+
<term>
6787+
Byte1('T')
6788+
</term>
6789+
<listitem>
6790+
<para>
6791+
Identifies the message as a truncate message.
6792+
</para>
6793+
</listitem>
6794+
</varlistentry>
6795+
<varlistentry>
6796+
<term>
6797+
Int32
6798+
</term>
6799+
<listitem>
6800+
<para>
6801+
Number of relations
6802+
</para>
6803+
</listitem>
6804+
</varlistentry>
6805+
<varlistentry>
6806+
<term>
6807+
Int8
6808+
</term>
6809+
<listitem>
6810+
<para>
6811+
Option bits for <command>TRUNCATE</command>:
6812+
1 for <literal>CASCADE</literal>, 2 for <literal>RESTART IDENTITY</literal>
6813+
</para>
6814+
</listitem>
6815+
</varlistentry>
6816+
<varlistentry>
6817+
<term>
6818+
Int32
6819+
</term>
6820+
<listitem>
6821+
<para>
6822+
ID of the relation corresponding to the ID in the relation
6823+
message. This field is repeated for each relation.
6824+
</para>
6825+
</listitem>
6826+
</varlistentry>
6827+
6828+
</variablelist>
6829+
</para>
6830+
</listitem>
6831+
</varlistentry>
6832+
67776833
</variablelist>
67786834

67796835
<para>

‎doc/src/sgml/ref/create_publication.sgml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
106106
This parameter determines which DML operations will be published by
107107
the new publication to the subscribers. The value is
108108
comma-separated list of operations. The allowed operations are
109-
<literal>insert</literal>, <literal>update</literal>, and
110-
<literal>delete</literal>. The default is to publish all actions,
109+
<literal>insert</literal>, <literal>update</literal>,
110+
<literal>delete</literal>, and <literal>truncate</literal>.
111+
The default is to publish all actions,
111112
and so the default value for this option is
112-
<literal>'insert, update, delete'</literal>.
113+
<literal>'insert, update, delete, truncate'</literal>.
113114
</para>
114115
</listitem>
115116
</varlistentry>
@@ -168,8 +169,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
168169
</para>
169170

170171
<para>
171-
<command>TRUNCATE</command> and <acronym>DDL</acronym> operations
172-
are not published.
172+
<acronym>DDL</acronym> operations are not published.
173173
</para>
174174
</refsect1>
175175

‎src/backend/catalog/pg_publication.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ GetPublication(Oid pubid)
376376
pub->pubactions.pubinsert=pubform->pubinsert;
377377
pub->pubactions.pubupdate=pubform->pubupdate;
378378
pub->pubactions.pubdelete=pubform->pubdelete;
379+
pub->pubactions.pubtruncate=pubform->pubtruncate;
379380

380381
ReleaseSysCache(tup);
381382

‎src/backend/commands/publicationcmds.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ parse_publication_options(List *options,
6262
bool*publish_given,
6363
bool*publish_insert,
6464
bool*publish_update,
65-
bool*publish_delete)
65+
bool*publish_delete,
66+
bool*publish_truncate)
6667
{
6768
ListCell*lc;
6869

@@ -72,6 +73,7 @@ parse_publication_options(List *options,
7273
*publish_insert= true;
7374
*publish_update= true;
7475
*publish_delete= true;
76+
*publish_truncate= true;
7577

7678
/* Parse options */
7779
foreach(lc,options)
@@ -96,6 +98,7 @@ parse_publication_options(List *options,
9698
*publish_insert= false;
9799
*publish_update= false;
98100
*publish_delete= false;
101+
*publish_truncate= false;
99102

100103
*publish_given= true;
101104
publish=defGetString(defel);
@@ -116,6 +119,8 @@ parse_publication_options(List *options,
116119
*publish_update= true;
117120
elseif (strcmp(publish_opt,"delete")==0)
118121
*publish_delete= true;
122+
elseif (strcmp(publish_opt,"truncate")==0)
123+
*publish_truncate= true;
119124
else
120125
ereport(ERROR,
121126
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt)
145150
boolpublish_insert;
146151
boolpublish_update;
147152
boolpublish_delete;
153+
boolpublish_truncate;
148154
AclResultaclresult;
149155

150156
/* must have CREATE privilege on database */
@@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt)
181187

182188
parse_publication_options(stmt->options,
183189
&publish_given,&publish_insert,
184-
&publish_update,&publish_delete);
190+
&publish_update,&publish_delete,
191+
&publish_truncate);
185192

186193
values[Anum_pg_publication_puballtables-1]=
187194
BoolGetDatum(stmt->for_all_tables);
@@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt)
191198
BoolGetDatum(publish_update);
192199
values[Anum_pg_publication_pubdelete-1]=
193200
BoolGetDatum(publish_delete);
201+
values[Anum_pg_publication_pubtruncate-1]=
202+
BoolGetDatum(publish_truncate);
194203

195204
tup=heap_form_tuple(RelationGetDescr(rel),values,nulls);
196205

@@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
237246
boolpublish_insert;
238247
boolpublish_update;
239248
boolpublish_delete;
249+
boolpublish_truncate;
240250
ObjectAddressobj;
241251

242252
parse_publication_options(stmt->options,
243253
&publish_given,&publish_insert,
244-
&publish_update,&publish_delete);
254+
&publish_update,&publish_delete,
255+
&publish_truncate);
245256

246257
/* Everything ok, form a new tuple. */
247258
memset(values,0,sizeof(values));
@@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
258269

259270
values[Anum_pg_publication_pubdelete-1]=BoolGetDatum(publish_delete);
260271
replaces[Anum_pg_publication_pubdelete-1]= true;
272+
273+
values[Anum_pg_publication_pubtruncate-1]=BoolGetDatum(publish_truncate);
274+
replaces[Anum_pg_publication_pubtruncate-1]= true;
261275
}
262276

263277
tup=heap_modify_tuple(tup,RelationGetDescr(rel),values,nulls,

‎src/backend/replication/logical/proto.c

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
*/
2727
#defineLOGICALREP_IS_REPLICA_IDENTITY 1
2828

29+
#defineTRUNCATE_CASCADE(1<<0)
30+
#defineTRUNCATE_RESTART_SEQS(1<<1)
31+
2932
staticvoidlogicalrep_write_attrs(StringInfoout,Relationrel);
3033
staticvoidlogicalrep_write_tuple(StringInfoout,Relationrel,
3134
HeapTupletuple);
@@ -292,6 +295,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
292295
returnrelid;
293296
}
294297

298+
/*
299+
* Write TRUNCATE to the output stream.
300+
*/
301+
void
302+
logicalrep_write_truncate(StringInfoout,
303+
intnrelids,
304+
Oidrelids[],
305+
boolcascade,boolrestart_seqs)
306+
{
307+
inti;
308+
uint8flags=0;
309+
310+
pq_sendbyte(out,'T');/* action TRUNCATE */
311+
312+
pq_sendint32(out,nrelids);
313+
314+
/* encode and send truncate flags */
315+
if (cascade)
316+
flags |=TRUNCATE_CASCADE;
317+
if (restart_seqs)
318+
flags |=TRUNCATE_RESTART_SEQS;
319+
pq_sendint8(out,flags);
320+
321+
for (i=0;i<nrelids;i++)
322+
pq_sendint32(out,relids[i]);
323+
}
324+
325+
/*
326+
* Read TRUNCATE from stream.
327+
*/
328+
List*
329+
logicalrep_read_truncate(StringInfoin,
330+
bool*cascade,bool*restart_seqs)
331+
{
332+
inti;
333+
intnrelids;
334+
List*relids=NIL;
335+
uint8flags;
336+
337+
nrelids=pq_getmsgint(in,4);
338+
339+
/* read and decode truncate flags */
340+
flags=pq_getmsgint(in,1);
341+
*cascade= (flags&TRUNCATE_CASCADE)>0;
342+
*restart_seqs= (flags&TRUNCATE_RESTART_SEQS)>0;
343+
344+
for (i=0;i<nrelids;i++)
345+
relids=lappend_oid(relids,pq_getmsgint(in,4));
346+
347+
returnrelids;
348+
}
349+
295350
/*
296351
* Write relation description to the output stream.
297352
*/

‎src/backend/replication/logical/worker.c

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
#include"access/xact.h"
3131
#include"access/xlog_internal.h"
3232

33+
#include"catalog/catalog.h"
3334
#include"catalog/namespace.h"
3435
#include"catalog/pg_subscription.h"
3536
#include"catalog/pg_subscription_rel.h"
3637

38+
#include"commands/tablecmds.h"
3739
#include"commands/trigger.h"
3840

3941
#include"executor/executor.h"
@@ -83,6 +85,7 @@
8385
#include"utils/inval.h"
8486
#include"utils/lsyscache.h"
8587
#include"utils/memutils.h"
88+
#include"utils/rel.h"
8689
#include"utils/timeout.h"
8790
#include"utils/tqual.h"
8891
#include"utils/syscache.h"
@@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s)
888891
CommandCounterIncrement();
889892
}
890893

894+
/*
895+
* Handle TRUNCATE message.
896+
*
897+
* TODO: FDW support
898+
*/
899+
staticvoid
900+
apply_handle_truncate(StringInfos)
901+
{
902+
boolcascade= false;
903+
boolrestart_seqs= false;
904+
List*remote_relids=NIL;
905+
List*remote_rels=NIL;
906+
List*rels=NIL;
907+
List*relids=NIL;
908+
List*relids_logged=NIL;
909+
ListCell*lc;
910+
911+
ensure_transaction();
912+
913+
remote_relids=logicalrep_read_truncate(s,&cascade,&restart_seqs);
914+
915+
foreach(lc,remote_relids)
916+
{
917+
LogicalRepRelIdrelid=lfirst_oid(lc);
918+
LogicalRepRelMapEntry*rel;
919+
920+
rel=logicalrep_rel_open(relid,RowExclusiveLock);
921+
if (!should_apply_changes_for_rel(rel))
922+
{
923+
/*
924+
* The relation can't become interesting in the middle of the
925+
* transaction so it's safe to unlock it.
926+
*/
927+
logicalrep_rel_close(rel,RowExclusiveLock);
928+
continue;
929+
}
930+
931+
remote_rels=lappend(remote_rels,rel);
932+
rels=lappend(rels,rel->localrel);
933+
relids=lappend_oid(relids,rel->localreloid);
934+
if (RelationIsLogicallyLogged(rel->localrel))
935+
relids_logged=lappend_oid(relids,rel->localreloid);
936+
}
937+
938+
/*
939+
* Even if we used CASCADE on the upstream master we explicitly
940+
* default to replaying changes without further cascading.
941+
* This might be later changeable with a user specified option.
942+
*/
943+
ExecuteTruncateGuts(rels,relids,relids_logged,DROP_RESTRICT,restart_seqs);
944+
945+
foreach(lc,remote_rels)
946+
{
947+
LogicalRepRelMapEntry*rel=lfirst(lc);
948+
949+
logicalrep_rel_close(rel,NoLock);
950+
}
951+
952+
CommandCounterIncrement();
953+
}
954+
891955

892956
/*
893957
* Logical replication protocol message dispatcher.
@@ -919,6 +983,10 @@ apply_dispatch(StringInfo s)
919983
case'D':
920984
apply_handle_delete(s);
921985
break;
986+
/* TRUNCATE */
987+
case'T':
988+
apply_handle_truncate(s);
989+
break;
922990
/* RELATION */
923991
case'R':
924992
apply_handle_relation(s);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp