Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit381834c

Browse files
committed
light refactoring, introduce GUC variable 'pg_pathman.override_copy', implement COPY TO for partitioned tables
1 parent2f5804f commit381834c

File tree

8 files changed

+352
-17
lines changed

8 files changed

+352
-17
lines changed

‎src/copy_stmt_hooking.c

Lines changed: 289 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,35 @@
1+
/* ------------------------------------------------------------------------
2+
*
3+
* copy_stmt_hooking.c
4+
*Override COPY TO/FROM statement for partitioned tables
5+
*
6+
* Copyright (c) 2016, Postgres Professional
7+
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
8+
* Portions Copyright (c) 1994, Regents of the University of California
9+
*
10+
* ------------------------------------------------------------------------
11+
*/
12+
113
#include"copy_stmt_hooking.h"
14+
#include"init.h"
215
#include"relation_info.h"
316

17+
#include"access/htup_details.h"
18+
#include"access/sysattr.h"
19+
#include"access/xact.h"
420
#include"catalog/namespace.h"
21+
#include"catalog/pg_attribute.h"
522
#include"commands/copy.h"
23+
#include"commands/trigger.h"
24+
#include"executor/executor.h"
25+
#include"miscadmin.h"
26+
#include"nodes/makefuncs.h"
27+
#include"utils/builtins.h"
28+
#include"utils/lsyscache.h"
29+
#include"utils/rel.h"
30+
#include"utils/rls.h"
31+
32+
#include"libpq/libpq.h"
633

734

835
/*
@@ -14,6 +41,12 @@ is_pathman_related_copy(Node *parsetree)
1441
CopyStmt*copy_stmt= (CopyStmt*)parsetree;
1542
Oidpartitioned_table;
1643

44+
if (!IsOverrideCopyEnabled())
45+
{
46+
elog(DEBUG1,"COPY statement hooking is disabled");
47+
return false;
48+
}
49+
1750
/* Check that it's a CopyStmt */
1851
if (!IsA(parsetree,CopyStmt))
1952
return false;
@@ -23,11 +56,266 @@ is_pathman_related_copy(Node *parsetree)
2356
return false;
2457

2558
/* TODO: select appropriate lock for COPY */
26-
partitioned_table=RangeVarGetRelid(copy_stmt->relation,NoLock, false);
59+
partitioned_table=RangeVarGetRelid(copy_stmt->relation,
60+
(copy_stmt->is_from ?
61+
RowExclusiveLock :
62+
AccessShareLock),
63+
false);
2764

2865
/* Check that relation is partitioned */
2966
if (get_pathman_relation_info(partitioned_table))
67+
{
68+
elog(DEBUG1,"Overriding default behavior for COPY (%u)",partitioned_table);
3069
return true;
70+
}
3171

3272
return false;
3373
}
74+
75+
/*
76+
* CopyGetAttnums - build an integer list of attnums to be copied
77+
*
78+
* The input attnamelist is either the user-specified column list,
79+
* or NIL if there was none (in which case we want all the non-dropped
80+
* columns).
81+
*
82+
* rel can be NULL ... it's only used for error reports.
83+
*/
84+
staticList*
85+
CopyGetAttnums(TupleDesctupDesc,Relationrel,List*attnamelist)
86+
{
87+
List*attnums=NIL;
88+
89+
if (attnamelist==NIL)
90+
{
91+
/* Generate default column list */
92+
Form_pg_attribute*attr=tupDesc->attrs;
93+
intattr_count=tupDesc->natts;
94+
inti;
95+
96+
for (i=0;i<attr_count;i++)
97+
{
98+
if (attr[i]->attisdropped)
99+
continue;
100+
attnums=lappend_int(attnums,i+1);
101+
}
102+
}
103+
else
104+
{
105+
/* Validate the user-supplied list and extract attnums */
106+
ListCell*l;
107+
108+
foreach(l,attnamelist)
109+
{
110+
char*name=strVal(lfirst(l));
111+
intattnum;
112+
inti;
113+
114+
/* Lookup column name */
115+
attnum=InvalidAttrNumber;
116+
for (i=0;i<tupDesc->natts;i++)
117+
{
118+
if (tupDesc->attrs[i]->attisdropped)
119+
continue;
120+
if (namestrcmp(&(tupDesc->attrs[i]->attname),name)==0)
121+
{
122+
attnum=tupDesc->attrs[i]->attnum;
123+
break;
124+
}
125+
}
126+
if (attnum==InvalidAttrNumber)
127+
{
128+
if (rel!=NULL)
129+
ereport(ERROR,
130+
(errcode(ERRCODE_UNDEFINED_COLUMN),
131+
errmsg("column \"%s\" of relation \"%s\" does not exist",
132+
name,RelationGetRelationName(rel))));
133+
else
134+
ereport(ERROR,
135+
(errcode(ERRCODE_UNDEFINED_COLUMN),
136+
errmsg("column \"%s\" does not exist",
137+
name)));
138+
}
139+
/* Check for duplicates */
140+
if (list_member_int(attnums,attnum))
141+
ereport(ERROR,
142+
(errcode(ERRCODE_DUPLICATE_COLUMN),
143+
errmsg("column \"%s\" specified more than once",
144+
name)));
145+
attnums=lappend_int(attnums,attnum);
146+
}
147+
}
148+
149+
returnattnums;
150+
}
151+
152+
/*
153+
* Execute COPY TO/FROM statement for a partitioned table.
154+
* NOTE: based on DoCopy() (see copy.c).
155+
*/
156+
void
157+
PathmanDoCopy(constCopyStmt*stmt,constchar*queryString,uint64*processed)
158+
{
159+
CopyStatecstate;
160+
boolis_from=stmt->is_from;
161+
boolpipe= (stmt->filename==NULL);
162+
Relationrel;
163+
Oidrelid;
164+
Node*query=NULL;
165+
List*range_table=NIL;
166+
167+
/* Disallow COPY TO/FROM file or program except to superusers. */
168+
if (!pipe&& !superuser())
169+
{
170+
if (stmt->is_program)
171+
ereport(ERROR,
172+
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
173+
errmsg("must be superuser to COPY to or from an external program"),
174+
errhint("Anyone can COPY to stdout or from stdin. "
175+
"psql's \\copy command also works for anyone.")));
176+
else
177+
ereport(ERROR,
178+
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
179+
errmsg("must be superuser to COPY to or from a file"),
180+
errhint("Anyone can COPY to stdout or from stdin. "
181+
"psql's \\copy command also works for anyone.")));
182+
}
183+
184+
if (stmt->relation)
185+
{
186+
TupleDesctupDesc;
187+
AclModerequired_access= (is_from ?ACL_INSERT :ACL_SELECT);
188+
List*attnums;
189+
ListCell*cur;
190+
RangeTblEntry*rte;
191+
192+
Assert(!stmt->query);
193+
194+
/* Open the relation (we've locked it in is_pathman_related_copy()) */
195+
rel=heap_openrv(stmt->relation,NoLock);
196+
197+
relid=RelationGetRelid(rel);
198+
199+
rte=makeNode(RangeTblEntry);
200+
rte->rtekind=RTE_RELATION;
201+
rte->relid=RelationGetRelid(rel);
202+
rte->relkind=rel->rd_rel->relkind;
203+
rte->requiredPerms=required_access;
204+
range_table=list_make1(rte);
205+
206+
tupDesc=RelationGetDescr(rel);
207+
attnums=CopyGetAttnums(tupDesc,rel,stmt->attlist);
208+
foreach(cur,attnums)
209+
{
210+
intattno=lfirst_int(cur)-FirstLowInvalidHeapAttributeNumber;
211+
212+
if (is_from)
213+
rte->insertedCols=bms_add_member(rte->insertedCols,attno);
214+
else
215+
rte->selectedCols=bms_add_member(rte->selectedCols,attno);
216+
}
217+
ExecCheckRTPerms(range_table, true);
218+
219+
/*
220+
* We should perform a query instead of low-level heap scan whenever:
221+
*a) table has a RLS policy;
222+
*b) table is partitioned & it's COPY FROM.
223+
*/
224+
if (check_enable_rls(rte->relid,InvalidOid, false)==RLS_ENABLED||
225+
is_from== false)/* rewrite COPY table TO statements */
226+
{
227+
SelectStmt*select;
228+
ColumnRef*cr;
229+
ResTarget*target;
230+
RangeVar*from;
231+
232+
if (is_from)
233+
ereport(ERROR,
234+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
235+
errmsg("COPY FROM not supported with row-level security"),
236+
errhint("Use INSERT statements instead.")));
237+
238+
/* Build target list */
239+
cr=makeNode(ColumnRef);
240+
241+
if (!stmt->attlist)
242+
cr->fields=list_make1(makeNode(A_Star));
243+
else
244+
cr->fields=stmt->attlist;
245+
246+
cr->location=1;
247+
248+
target=makeNode(ResTarget);
249+
target->name=NULL;
250+
target->indirection=NIL;
251+
target->val= (Node*)cr;
252+
target->location=1;
253+
254+
/*
255+
* Build RangeVar for from clause, fully qualified based on the
256+
* relation which we have opened and locked.
257+
*/
258+
from=makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
259+
RelationGetRelationName(rel),-1);
260+
261+
/* Build query */
262+
select=makeNode(SelectStmt);
263+
select->targetList=list_make1(target);
264+
select->fromClause=list_make1(from);
265+
266+
query= (Node*)select;
267+
268+
/*
269+
* Close the relation for now, but keep the lock on it to prevent
270+
* changes between now and when we start the query-based COPY.
271+
*
272+
* We'll reopen it later as part of the query-based COPY.
273+
*/
274+
heap_close(rel,NoLock);
275+
rel=NULL;
276+
}
277+
}
278+
else
279+
{
280+
Assert(stmt->query);
281+
282+
query=stmt->query;
283+
relid=InvalidOid;
284+
rel=NULL;
285+
}
286+
287+
/* COPY ... FROM ... */
288+
if (is_from)
289+
{
290+
/* There should be relation */
291+
Assert(rel);
292+
293+
/* check read-only transaction and parallel mode */
294+
if (XactReadOnly&& !rel->rd_islocaltemp)
295+
PreventCommandIfReadOnly("PATHMAN COPY FROM");
296+
PreventCommandIfParallelMode("PATHMAN COPY FROM");
297+
298+
cstate=BeginCopyFrom(rel,stmt->filename,stmt->is_program,
299+
stmt->attlist,stmt->options);
300+
/* TODO: copy files to DB */
301+
heap_close(rel,NoLock);
302+
*processed=0;
303+
EndCopyFrom(cstate);
304+
}
305+
/* COPY ... TO ... */
306+
else
307+
{
308+
CopyStmtmodified_copy_stmt;
309+
310+
/* We should've created a query */
311+
Assert(query);
312+
313+
/* Copy 'stmt' and override some of the fields */
314+
modified_copy_stmt=*stmt;
315+
modified_copy_stmt.relation=NULL;
316+
modified_copy_stmt.query=query;
317+
318+
/* Call standard DoCopy using a new CopyStmt */
319+
DoCopy(&modified_copy_stmt,queryString,processed);
320+
}
321+
}

‎src/copy_stmt_hooking.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
/* ------------------------------------------------------------------------
2+
*
3+
* copy_stmt_hooking.h
4+
*Transaction-specific locks and other functions
5+
*
6+
* Copyright (c) 2016, Postgres Professional
7+
*
8+
* ------------------------------------------------------------------------
9+
*/
10+
111
#ifndefCOPY_STMT_HOOKING_H
212
#defineCOPY_STMT_HOOKING_H
313

@@ -8,5 +18,6 @@
818

919

1020
boolis_pathman_related_copy(Node*parsetree);
21+
voidPathmanDoCopy(constCopyStmt*stmt,constchar*queryString,uint64*processed);
1122

1223
#endif

‎src/hooks.c

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -387,17 +387,22 @@ pg_pathman_enable_assign_hook(bool newval, void *extra)
387387

388388
/* Return quickly if nothing has changed */
389389
if (newval== (pg_pathman_init_state.pg_pathman_enable&&
390+
pg_pathman_init_state.auto_partition&&
391+
pg_pathman_init_state.override_copy&&
390392
pg_pathman_enable_runtimeappend&&
391393
pg_pathman_enable_runtime_merge_append&&
392394
pg_pathman_enable_partition_filter))
393395
return;
394396

397+
pg_pathman_init_state.auto_partition=newval;
398+
pg_pathman_init_state.override_copy=newval;
395399
pg_pathman_enable_runtime_merge_append=newval;
396400
pg_pathman_enable_runtimeappend=newval;
397401
pg_pathman_enable_partition_filter=newval;
398402

399403
elog(NOTICE,
400-
"RuntimeAppend, RuntimeMergeAppend and PartitionFilter nodes have been %s",
404+
"RuntimeAppend, RuntimeMergeAppend and PartitionFilter nodes "
405+
"and some other options have been %s",
401406
newval ?"enabled" :"disabled");
402407
}
403408

@@ -594,10 +599,17 @@ pathman_process_utility_hook(Node *parsetree,
594599
context,params,
595600
dest,completionTag);
596601

597-
/* Override standard COPYstatements if needed */
602+
/* Override standard COPYstatement if needed */
598603
if (is_pathman_related_copy(parsetree))
599604
{
600-
elog(INFO,"copy!");
605+
uint64processed;
606+
607+
PathmanDoCopy((CopyStmt*)parsetree,queryString,&processed);
608+
if (completionTag)
609+
snprintf(completionTag,COMPLETION_TAG_BUFSIZE,
610+
"PATHMAN COPY "UINT64_FORMAT,processed);
611+
612+
return;/* don't call standard_ProcessUtility() */
601613
}
602614

603615
/* Call internal implementation */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp