Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit62f1184

Browse files
committed
implement basic FDW support for INSERTs, introduce GUC pg_pathman.insert_into_fdw, restrict FDW INSERTs for COPY FROM stmt, other fixes
1 parentd7fd9b9 commit62f1184

File tree

5 files changed

+222
-16
lines changed

5 files changed

+222
-16
lines changed

‎src/copy_stmt_hooking.c

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include"commands/copy.h"
2424
#include"commands/trigger.h"
2525
#include"executor/executor.h"
26+
#include"foreign/fdwapi.h"
2627
#include"miscadmin.h"
2728
#include"nodes/makefuncs.h"
2829
#include"utils/builtins.h"
@@ -39,6 +40,10 @@ static uint64 PathmanCopyFrom(CopyState cstate,
3940
List*range_table,
4041
boolold_protocol);
4142

43+
staticvoidprepare_rri_fdw_for_copy(EState*estate,
44+
ResultRelInfoHolder*rri_holder,
45+
void*arg);
46+
4247

4348
/*
4449
* Is pg_pathman supposed to handle this COPY stmt?
@@ -63,7 +68,7 @@ is_pathman_related_copy(Node *parsetree)
6368
if (!copy_stmt->relation)
6469
return false;
6570

66-
/*TODO: select appropriate lock for COPY */
71+
/*Get partition's Oid while locking it */
6772
partitioned_table=RangeVarGetRelid(copy_stmt->relation,
6873
(copy_stmt->is_from ?
6974
RowExclusiveLock :
@@ -387,7 +392,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
387392
/* Initialize ResultPartsStorage */
388393
init_result_parts_storage(&parts_storage,estate, false,
389394
ResultPartsStorageStandard,
390-
NULL,NULL);
395+
prepare_rri_fdw_for_copy,NULL);
391396
parts_storage.saved_rel_info=parent_result_rel;
392397

393398
/* Set up a tuple slot too */
@@ -535,3 +540,19 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
535540

536541
returnprocessed;
537542
}
543+
544+
/*
545+
* COPY FROM does not support FDWs, emit ERROR.
546+
*/
547+
staticvoid
548+
prepare_rri_fdw_for_copy(EState*estate,
549+
ResultRelInfoHolder*rri_holder,
550+
void*arg)
551+
{
552+
ResultRelInfo*rri=rri_holder->result_rel_info;
553+
FdwRoutine*fdw_routine=rri->ri_FdwRoutine;
554+
555+
if (fdw_routine!=NULL)
556+
elog(ERROR,"cannot copy to foreign partition \"%s\"",
557+
get_rel_name(RelationGetRelid(rri->ri_RelationDesc)));
558+
}

‎src/partition_filter.c

Lines changed: 191 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@
88
* ------------------------------------------------------------------------
99
*/
1010

11-
#include"partition_filter.h"
11+
#include"init.h"
1212
#include"nodes_common.h"
13+
#include"partition_filter.h"
1314
#include"utils.h"
14-
#include"init.h"
1515

16+
#include"foreign/fdwapi.h"
17+
#include"foreign/foreign.h"
18+
#include"nodes/nodeFuncs.h"
1619
#include"utils/guc.h"
1720
#include"utils/memutils.h"
18-
#include"nodes/nodeFuncs.h"
1921
#include"utils/lsyscache.h"
2022
#include"utils/syscache.h"
2123

@@ -35,8 +37,26 @@ typedef struct
3537
boolestate_not_modified;/* did we modify EState somehow? */
3638
}estate_mod_data;
3739

40+
/*
41+
* Allow INSERTs into any FDW \ postgres_fdw \ no FDWs at all.
42+
*/
43+
typedefenum
44+
{
45+
PF_FDW_INSERT_DISABLED=0,/* INSERTs into FDWs are prohibited */
46+
PF_FDW_INSERT_POSTGRES,/* INSERTs into postgres_fdw are OK */
47+
PF_FDW_INSERT_ANY_FDW/* INSERTs into any FDWs are OK */
48+
}PF_insert_fdw_mode;
49+
50+
staticconststructconfig_enum_entrypg_pathman_insert_into_fdw_options[]= {
51+
{"disabled",PF_FDW_INSERT_DISABLED,false },
52+
{"postgres",PF_FDW_INSERT_POSTGRES,false },
53+
{"any_fdw",PF_FDW_INSERT_ANY_FDW,false },
54+
{NULL,0,false }
55+
};
56+
3857

3958
boolpg_pathman_enable_partition_filter= true;
59+
intpg_pathman_insert_into_fdw=PF_FDW_INSERT_POSTGRES;
4060

4161
CustomScanMethodspartition_filter_plan_methods;
4262
CustomExecMethodspartition_filter_exec_methods;
@@ -47,6 +67,9 @@ static void partition_filter_visitor(Plan *plan, void *context);
4767
staticList*pfilter_build_tlist(List*tlist);
4868
staticIndexappend_rte_to_estate(EState*estate,RangeTblEntry*rte);
4969
staticintappend_rri_to_estate(EState*estate,ResultRelInfo*rri);
70+
staticvoidprepare_rri_fdw_for_insert(EState*estate,
71+
ResultRelInfoHolder*rri_holder,
72+
void*arg);
5073

5174

5275
void
@@ -74,6 +97,18 @@ init_partition_filter_static_data(void)
7497
NULL,
7598
NULL,
7699
NULL);
100+
101+
DefineCustomEnumVariable("pg_pathman.insert_into_fdw",
102+
"Allow INSERTS into FDW partitions.",
103+
NULL,
104+
&pg_pathman_insert_into_fdw,
105+
PF_FDW_INSERT_POSTGRES,
106+
pg_pathman_insert_into_fdw_options,
107+
PGC_SUSET,
108+
0,
109+
NULL,
110+
NULL,
111+
NULL);
77112
}
78113

79114

@@ -179,6 +214,7 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
179214
Indexchild_rte_idx;
180215
ResultRelInfo*part_result_rel_info;
181216

217+
/* Lock partition and check if it exists */
182218
LockRelationOid(partid,parts_storage->head_open_lock_mode);
183219
if(!SearchSysCacheExists1(RELOID,ObjectIdGetDatum(partid)))
184220
{
@@ -236,18 +272,18 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
236272
/* ri_ConstraintExprs will be initialized by ExecRelCheck() */
237273
part_result_rel_info->ri_ConstraintExprs=NULL;
238274

239-
/*Now fill the ResultRelInfo holder */
275+
/*Finally fill the ResultRelInfo holder */
240276
rri_holder->partid=partid;
241277
rri_holder->result_rel_info=part_result_rel_info;
242278

243-
/* Append ResultRelInfo to storage->es_alloc_result_rels */
244-
append_rri_to_estate(parts_storage->estate,part_result_rel_info);
245-
246279
/* Call on_new_rri_holder_callback() if needed */
247280
if (parts_storage->on_new_rri_holder_callback)
248281
parts_storage->on_new_rri_holder_callback(parts_storage->estate,
249282
rri_holder,
250283
parts_storage->callback_arg);
284+
285+
/* Append ResultRelInfo to storage->es_alloc_result_rels */
286+
append_rri_to_estate(parts_storage->estate,part_result_rel_info);
251287
}
252288

253289
returnrri_holder;
@@ -351,7 +387,7 @@ partition_filter_begin(CustomScanState *node, EState *estate, int eflags)
351387
/* Init ResultRelInfo cache */
352388
init_result_parts_storage(&state->result_parts,estate,
353389
state->on_conflict_action!=ONCONFLICT_NONE,
354-
ResultPartsStorageStandard,NULL,NULL);
390+
ResultPartsStorageStandard,prepare_rri_fdw_for_insert,NULL);
355391

356392
state->warning_triggered= false;
357393
}
@@ -499,6 +535,148 @@ select_partition_for_insert(const PartRelationInfo *prel,
499535
returnrri_holder;
500536
}
501537

538+
/*
539+
* Callback to be executed on FDW partitions.
540+
*/
541+
staticvoid
542+
prepare_rri_fdw_for_insert(EState*estate,
543+
ResultRelInfoHolder*rri_holder,
544+
void*arg)
545+
{
546+
ResultRelInfo*rri=rri_holder->result_rel_info;
547+
FdwRoutine*fdw_routine=rri->ri_FdwRoutine;
548+
Oidpartid;
549+
550+
/* Nothing to do if not FDW */
551+
if (fdw_routine==NULL)
552+
return;
553+
554+
partid=RelationGetRelid(rri->ri_RelationDesc);
555+
556+
/* Perform some checks according to 'pg_pathman_insert_into_fdw' */
557+
switch (pg_pathman_insert_into_fdw)
558+
{
559+
casePF_FDW_INSERT_DISABLED:
560+
elog(ERROR,"INSERTs into FDW partitions are disabled");
561+
break;
562+
563+
casePF_FDW_INSERT_POSTGRES:
564+
{
565+
ForeignDataWrapper*fdw;
566+
ForeignServer*fserver;
567+
568+
/* Check if it's PostgreSQL FDW */
569+
fserver=GetForeignServer(GetForeignTable(partid)->serverid);
570+
fdw=GetForeignDataWrapper(fserver->fdwid);
571+
if (strcmp("postgres_fdw",fdw->fdwname)!=0)
572+
elog(ERROR,"FDWs other than postgres_fdw are restricted");
573+
}
574+
break;
575+
576+
casePF_FDW_INSERT_ANY_FDW:
577+
{
578+
ForeignDataWrapper*fdw;
579+
ForeignServer*fserver;
580+
581+
fserver=GetForeignServer(GetForeignTable(partid)->serverid);
582+
fdw=GetForeignDataWrapper(fserver->fdwid);
583+
if (strcmp("postgres_fdw",fdw->fdwname)!=0)
584+
elog(WARNING,"unrestricted FDW mode may lead to \"%s\" crashes",
585+
fdw->fdwname);
586+
}
587+
break;/* do nothing */
588+
589+
default:
590+
elog(ERROR,"Mode is not implemented yet");
591+
break;
592+
}
593+
594+
if (fdw_routine->PlanForeignModify)
595+
{
596+
RangeTblEntry*rte;
597+
ModifyTableStatemtstate;
598+
List*fdw_private;
599+
Queryquery;
600+
PlannedStmt*plan;
601+
TupleDesctupdesc;
602+
inti,
603+
target_attr;
604+
605+
/* Fetch RangeTblEntry for partition */
606+
rte=rt_fetch(rri->ri_RangeTableIndex,estate->es_range_table);
607+
608+
/* Fetch tuple descriptor */
609+
tupdesc=RelationGetDescr(rri->ri_RelationDesc);
610+
611+
/* Create fake Query node */
612+
memset((void*)&query,0,sizeof(Query));
613+
NodeSetTag(&query,T_Query);
614+
615+
query.commandType=CMD_INSERT;
616+
query.querySource=QSRC_ORIGINAL;
617+
query.resultRelation=1;
618+
query.rtable=list_make1(copyObject(rte));
619+
query.jointree=makeNode(FromExpr);
620+
621+
query.targetList=NIL;
622+
query.returningList=NIL;
623+
624+
/* Generate 'query.targetList' using 'tupdesc' */
625+
target_attr=1;
626+
for (i=0;i<tupdesc->natts;i++)
627+
{
628+
Form_pg_attributeattr;
629+
TargetEntry*te;
630+
Param*param;
631+
632+
attr=tupdesc->attrs[i];
633+
634+
if (attr->attisdropped)
635+
continue;
636+
637+
param=makeNode(Param);
638+
param->paramkind=PARAM_EXTERN;
639+
param->paramid=target_attr;
640+
param->paramtype=attr->atttypid;
641+
param->paramtypmod=attr->atttypmod;
642+
param->paramcollid=attr->attcollation;
643+
param->location=-1;
644+
645+
te=makeTargetEntry((Expr*)param,target_attr,
646+
pstrdup(NameStr(attr->attname)),
647+
false);
648+
649+
query.targetList=lappend(query.targetList,te);
650+
651+
target_attr++;
652+
}
653+
654+
/* Create fake ModifyTableState */
655+
memset((void*)&mtstate,0,sizeof(ModifyTableState));
656+
NodeSetTag(&mtstate,T_ModifyTableState);
657+
mtstate.ps.state=estate;
658+
mtstate.operation=CMD_INSERT;
659+
mtstate.resultRelInfo=rri;
660+
mtstate.mt_onconflict=ONCONFLICT_NONE;
661+
662+
/* Plan fake query in for FDW access to be planned as well */
663+
elog(DEBUG1,"FDW(%u): plan fake query for fdw_private",partid);
664+
plan=standard_planner(&query,0,NULL);
665+
666+
/* Extract fdw_private from useless plan */
667+
elog(DEBUG1,"FDW(%u): extract fdw_private",partid);
668+
fdw_private= (List*)
669+
linitial(((ModifyTable*)plan->planTree)->fdwPrivLists);
670+
671+
/* call BeginForeignModify on 'rri' */
672+
elog(DEBUG1,"FDW(%u): call BeginForeignModify on a fake INSERT node",partid);
673+
fdw_routine->BeginForeignModify(&mtstate,rri,fdw_private,0,0);
674+
675+
/* Report success */
676+
elog(DEBUG1,"FDW(%u): success",partid);
677+
}
678+
}
679+
502680
/*
503681
* Used by fetch_estate_mod_data() to find estate_mod_data.
504682
*/
@@ -581,7 +759,11 @@ append_rri_to_estate(EState *estate, ResultRelInfo *rri)
581759
estate->es_num_result_relations*sizeof(ResultRelInfo));
582760
}
583761

584-
/* Append ResultRelInfo to 'es_result_relations' array */
762+
/*
763+
* Append ResultRelInfo to 'es_result_relations' array.
764+
* NOTE: this is probably safe since ResultRelInfo
765+
* contains nothing but pointers to various structs.
766+
*/
585767
estate->es_result_relations[estate->es_num_result_relations]=*rri;
586768

587769
/* Update estate_mod_data */

‎src/partition_filter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ typedef struct
8181

8282

8383
externboolpg_pathman_enable_partition_filter;
84+
externintpg_pathman_insert_into_fdw;
8485

8586
externCustomScanMethodspartition_filter_plan_methods;
8687
externCustomExecMethodspartition_filter_exec_methods;

‎src/pg_pathman.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -904,8 +904,8 @@ create_partitions_internal(Oid relid, Datum value, Oid value_type)
904904
shout_if_prel_is_invalid(relid,prel,PT_RANGE);
905905

906906
/* Read max & min range values from PartRelationInfo */
907-
min_rvalue=prel->ranges[0].min;
908-
max_rvalue=prel->ranges[PrelLastChild(prel)].max;
907+
min_rvalue=PrelGetRangesArray(prel)[0].min;
908+
max_rvalue=PrelGetRangesArray(prel)[PrelLastChild(prel)].max;
909909

910910
/* Retrieve interval as TEXT from tuple */
911911
interval_text=values[Anum_pathman_config_range_interval-1];
@@ -1222,7 +1222,7 @@ handle_binary_opexpr(WalkerContext *context, WrapperNode *result,
12221222
{
12231223
select_range_partitions(c->constvalue,
12241224
&cmp_func,
1225-
context->prel->ranges,
1225+
PrelGetRangesArray(context->prel),
12261226
PrelChildrenCount(context->prel),
12271227
strategy,
12281228
result);
@@ -1383,7 +1383,7 @@ handle_const(const Const *c, WalkerContext *context)
13831383

13841384
select_range_partitions(c->constvalue,
13851385
&tce->cmp_proc_finfo,
1386-
context->prel->ranges,
1386+
PrelGetRangesArray(context->prel),
13871387
PrelChildrenCount(context->prel),
13881388
BTEqualStrategyNumber,
13891389
result);

‎src/utils.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@ lock_rows_visitor(Plan *plan, void *context)
154154
}
155155
}
156156

157-
/* NOTE: Used for debug */
157+
/*
158+
* Print Bitmapset as cstring.
159+
*/
158160
#ifdef__GNUC__
159161
__attribute__((unused))
160162
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp