Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9fcdf2c

Browse files
committed
Add support for COPY TO callback functions
This is useful as a way for extensions to process COPY TO rows in theway they see fit (say auditing, analytics, backend, etc.) without theneed to invoke an external process running as the OS user running thebackend through PROGRAM that requires superuser rights. COPY FROMalready provides a similar callback for logical replication. For COPYTO, the callback is triggered when we are ready to send a row inCopySendEndOfRow(), which is the same code path as when sending a rowto a frontend or a pipe/file.A small test module, test_copy_callbacks, is added to provide somecoverage for this facility.Author: Bilva Sanaba, Nathan BossartDiscussion:https://postgr.es/m/253C21D1-FCEB-41D9-A2AF-E6517015B7D7@amazon.com
1 parent0e87dfe commit9fcdf2c

File tree

14 files changed

+175
-5
lines changed

14 files changed

+175
-5
lines changed

‎src/backend/commands/copy.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
310310

311311
cstate=BeginCopyTo(pstate,rel,query,relid,
312312
stmt->filename,stmt->is_program,
313-
stmt->attlist,stmt->options);
313+
NULL,stmt->attlist,stmt->options);
314314
*processed=DoCopyTo(cstate);/* copy from database to file */
315315
EndCopyTo(cstate);
316316
}

‎src/backend/commands/copyto.c

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ typedef enum CopyDest
5151
{
5252
COPY_FILE,/* to file (or a piped program) */
5353
COPY_FRONTEND,/* to frontend */
54+
COPY_CALLBACK/* to callback function */
5455
}CopyDest;
5556

5657
/*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
8586
List*attnumlist;/* integer list of attnums to copy */
8687
char*filename;/* filename, or NULL for STDOUT */
8788
boolis_program;/* is 'filename' a program to popen? */
89+
copy_data_dest_cbdata_dest_cb;/* function for writing data */
8890

8991
CopyFormatOptionsopts;
9092
Node*whereClause;/* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
247249
/* Dump the accumulated row as one CopyData message */
248250
(void)pq_putmessage('d',fe_msgbuf->data,fe_msgbuf->len);
249251
break;
252+
caseCOPY_CALLBACK:
253+
cstate->data_dest_cb(fe_msgbuf->data,fe_msgbuf->len);
254+
break;
250255
}
251256

252257
/* Update the progress */
@@ -336,6 +341,17 @@ EndCopy(CopyToState cstate)
336341

337342
/*
338343
* Setup CopyToState to read tuples from a table or a query for COPY TO.
344+
*
345+
* 'rel': Relation to be copied
346+
* 'raw_query': Query whose results are to be copied
347+
* 'queryRelId': OID of base relation to convert to a query (for RLS)
348+
* 'filename': Name of server-local file to write, NULL for STDOUT
349+
* 'is_program': true if 'filename' is program to execute
350+
* 'data_dest_cb': Callback that processes the output data
351+
* 'attnamelist': List of char *, columns to include. NIL selects all cols.
352+
* 'options': List of DefElem. See copy_opt_item in gram.y for selections.
353+
*
354+
* Returns a CopyToState, to be passed to DoCopyTo() and related functions.
339355
*/
340356
CopyToState
341357
BeginCopyTo(ParseState*pstate,
@@ -344,11 +360,12 @@ BeginCopyTo(ParseState *pstate,
344360
OidqueryRelId,
345361
constchar*filename,
346362
boolis_program,
363+
copy_data_dest_cbdata_dest_cb,
347364
List*attnamelist,
348365
List*options)
349366
{
350367
CopyToStatecstate;
351-
boolpipe= (filename==NULL);
368+
boolpipe= (filename==NULL&&data_dest_cb==NULL);
352369
TupleDesctupDesc;
353370
intnum_phys_attrs;
354371
MemoryContextoldcontext;
@@ -656,7 +673,13 @@ BeginCopyTo(ParseState *pstate,
656673

657674
cstate->copy_dest=COPY_FILE;/* default */
658675

659-
if (pipe)
676+
if (data_dest_cb)
677+
{
678+
progress_vals[1]=PROGRESS_COPY_TYPE_CALLBACK;
679+
cstate->copy_dest=COPY_CALLBACK;
680+
cstate->data_dest_cb=data_dest_cb;
681+
}
682+
elseif (pipe)
660683
{
661684
progress_vals[1]=PROGRESS_COPY_TYPE_PIPE;
662685

@@ -765,11 +788,13 @@ EndCopyTo(CopyToState cstate)
765788

766789
/*
767790
* Copy from relation or query TO file.
791+
*
792+
* Returns the number of rows processed.
768793
*/
769794
uint64
770795
DoCopyTo(CopyToStatecstate)
771796
{
772-
boolpipe= (cstate->filename==NULL);
797+
boolpipe= (cstate->filename==NULL&&cstate->data_dest_cb==NULL);
773798
boolfe_copy= (pipe&&whereToSendOutput==DestRemote);
774799
TupleDesctupDesc;
775800
intnum_phys_attrs;

‎src/include/commands/copy.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
6666
typedefstructCopyToStateData*CopyToState;
6767

6868
typedefint (*copy_data_source_cb) (void*outbuf,intminread,intmaxread);
69+
typedefvoid (*copy_data_dest_cb) (void*data,intlen);
6970

7071
externvoidDoCopy(ParseState*pstate,constCopyStmt*stmt,
7172
intstmt_location,intstmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
9192
*/
9293
externCopyToStateBeginCopyTo(ParseState*pstate,Relationrel,RawStmt*raw_query,
9394
OidqueryRelId,constchar*filename,boolis_program,
94-
List*attnamelist,List*options);
95+
copy_data_dest_cbdata_dest_cb,List*attnamelist,List*options);
9596
externvoidEndCopyTo(CopyToStatecstate);
9697
externuint64DoCopyTo(CopyToStatecstate);
9798
externList*CopyGetAttnums(TupleDesctupDesc,Relationrel,

‎src/test/modules/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ SUBDIRS = \
1515
snapshot_too_old\
1616
spgist_name_ops\
1717
test_bloomfilter\
18+
test_copy_callbacks\
1819
test_ddl_deparse\
1920
test_extensions\
2021
test_ginpostinglist\

‎src/test/modules/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ subdir('snapshot_too_old')
99
subdir('spgist_name_ops')
1010
subdir('ssl_passphrase_callback')
1111
subdir('test_bloomfilter')
12+
subdir('test_copy_callbacks')
1213
subdir('test_ddl_deparse')
1314
subdir('test_extensions')
1415
subdir('test_ginpostinglist')
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Generated subdirectories
2+
/log/
3+
/results/
4+
/tmp_check/
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# src/test/modules/test_copy_callbacks/Makefile
2+
3+
MODULE_big = test_copy_callbacks
4+
OBJS =\
5+
$(WIN32RES)\
6+
test_copy_callbacks.o
7+
PGFILEDESC = "test_copy_callbacks - test COPY callbacks"
8+
9+
EXTENSION = test_copy_callbacks
10+
DATA = test_copy_callbacks--1.0.sql
11+
12+
REGRESS = test_copy_callbacks
13+
14+
ifdefUSE_PGXS
15+
PG_CONFIG = pg_config
16+
PGXS :=$(shell$(PG_CONFIG) --pgxs)
17+
include$(PGXS)
18+
else
19+
subdir = src/test/modules/test_copy_callbacks
20+
top_builddir = ../../../..
21+
include$(top_builddir)/src/Makefile.global
22+
include$(top_srcdir)/contrib/contrib-global.mk
23+
endif
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
CREATE EXTENSION test_copy_callbacks;
2+
CREATE TABLE public.test (a INT, b INT, c INT);
3+
INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
4+
SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
5+
NOTICE: COPY TO callback called with data "123" and length 5
6+
NOTICE: COPY TO callback called with data "123456" and length 8
7+
NOTICE: COPY TO callback called with data "123456789" and length 11
8+
NOTICE: COPY TO callback has processed 3 rows
9+
test_copy_to_callback
10+
-----------------------
11+
12+
(1 row)
13+
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# FIXME: prevent install during main install, but not during test :/
2+
3+
test_copy_callbacks_sources=files(
4+
'test_copy_callbacks.c',
5+
)
6+
7+
if host_system=='windows'
8+
test_copy_callbacks_sources+= rc_lib_gen.process(win32ver_rc,extra_args: [
9+
'--NAME','test_copy_callbacks',
10+
'--FILEDESC','test_copy_callbacks - test COPY callbacks',])
11+
endif
12+
13+
test_copy_callbacks=shared_module('test_copy_callbacks',
14+
test_copy_callbacks_sources,
15+
kwargs: pg_mod_args,
16+
)
17+
testprep_targets+= test_copy_callbacks
18+
19+
install_data(
20+
'test_copy_callbacks.control',
21+
'test_copy_callbacks--1.0.sql',
22+
kwargs: contrib_data_args,
23+
)
24+
25+
tests+= {
26+
'name':'test_copy_callbacks',
27+
'sd':meson.current_source_dir(),
28+
'bd':meson.current_build_dir(),
29+
'regress': {
30+
'sql': [
31+
'test_copy_callbacks',
32+
],
33+
},
34+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
CREATE EXTENSION test_copy_callbacks;
2+
CREATETABLEpublic.test (aINT, bINT, cINT);
3+
INSERT INTOpublic.testVALUES (1,2,3), (12,34,56), (123,456,789);
4+
SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql*/
2+
3+
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
4+
\echo Use"CREATE EXTENSION test_copy_callbacks" to load this file. \quit
5+
6+
CREATEFUNCTIONtest_copy_to_callback(pg_catalog.regclass)
7+
RETURNSpg_catalog.void
8+
AS'MODULE_PATHNAME' LANGUAGE C;
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*--------------------------------------------------------------------------
2+
*
3+
* test_copy_callbacks.c
4+
*Code for testing COPY callbacks.
5+
*
6+
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7+
* Portions Copyright (c) 1994, Regents of the University of California
8+
*
9+
* IDENTIFICATION
10+
*src/test/modules/test_copy_callbacks/test_copy_callbacks.c
11+
*
12+
* -------------------------------------------------------------------------
13+
*/
14+
15+
#include"postgres.h"
16+
17+
#include"access/table.h"
18+
#include"commands/copy.h"
19+
#include"fmgr.h"
20+
#include"utils/rel.h"
21+
22+
PG_MODULE_MAGIC;
23+
24+
staticvoid
25+
to_cb(void*data,intlen)
26+
{
27+
ereport(NOTICE,
28+
(errmsg("COPY TO callback called with data \"%s\" and length %d",
29+
(char*)data,len)));
30+
}
31+
32+
PG_FUNCTION_INFO_V1(test_copy_to_callback);
33+
Datum
34+
test_copy_to_callback(PG_FUNCTION_ARGS)
35+
{
36+
Relationrel=table_open(PG_GETARG_OID(0),AccessShareLock);
37+
CopyToStatecstate;
38+
int64processed;
39+
40+
cstate=BeginCopyTo(NULL,rel,NULL,RelationGetRelid(rel),NULL,NULL,
41+
to_cb,NIL,NIL);
42+
processed=DoCopyTo(cstate);
43+
EndCopyTo(cstate);
44+
45+
ereport(NOTICE, (errmsg("COPY TO callback has processed %lld rows",
46+
(long long)processed)));
47+
48+
table_close(rel,NoLock);
49+
50+
PG_RETURN_VOID();
51+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
comment = 'Test code for COPY callbacks'
2+
default_version = '1.0'
3+
module_pathname = '$libdir/test_copy_callbacks'
4+
relocatable = true

‎src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3177,6 +3177,7 @@ compare_context
31773177
config_var_value
31783178
contain_aggs_of_level_context
31793179
convert_testexpr_context
3180+
copy_data_dest_cb
31803181
copy_data_source_cb
31813182
core_YYSTYPE
31823183
core_yy_extra_type

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp