Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4d154b4

Browse files
committed
WIP working prototype of COPY FROM for partitioned tables (PathmanCopyFrom), refactoring
1 parent7c15a2a commit4d154b4

File tree

3 files changed

+366
-45
lines changed

3 files changed

+366
-45
lines changed

‎src/copy_stmt_hooking.c

Lines changed: 246 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include"copy_stmt_hooking.h"
1414
#include"init.h"
15+
#include"partition_filter.h"
1516
#include"relation_info.h"
1617

1718
#include"access/htup_details.h"
@@ -26,12 +27,22 @@
2627
#include"nodes/makefuncs.h"
2728
#include"utils/builtins.h"
2829
#include"utils/lsyscache.h"
30+
#include"utils/memutils.h"
2931
#include"utils/rel.h"
3032
#include"utils/rls.h"
3133

3234
#include"libpq/libpq.h"
3335

3436

37+
staticuint64PathmanCopyFrom(CopyStatecstate,
38+
Relationparent_rel,
39+
List*range_table,
40+
boolold_protocol);
41+
staticResultRelInfoHolder*select_partition_for_copy(constPartRelationInfo*prel,
42+
ResultPartsStorage*parts_storage,
43+
Datumvalue,EState*estate);
44+
45+
3546
/*
3647
* Is pg_pathman supposed to handle this COPY stmt?
3748
*/
@@ -283,6 +294,11 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
283294
/* COPY ... FROM ... */
284295
if (is_from)
285296
{
297+
boolis_old_protocol;
298+
299+
is_old_protocol=PG_PROTOCOL_MAJOR(FrontendProtocol)<3&&
300+
stmt->filename==NULL;
301+
286302
/* There should be relation */
287303
Assert(rel);
288304

@@ -293,9 +309,7 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
293309

294310
cstate=BeginCopyFrom(rel,stmt->filename,stmt->is_program,
295311
stmt->attlist,stmt->options);
296-
/* TODO: copy files to DB */
297-
heap_close(rel,NoLock);
298-
*processed=0;
312+
*processed=PathmanCopyFrom(cstate,rel,range_table,is_old_protocol);
299313
EndCopyFrom(cstate);
300314
}
301315
/* COPY ... TO ... */
@@ -314,4 +328,233 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
314328
/* Call standard DoCopy using a new CopyStmt */
315329
DoCopy(&modified_copy_stmt,queryString,processed);
316330
}
331+
332+
/*
333+
* Close the relation. If reading, we can release the AccessShareLock we
334+
* got; if writing, we should hold the lock until end of transaction to
335+
* ensure that updates will be committed before lock is released.
336+
*/
337+
if (rel!=NULL)
338+
heap_close(rel, (is_from ?NoLock :AccessShareLock));
339+
}
340+
341+
/*
342+
* Copy FROM file to relation.
343+
*/
344+
staticuint64
345+
PathmanCopyFrom(CopyStatecstate,Relationparent_rel,
346+
List*range_table,boolold_protocol)
347+
{
348+
HeapTupletuple;
349+
TupleDesctupDesc;
350+
Datum*values;
351+
bool*nulls;
352+
353+
ResultPartsStorageparts_storage;
354+
ResultRelInfo*parent_result_rel;
355+
356+
EState*estate=CreateExecutorState();/* for ExecConstraints() */
357+
ExprContext*econtext;
358+
TupleTableSlot*myslot;
359+
MemoryContextoldcontext=CurrentMemoryContext;
360+
361+
uint64processed=0;
362+
363+
364+
tupDesc=RelationGetDescr(parent_rel);
365+
366+
parent_result_rel=makeNode(ResultRelInfo);
367+
InitResultRelInfo(parent_result_rel,
368+
parent_rel,
369+
1,/* dummy rangetable index */
370+
0);
371+
ExecOpenIndices(parent_result_rel, false);
372+
373+
estate->es_result_relations=parent_result_rel;
374+
estate->es_num_result_relations=1;
375+
estate->es_result_relation_info=parent_result_rel;
376+
estate->es_range_table=range_table;
377+
378+
/* Initialize ResultPartsStorage */
379+
init_result_parts_storage(&parts_storage,estate, false,
380+
ResultPartsStorageStandard,
381+
check_acl_for_partition,NULL);
382+
parts_storage.saved_rel_info=parent_result_rel;
383+
384+
/* Set up a tuple slot too */
385+
myslot=ExecInitExtraTupleSlot(estate);
386+
ExecSetSlotDescriptor(myslot,tupDesc);
387+
/* Triggers might need a slot as well */
388+
estate->es_trig_tuple_slot=ExecInitExtraTupleSlot(estate);
389+
390+
/* Prepare to catch AFTER triggers. */
391+
AfterTriggerBeginQuery();
392+
393+
/*
394+
* Check BEFORE STATEMENT insertion triggers. It's debatable whether we
395+
* should do this for COPY, since it's not really an "INSERT" statement as
396+
* such. However, executing these triggers maintains consistency with the
397+
* EACH ROW triggers that we already fire on COPY.
398+
*/
399+
ExecBSInsertTriggers(estate,parent_result_rel);
400+
401+
values= (Datum*)palloc(tupDesc->natts*sizeof(Datum));
402+
nulls= (bool*)palloc(tupDesc->natts*sizeof(bool));
403+
404+
econtext=GetPerTupleExprContext(estate);
405+
406+
for (;;)
407+
{
408+
TupleTableSlot*slot;
409+
boolskip_tuple;
410+
Oidtuple_oid=InvalidOid;
411+
412+
constPartRelationInfo*prel;
413+
ResultRelInfoHolder*rri_holder_child;
414+
ResultRelInfo*child_result_rel;
415+
416+
CHECK_FOR_INTERRUPTS();
417+
418+
ResetPerTupleExprContext(estate);
419+
420+
/* Fetch PartRelationInfo for parent relation */
421+
prel=get_pathman_relation_info(RelationGetRelid(parent_rel));
422+
423+
/* Switch into its memory context */
424+
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
425+
426+
if (!NextCopyFrom(cstate,econtext,values,nulls,&tuple_oid))
427+
break;
428+
429+
/* Search for a matching partition */
430+
rri_holder_child=select_partition_for_copy(prel,&parts_storage,
431+
values[prel->attnum-1],
432+
estate);
433+
child_result_rel=rri_holder_child->result_rel_info;
434+
estate->es_result_relation_info=child_result_rel;
435+
436+
/* And now we can form the input tuple. */
437+
tuple=heap_form_tuple(tupDesc,values,nulls);
438+
if (tuple_oid!=InvalidOid)
439+
HeapTupleSetOid(tuple,tuple_oid);
440+
441+
/*
442+
* Constraints might reference the tableoid column, so initialize
443+
* t_tableOid before evaluating them.
444+
*/
445+
tuple->t_tableOid=RelationGetRelid(child_result_rel->ri_RelationDesc);
446+
447+
/* Triggers and stuff need to be invoked in query context. */
448+
MemoryContextSwitchTo(oldcontext);
449+
450+
/* Place tuple in tuple slot --- but slot shouldn't free it */
451+
slot=myslot;
452+
ExecStoreTuple(tuple,slot,InvalidBuffer, false);
453+
454+
skip_tuple= false;
455+
456+
/* BEFORE ROW INSERT Triggers */
457+
if (child_result_rel->ri_TrigDesc&&
458+
child_result_rel->ri_TrigDesc->trig_insert_before_row)
459+
{
460+
slot=ExecBRInsertTriggers(estate,child_result_rel,slot);
461+
462+
if (slot==NULL)/* "do nothing" */
463+
skip_tuple= true;
464+
else/* trigger might have changed tuple */
465+
tuple=ExecMaterializeSlot(slot);
466+
}
467+
468+
/* Proceed if we still have a tuple */
469+
if (!skip_tuple)
470+
{
471+
List*recheckIndexes=NIL;
472+
473+
/* Check the constraints of the tuple */
474+
if (child_result_rel->ri_RelationDesc->rd_att->constr)
475+
ExecConstraints(child_result_rel,slot,estate);
476+
477+
/* OK, store the tuple and create index entries for it */
478+
simple_heap_insert(child_result_rel->ri_RelationDesc,tuple);
479+
480+
if (child_result_rel->ri_NumIndices>0)
481+
recheckIndexes=ExecInsertIndexTuples(slot,&(tuple->t_self),
482+
estate, false,NULL,
483+
NIL);
484+
485+
/* AFTER ROW INSERT Triggers */
486+
ExecARInsertTriggers(estate,child_result_rel,tuple,
487+
recheckIndexes);
488+
489+
list_free(recheckIndexes);
490+
491+
/*
492+
* We count only tuples not suppressed by a BEFORE INSERT trigger;
493+
* this is the same definition used by execMain.c for counting
494+
* tuples inserted by an INSERT command.
495+
*/
496+
processed++;
497+
}
498+
}
499+
500+
MemoryContextSwitchTo(oldcontext);
501+
502+
/*
503+
* In the old protocol, tell pqcomm that we can process normal protocol
504+
* messages again.
505+
*/
506+
if (old_protocol)
507+
pq_endmsgread();
508+
509+
/* Execute AFTER STATEMENT insertion triggers */
510+
ExecASInsertTriggers(estate,parent_result_rel);
511+
512+
/* Handle queued AFTER triggers */
513+
AfterTriggerEndQuery(estate);
514+
515+
pfree(values);
516+
pfree(nulls);
517+
518+
ExecResetTupleTable(estate->es_tupleTable, false);
519+
fini_result_parts_storage(&parts_storage);
520+
521+
FreeExecutorState(estate);
522+
523+
returnprocessed;
524+
}
525+
526+
/*
527+
* Smart wrapper for scan_result_parts_storage().
528+
*/
529+
staticResultRelInfoHolder*
530+
select_partition_for_copy(constPartRelationInfo*prel,
531+
ResultPartsStorage*parts_storage,
532+
Datumvalue,EState*estate)
533+
{
534+
ExprContext*econtext;
535+
ResultRelInfoHolder*rri_holder;
536+
Oidselected_partid=InvalidOid;
537+
Oid*parts;
538+
intnparts;
539+
540+
econtext=GetPerTupleExprContext(estate);
541+
542+
/* Search for matching partitions using partitioned column */
543+
parts=find_partitions_for_value(value,prel,econtext,&nparts);
544+
545+
if (nparts>1)
546+
elog(ERROR,"PATHMAN COPY selected more than one partition");
547+
elseif (nparts==0)
548+
elog(ERROR,
549+
"There is no suitable partition for key '%s'",
550+
datum_to_cstring(value,prel->atttype));
551+
else
552+
selected_partid=parts[0];
553+
554+
/* Replace parent table with a suitable partition */
555+
MemoryContextSwitchTo(estate->es_query_cxt);
556+
rri_holder=scan_result_parts_storage(selected_partid,parts_storage);
557+
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
558+
559+
returnrri_holder;
317560
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp