1
+ /* ------------------------------------------------------------------------
2
+ *
3
+ * copy_stmt_hooking.c
4
+ *Override COPY TO/FROM statement for partitioned tables
5
+ *
6
+ * Copyright (c) 2016, Postgres Professional
7
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
8
+ * Portions Copyright (c) 1994, Regents of the University of California
9
+ *
10
+ * ------------------------------------------------------------------------
11
+ */
12
+
1
13
#include "copy_stmt_hooking.h"
14
+ #include "init.h"
2
15
#include "relation_info.h"
3
16
17
+ #include "access/htup_details.h"
18
+ #include "access/sysattr.h"
19
+ #include "access/xact.h"
4
20
#include "catalog/namespace.h"
21
+ #include "catalog/pg_attribute.h"
5
22
#include "commands/copy.h"
23
+ #include "commands/trigger.h"
24
+ #include "executor/executor.h"
25
+ #include "miscadmin.h"
26
+ #include "nodes/makefuncs.h"
27
+ #include "utils/builtins.h"
28
+ #include "utils/lsyscache.h"
29
+ #include "utils/rel.h"
30
+ #include "utils/rls.h"
31
+
32
+ #include "libpq/libpq.h"
6
33
7
34
8
35
/*
@@ -14,6 +41,12 @@ is_pathman_related_copy(Node *parsetree)
14
41
CopyStmt * copy_stmt = (CopyStmt * )parsetree ;
15
42
Oid partitioned_table ;
16
43
44
+ if (!IsOverrideCopyEnabled ())
45
+ {
46
+ elog (DEBUG1 ,"COPY statement hooking is disabled" );
47
+ return false;
48
+ }
49
+
17
50
/* Check that it's a CopyStmt */
18
51
if (!IsA (parsetree ,CopyStmt ))
19
52
return false;
@@ -23,11 +56,266 @@ is_pathman_related_copy(Node *parsetree)
23
56
return false;
24
57
25
58
/* TODO: select appropriate lock for COPY */
26
- partitioned_table = RangeVarGetRelid (copy_stmt -> relation ,NoLock , false);
59
+ partitioned_table = RangeVarGetRelid (copy_stmt -> relation ,
60
+ (copy_stmt -> is_from ?
61
+ RowExclusiveLock :
62
+ AccessShareLock ),
63
+ false);
27
64
28
65
/* Check that relation is partitioned */
29
66
if (get_pathman_relation_info (partitioned_table ))
67
+ {
68
+ elog (DEBUG1 ,"Overriding default behavior for COPY (%u)" ,partitioned_table );
30
69
return true;
70
+ }
31
71
32
72
return false;
33
73
}
74
+
75
+ /*
76
+ * CopyGetAttnums - build an integer list of attnums to be copied
77
+ *
78
+ * The input attnamelist is either the user-specified column list,
79
+ * or NIL if there was none (in which case we want all the non-dropped
80
+ * columns).
81
+ *
82
+ * rel can be NULL ... it's only used for error reports.
83
+ */
84
+ static List *
85
+ CopyGetAttnums (TupleDesc tupDesc ,Relation rel ,List * attnamelist )
86
+ {
87
+ List * attnums = NIL ;
88
+
89
+ if (attnamelist == NIL )
90
+ {
91
+ /* Generate default column list */
92
+ Form_pg_attribute * attr = tupDesc -> attrs ;
93
+ int attr_count = tupDesc -> natts ;
94
+ int i ;
95
+
96
+ for (i = 0 ;i < attr_count ;i ++ )
97
+ {
98
+ if (attr [i ]-> attisdropped )
99
+ continue ;
100
+ attnums = lappend_int (attnums ,i + 1 );
101
+ }
102
+ }
103
+ else
104
+ {
105
+ /* Validate the user-supplied list and extract attnums */
106
+ ListCell * l ;
107
+
108
+ foreach (l ,attnamelist )
109
+ {
110
+ char * name = strVal (lfirst (l ));
111
+ int attnum ;
112
+ int i ;
113
+
114
+ /* Lookup column name */
115
+ attnum = InvalidAttrNumber ;
116
+ for (i = 0 ;i < tupDesc -> natts ;i ++ )
117
+ {
118
+ if (tupDesc -> attrs [i ]-> attisdropped )
119
+ continue ;
120
+ if (namestrcmp (& (tupDesc -> attrs [i ]-> attname ),name )== 0 )
121
+ {
122
+ attnum = tupDesc -> attrs [i ]-> attnum ;
123
+ break ;
124
+ }
125
+ }
126
+ if (attnum == InvalidAttrNumber )
127
+ {
128
+ if (rel != NULL )
129
+ ereport (ERROR ,
130
+ (errcode (ERRCODE_UNDEFINED_COLUMN ),
131
+ errmsg ("column \"%s\" of relation \"%s\" does not exist" ,
132
+ name ,RelationGetRelationName (rel ))));
133
+ else
134
+ ereport (ERROR ,
135
+ (errcode (ERRCODE_UNDEFINED_COLUMN ),
136
+ errmsg ("column \"%s\" does not exist" ,
137
+ name )));
138
+ }
139
+ /* Check for duplicates */
140
+ if (list_member_int (attnums ,attnum ))
141
+ ereport (ERROR ,
142
+ (errcode (ERRCODE_DUPLICATE_COLUMN ),
143
+ errmsg ("column \"%s\" specified more than once" ,
144
+ name )));
145
+ attnums = lappend_int (attnums ,attnum );
146
+ }
147
+ }
148
+
149
+ return attnums ;
150
+ }
151
+
152
+ /*
153
+ * Execute COPY TO/FROM statement for a partitioned table.
154
+ * NOTE: based on DoCopy() (see copy.c).
155
+ */
156
+ void
157
+ PathmanDoCopy (const CopyStmt * stmt ,const char * queryString ,uint64 * processed )
158
+ {
159
+ CopyState cstate ;
160
+ bool is_from = stmt -> is_from ;
161
+ bool pipe = (stmt -> filename == NULL );
162
+ Relation rel ;
163
+ Oid relid ;
164
+ Node * query = NULL ;
165
+ List * range_table = NIL ;
166
+
167
+ /* Disallow COPY TO/FROM file or program except to superusers. */
168
+ if (!pipe && !superuser ())
169
+ {
170
+ if (stmt -> is_program )
171
+ ereport (ERROR ,
172
+ (errcode (ERRCODE_INSUFFICIENT_PRIVILEGE ),
173
+ errmsg ("must be superuser to COPY to or from an external program" ),
174
+ errhint ("Anyone can COPY to stdout or from stdin. "
175
+ "psql's \\copy command also works for anyone." )));
176
+ else
177
+ ereport (ERROR ,
178
+ (errcode (ERRCODE_INSUFFICIENT_PRIVILEGE ),
179
+ errmsg ("must be superuser to COPY to or from a file" ),
180
+ errhint ("Anyone can COPY to stdout or from stdin. "
181
+ "psql's \\copy command also works for anyone." )));
182
+ }
183
+
184
+ if (stmt -> relation )
185
+ {
186
+ TupleDesc tupDesc ;
187
+ AclMode required_access = (is_from ?ACL_INSERT :ACL_SELECT );
188
+ List * attnums ;
189
+ ListCell * cur ;
190
+ RangeTblEntry * rte ;
191
+
192
+ Assert (!stmt -> query );
193
+
194
+ /* Open the relation (we've locked it in is_pathman_related_copy()) */
195
+ rel = heap_openrv (stmt -> relation ,NoLock );
196
+
197
+ relid = RelationGetRelid (rel );
198
+
199
+ rte = makeNode (RangeTblEntry );
200
+ rte -> rtekind = RTE_RELATION ;
201
+ rte -> relid = RelationGetRelid (rel );
202
+ rte -> relkind = rel -> rd_rel -> relkind ;
203
+ rte -> requiredPerms = required_access ;
204
+ range_table = list_make1 (rte );
205
+
206
+ tupDesc = RelationGetDescr (rel );
207
+ attnums = CopyGetAttnums (tupDesc ,rel ,stmt -> attlist );
208
+ foreach (cur ,attnums )
209
+ {
210
+ int attno = lfirst_int (cur )- FirstLowInvalidHeapAttributeNumber ;
211
+
212
+ if (is_from )
213
+ rte -> insertedCols = bms_add_member (rte -> insertedCols ,attno );
214
+ else
215
+ rte -> selectedCols = bms_add_member (rte -> selectedCols ,attno );
216
+ }
217
+ ExecCheckRTPerms (range_table , true);
218
+
219
+ /*
220
+ * We should perform a query instead of low-level heap scan whenever:
221
+ *a) table has a RLS policy;
222
+ *b) table is partitioned & it's COPY FROM.
223
+ */
224
+ if (check_enable_rls (rte -> relid ,InvalidOid , false)== RLS_ENABLED ||
225
+ is_from == false)/* rewrite COPY table TO statements */
226
+ {
227
+ SelectStmt * select ;
228
+ ColumnRef * cr ;
229
+ ResTarget * target ;
230
+ RangeVar * from ;
231
+
232
+ if (is_from )
233
+ ereport (ERROR ,
234
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
235
+ errmsg ("COPY FROM not supported with row-level security" ),
236
+ errhint ("Use INSERT statements instead." )));
237
+
238
+ /* Build target list */
239
+ cr = makeNode (ColumnRef );
240
+
241
+ if (!stmt -> attlist )
242
+ cr -> fields = list_make1 (makeNode (A_Star ));
243
+ else
244
+ cr -> fields = stmt -> attlist ;
245
+
246
+ cr -> location = 1 ;
247
+
248
+ target = makeNode (ResTarget );
249
+ target -> name = NULL ;
250
+ target -> indirection = NIL ;
251
+ target -> val = (Node * )cr ;
252
+ target -> location = 1 ;
253
+
254
+ /*
255
+ * Build RangeVar for from clause, fully qualified based on the
256
+ * relation which we have opened and locked.
257
+ */
258
+ from = makeRangeVar (get_namespace_name (RelationGetNamespace (rel )),
259
+ RelationGetRelationName (rel ),-1 );
260
+
261
+ /* Build query */
262
+ select = makeNode (SelectStmt );
263
+ select -> targetList = list_make1 (target );
264
+ select -> fromClause = list_make1 (from );
265
+
266
+ query = (Node * )select ;
267
+
268
+ /*
269
+ * Close the relation for now, but keep the lock on it to prevent
270
+ * changes between now and when we start the query-based COPY.
271
+ *
272
+ * We'll reopen it later as part of the query-based COPY.
273
+ */
274
+ heap_close (rel ,NoLock );
275
+ rel = NULL ;
276
+ }
277
+ }
278
+ else
279
+ {
280
+ Assert (stmt -> query );
281
+
282
+ query = stmt -> query ;
283
+ relid = InvalidOid ;
284
+ rel = NULL ;
285
+ }
286
+
287
+ /* COPY ... FROM ... */
288
+ if (is_from )
289
+ {
290
+ /* There should be relation */
291
+ Assert (rel );
292
+
293
+ /* check read-only transaction and parallel mode */
294
+ if (XactReadOnly && !rel -> rd_islocaltemp )
295
+ PreventCommandIfReadOnly ("PATHMAN COPY FROM" );
296
+ PreventCommandIfParallelMode ("PATHMAN COPY FROM" );
297
+
298
+ cstate = BeginCopyFrom (rel ,stmt -> filename ,stmt -> is_program ,
299
+ stmt -> attlist ,stmt -> options );
300
+ /* TODO: copy files to DB */
301
+ heap_close (rel ,NoLock );
302
+ * processed = 0 ;
303
+ EndCopyFrom (cstate );
304
+ }
305
+ /* COPY ... TO ... */
306
+ else
307
+ {
308
+ CopyStmt modified_copy_stmt ;
309
+
310
+ /* We should've created a query */
311
+ Assert (query );
312
+
313
+ /* Copy 'stmt' and override some of the fields */
314
+ modified_copy_stmt = * stmt ;
315
+ modified_copy_stmt .relation = NULL ;
316
+ modified_copy_stmt .query = query ;
317
+
318
+ /* Call standard DoCopy using a new CopyStmt */
319
+ DoCopy (& modified_copy_stmt ,queryString ,processed );
320
+ }
321
+ }