Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5dfd1e5

Browse files
committed
Logical decoding of TRUNCATE
Add a new WAL record type for TRUNCATE, which is only used whenwal_level >= logical. (For physical replication, TRUNCATE is alreadyreplicated via SMGR records.) Add new callback for logical decodingoutput plugins to receive TRUNCATE actions.Author: Simon Riggs <simon@2ndquadrant.com>Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>Reviewed-by: Andres Freund <andres@anarazel.de>Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
1 parentb508a56 commit5dfd1e5

File tree

15 files changed

+414
-13
lines changed

15 files changed

+414
-13
lines changed

‎contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ submake-test_decoding:
3939

4040
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact\
4141
decoding_into_rel binary prepared replorigin time messages\
42-
spill slot
42+
spill slot truncate
4343

4444
regresscheck: | submake-regress submake-test_decoding temp-install
4545
$(pg_regress_check)\
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
2+
?column?
3+
----------
4+
init
5+
(1 row)
6+
7+
CREATE TABLE tab1 (id serial unique, data int);
8+
CREATE TABLE tab2 (a int primary key, b int);
9+
TRUNCATE tab1;
10+
TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
11+
TRUNCATE tab1, tab2;
12+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
13+
data
14+
------------------------------------------------------
15+
BEGIN
16+
table public.tab1: TRUNCATE: (no-flags)
17+
COMMIT
18+
BEGIN
19+
table public.tab1: TRUNCATE: restart_seqs cascade
20+
COMMIT
21+
BEGIN
22+
table public.tab1, public.tab2: TRUNCATE: (no-flags)
23+
COMMIT
24+
(9 rows)
25+
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
SELECT'init'FROM pg_create_logical_replication_slot('regression_slot','test_decoding');
2+
3+
CREATETABLEtab1 (idserial unique, dataint);
4+
CREATETABLEtab2 (aintprimary key, bint);
5+
6+
TRUNCATE tab1;
7+
TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
8+
TRUNCATE tab1, tab2;
9+
10+
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');

‎contrib/test_decoding/test_decoding.c

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
5252
staticvoidpg_decode_change(LogicalDecodingContext*ctx,
5353
ReorderBufferTXN*txn,Relationrel,
5454
ReorderBufferChange*change);
55+
staticvoidpg_decode_truncate(LogicalDecodingContext*ctx,
56+
ReorderBufferTXN*txn,
57+
intnrelations,Relationrelations[],
58+
ReorderBufferChange*change);
5559
staticboolpg_decode_filter(LogicalDecodingContext*ctx,
5660
RepOriginIdorigin_id);
5761
staticvoidpg_decode_message(LogicalDecodingContext*ctx,
@@ -74,6 +78,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7478
cb->startup_cb=pg_decode_startup;
7579
cb->begin_cb=pg_decode_begin_txn;
7680
cb->change_cb=pg_decode_change;
81+
cb->truncate_cb=pg_decode_truncate;
7782
cb->commit_cb=pg_decode_commit_txn;
7883
cb->filter_by_origin_cb=pg_decode_filter;
7984
cb->shutdown_cb=pg_decode_shutdown;
@@ -480,6 +485,59 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
480485
OutputPluginWrite(ctx, true);
481486
}
482487

488+
staticvoid
489+
pg_decode_truncate(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
490+
intnrelations,Relationrelations[],ReorderBufferChange*change)
491+
{
492+
TestDecodingData*data;
493+
MemoryContextold;
494+
inti;
495+
496+
data=ctx->output_plugin_private;
497+
498+
/* output BEGIN if we haven't yet */
499+
if (data->skip_empty_xacts&& !data->xact_wrote_changes)
500+
{
501+
pg_output_begin(ctx,data,txn, false);
502+
}
503+
data->xact_wrote_changes= true;
504+
505+
/* Avoid leaking memory by using and resetting our own context */
506+
old=MemoryContextSwitchTo(data->context);
507+
508+
OutputPluginPrepareWrite(ctx, true);
509+
510+
appendStringInfoString(ctx->out,"table ");
511+
512+
for (i=0;i<nrelations;i++)
513+
{
514+
if (i>0)
515+
appendStringInfoString(ctx->out,", ");
516+
517+
appendStringInfoString(ctx->out,
518+
quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
519+
NameStr(relations[i]->rd_rel->relname)));
520+
}
521+
522+
appendStringInfoString(ctx->out,": TRUNCATE:");
523+
524+
if (change->data.truncate.restart_seqs
525+
||change->data.truncate.cascade)
526+
{
527+
if (change->data.truncate.restart_seqs)
528+
appendStringInfo(ctx->out," restart_seqs");
529+
if (change->data.truncate.cascade)
530+
appendStringInfo(ctx->out," cascade");
531+
}
532+
else
533+
appendStringInfoString(ctx->out," (no-flags)");
534+
535+
MemoryContextSwitchTo(old);
536+
MemoryContextReset(data->context);
537+
538+
OutputPluginWrite(ctx, true);
539+
}
540+
483541
staticvoid
484542
pg_decode_message(LogicalDecodingContext*ctx,
485543
ReorderBufferTXN*txn,XLogRecPtrlsn,booltransactional,

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ typedef struct OutputPluginCallbacks
383383
LogicalDecodeStartupCB startup_cb;
384384
LogicalDecodeBeginCB begin_cb;
385385
LogicalDecodeChangeCB change_cb;
386+
LogicalDecodeTruncateCB truncate_cb;
386387
LogicalDecodeCommitCB commit_cb;
387388
LogicalDecodeMessageCB message_cb;
388389
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
@@ -394,8 +395,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
394395
The <function>begin_cb</function>, <function>change_cb</function>
395396
and <function>commit_cb</function> callbacks are required,
396397
while <function>startup_cb</function>,
397-
<function>filter_by_origin_cb</function>
398+
<function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
398399
and <function>shutdown_cb</function> are optional.
400+
If <function>truncate_cb</function> is not set but a
401+
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
399402
</para>
400403
</sect2>
401404

@@ -590,6 +593,28 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
590593
</note>
591594
</sect3>
592595

596+
<sect3 id="logicaldecoding-output-plugin-truncate">
597+
<title>Truncate Callback</title>
598+
599+
<para>
600+
The <function>truncate_cb</function> callback is called for a
601+
<command>TRUNCATE</command> command.
602+
<programlisting>
603+
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
604+
ReorderBufferTXN *txn,
605+
int nrelations,
606+
Relation relations[],
607+
ReorderBufferChange *change);
608+
</programlisting>
609+
The parameters are analogous to the <function>change_cb</function>
610+
callback. However, because <command>TRUNCATE</command> actions on
611+
tables connected by foreign keys need to be executed together, this
612+
callback receives an array of relations instead of just a single one.
613+
See the description of the <xref linkend="sql-truncate"/> statement for
614+
details.
615+
</para>
616+
</sect3>
617+
593618
<sect3 id="logicaldecoding-output-plugin-filter-origin">
594619
<title>Origin Filter Callback</title>
595620

‎src/backend/access/heap/heapam.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record)
92609260
caseXLOG_HEAP_UPDATE:
92619261
heap_xlog_update(record, false);
92629262
break;
9263+
caseXLOG_HEAP_TRUNCATE:
9264+
/*
9265+
* TRUNCATE is a no-op because the actions are already logged as
9266+
* SMGR WAL records. TRUNCATE WAL record only exists for logical
9267+
* decoding.
9268+
*/
9269+
break;
92639270
caseXLOG_HEAP_HOT_UPDATE:
92649271
heap_xlog_update(record, true);
92659272
break;

‎src/backend/access/rmgrdesc/heapdesc.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ heap_desc(StringInfo buf, XLogReaderState *record)
7575
xlrec->new_offnum,
7676
xlrec->new_xmax);
7777
}
78+
elseif (info==XLOG_HEAP_TRUNCATE)
79+
{
80+
xl_heap_truncate*xlrec= (xl_heap_truncate*)rec;
81+
inti;
82+
83+
if (xlrec->flags&XLH_TRUNCATE_CASCADE)
84+
appendStringInfo(buf,"cascade ");
85+
if (xlrec->flags&XLH_TRUNCATE_RESTART_SEQS)
86+
appendStringInfo(buf,"restart_seqs ");
87+
appendStringInfo(buf,"nrelids %u relids",xlrec->nrelids);
88+
for (i=0;i<xlrec->nrelids;i++)
89+
appendStringInfo(buf," %u",xlrec->relids[i]);
90+
}
7891
elseif (info==XLOG_HEAP_CONFIRM)
7992
{
8093
xl_heap_confirm*xlrec= (xl_heap_confirm*)rec;
@@ -186,6 +199,9 @@ heap_identify(uint8 info)
186199
caseXLOG_HEAP_HOT_UPDATE |XLOG_HEAP_INIT_PAGE:
187200
id="HOT_UPDATE+INIT";
188201
break;
202+
caseXLOG_HEAP_TRUNCATE:
203+
id="TRUNCATE";
204+
break;
189205
caseXLOG_HEAP_CONFIRM:
190206
id="HEAP_CONFIRM";
191207
break;

‎src/backend/commands/tablecmds.c

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include"access/genam.h"
1818
#include"access/heapam.h"
19+
#include"access/heapam_xlog.h"
1920
#include"access/multixact.h"
2021
#include"access/reloptions.h"
2122
#include"access/relscan.h"
@@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt)
13221323
{
13231324
List*rels=NIL;
13241325
List*relids=NIL;
1325-
List*seq_relids=NIL;
1326-
EState*estate;
1327-
ResultRelInfo*resultRelInfos;
1328-
ResultRelInfo*resultRelInfo;
1329-
SubTransactionIdmySubid;
1326+
List*relids_logged=NIL;
13301327
ListCell*cell;
13311328

13321329
/*
@@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt)
13501347
truncate_check_rel(rel);
13511348
rels=lappend(rels,rel);
13521349
relids=lappend_oid(relids,myrelid);
1350+
/* Log this relation only if needed for logical decoding */
1351+
if (RelationIsLogicallyLogged(rel))
1352+
relids_logged=lappend_oid(relids_logged,myrelid);
13531353

13541354
if (recurse)
13551355
{
@@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt)
13701370
truncate_check_rel(rel);
13711371
rels=lappend(rels,rel);
13721372
relids=lappend_oid(relids,childrelid);
1373+
/* Log this relation only if needed for logical decoding */
1374+
if (RelationIsLogicallyLogged(rel))
1375+
relids_logged=lappend_oid(relids_logged,childrelid);
13731376
}
13741377
}
13751378
elseif (rel->rd_rel->relkind==RELKIND_PARTITIONED_TABLE)
@@ -1379,15 +1382,56 @@ ExecuteTruncate(TruncateStmt *stmt)
13791382
errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly.")));
13801383
}
13811384

1385+
ExecuteTruncateGuts(rels,relids,relids_logged,
1386+
stmt->behavior,stmt->restart_seqs);
1387+
1388+
/* And close the rels */
1389+
foreach(cell,rels)
1390+
{
1391+
Relationrel= (Relation)lfirst(cell);
1392+
1393+
heap_close(rel,NoLock);
1394+
}
1395+
}
1396+
1397+
/*
1398+
* ExecuteTruncateGuts
1399+
*
1400+
* Internal implementation of TRUNCATE. This is called by the actual TRUNCATE
1401+
* command (see above) as well as replication subscribers that execute a
1402+
* replicated TRUNCATE action.
1403+
*
1404+
* explicit_rels is the list of Relations to truncate that the command
1405+
* specified. relids is the list of Oids corresponding to explicit_rels.
1406+
* relids_logged is the list of Oids (a subset of relids) that require
1407+
* WAL-logging. This is all a bit redundant, but the existing callers have
1408+
* this information handy in this form.
1409+
*/
1410+
void
1411+
ExecuteTruncateGuts(List*explicit_rels,List*relids,List*relids_logged,
1412+
DropBehaviorbehavior,boolrestart_seqs)
1413+
{
1414+
List*rels;
1415+
List*seq_relids=NIL;
1416+
EState*estate;
1417+
ResultRelInfo*resultRelInfos;
1418+
ResultRelInfo*resultRelInfo;
1419+
SubTransactionIdmySubid;
1420+
ListCell*cell;
1421+
Oid*logrelids;
1422+
13821423
/*
1424+
* Open, exclusive-lock, and check all the explicitly-specified relations
1425+
*
13831426
* In CASCADE mode, suck in all referencing relations as well. This
13841427
* requires multiple iterations to find indirectly-dependent relations. At
13851428
* each phase, we need to exclusive-lock new rels before looking for their
13861429
* dependencies, else we might miss something. Also, we check each rel as
13871430
* soon as we open it, to avoid a faux pas such as holding lock for a long
13881431
* time on a rel we have no permissions for.
13891432
*/
1390-
if (stmt->behavior==DROP_CASCADE)
1433+
rels=list_copy(explicit_rels);
1434+
if (behavior==DROP_CASCADE)
13911435
{
13921436
for (;;)
13931437
{
@@ -1409,6 +1453,9 @@ ExecuteTruncate(TruncateStmt *stmt)
14091453
truncate_check_rel(rel);
14101454
rels=lappend(rels,rel);
14111455
relids=lappend_oid(relids,relid);
1456+
/* Log this relation only if needed for logical decoding */
1457+
if (RelationIsLogicallyLogged(rel))
1458+
relids_logged=lappend_oid(relids_logged,relid);
14121459
}
14131460
}
14141461
}
@@ -1421,7 +1468,7 @@ ExecuteTruncate(TruncateStmt *stmt)
14211468
#ifdefUSE_ASSERT_CHECKING
14221469
heap_truncate_check_FKs(rels, false);
14231470
#else
1424-
if (stmt->behavior==DROP_RESTRICT)
1471+
if (behavior==DROP_RESTRICT)
14251472
heap_truncate_check_FKs(rels, false);
14261473
#endif
14271474

@@ -1431,7 +1478,7 @@ ExecuteTruncate(TruncateStmt *stmt)
14311478
* We want to do this early since it's pointless to do all the truncation
14321479
* work only to fail on sequence permissions.
14331480
*/
1434-
if (stmt->restart_seqs)
1481+
if (restart_seqs)
14351482
{
14361483
foreach(cell,rels)
14371484
{
@@ -1586,6 +1633,41 @@ ExecuteTruncate(TruncateStmt *stmt)
15861633
ResetSequence(seq_relid);
15871634
}
15881635

1636+
/*
1637+
* Write a WAL record to allow this set of actions to be logically decoded.
1638+
*
1639+
* Assemble an array of relids so we can write a single WAL record for the
1640+
* whole action.
1641+
*/
1642+
if (list_length(relids_logged)>0)
1643+
{
1644+
xl_heap_truncatexlrec;
1645+
inti=0;
1646+
1647+
/* should only get here if wal_level >= logical */
1648+
Assert(XLogLogicalInfoActive());
1649+
1650+
logrelids=palloc(list_length(relids_logged)*sizeof(Oid));
1651+
foreach (cell,relids_logged)
1652+
logrelids[i++]=lfirst_oid(cell);
1653+
1654+
xlrec.dbId=MyDatabaseId;
1655+
xlrec.nrelids=list_length(relids_logged);
1656+
xlrec.flags=0;
1657+
if (behavior==DROP_CASCADE)
1658+
xlrec.flags |=XLH_TRUNCATE_CASCADE;
1659+
if (restart_seqs)
1660+
xlrec.flags |=XLH_TRUNCATE_RESTART_SEQS;
1661+
1662+
XLogBeginInsert();
1663+
XLogRegisterData((char*)&xlrec,SizeOfHeapTruncate);
1664+
XLogRegisterData((char*)logrelids,list_length(relids_logged)*sizeof(Oid));
1665+
1666+
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1667+
1668+
(void)XLogInsert(RM_HEAP_ID,XLOG_HEAP_TRUNCATE);
1669+
}
1670+
15891671
/*
15901672
* Process all AFTER STATEMENT TRUNCATE triggers.
15911673
*/
@@ -1603,7 +1685,11 @@ ExecuteTruncate(TruncateStmt *stmt)
16031685
/* We can clean up the EState now */
16041686
FreeExecutorState(estate);
16051687

1606-
/* And close the rels (can't do this while EState still holds refs) */
1688+
/*
1689+
* Close any rels opened by CASCADE (can't do this while EState still
1690+
* holds refs)
1691+
*/
1692+
rels=list_difference_ptr(rels,explicit_rels);
16071693
foreach(cell,rels)
16081694
{
16091695
Relationrel= (Relation)lfirst(cell);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp