88 * ------------------------------------------------------------------------
99 */
1010
11- #include "partition_filter .h"
11+ #include "init .h"
1212#include "nodes_common.h"
13+ #include "partition_filter.h"
1314#include "utils.h"
14- #include "init.h"
1515
16+ #include "foreign/fdwapi.h"
17+ #include "foreign/foreign.h"
18+ #include "nodes/nodeFuncs.h"
1619#include "utils/guc.h"
1720#include "utils/memutils.h"
18- #include "nodes/nodeFuncs.h"
1921#include "utils/lsyscache.h"
2022#include "utils/syscache.h"
2123
@@ -35,8 +37,26 @@ typedef struct
3537bool estate_not_modified ;/* did we modify EState somehow? */
3638}estate_mod_data ;
3739
40+ /*
41+ * Allow INSERTs into any FDW \ postgres_fdw \ no FDWs at all.
42+ */
43+ typedef enum
44+ {
45+ PF_FDW_INSERT_DISABLED = 0 ,/* INSERTs into FDWs are prohibited */
46+ PF_FDW_INSERT_POSTGRES ,/* INSERTs into postgres_fdw are OK */
47+ PF_FDW_INSERT_ANY_FDW /* INSERTs into any FDWs are OK */
48+ }PF_insert_fdw_mode ;
49+
50+ static const struct config_enum_entry pg_pathman_insert_into_fdw_options []= {
51+ {"disabled" ,PF_FDW_INSERT_DISABLED ,false },
52+ {"postgres" ,PF_FDW_INSERT_POSTGRES ,false },
53+ {"any_fdw" ,PF_FDW_INSERT_ANY_FDW ,false },
54+ {NULL ,0 ,false }
55+ };
56+
3857
3958bool pg_pathman_enable_partition_filter = true;
59+ int pg_pathman_insert_into_fdw = PF_FDW_INSERT_POSTGRES ;
4060
4161CustomScanMethods partition_filter_plan_methods ;
4262CustomExecMethods partition_filter_exec_methods ;
@@ -47,6 +67,9 @@ static void partition_filter_visitor(Plan *plan, void *context);
4767static List * pfilter_build_tlist (List * tlist );
4868static Index append_rte_to_estate (EState * estate ,RangeTblEntry * rte );
4969static int append_rri_to_estate (EState * estate ,ResultRelInfo * rri );
70+ static void prepare_rri_fdw_for_insert (EState * estate ,
71+ ResultRelInfoHolder * rri_holder ,
72+ void * arg );
5073
5174
5275void
@@ -74,6 +97,18 @@ init_partition_filter_static_data(void)
7497NULL ,
7598NULL ,
7699NULL );
100+
101+ DefineCustomEnumVariable ("pg_pathman.insert_into_fdw" ,
102+ "Allow INSERTS into FDW partitions." ,
103+ NULL ,
104+ & pg_pathman_insert_into_fdw ,
105+ PF_FDW_INSERT_POSTGRES ,
106+ pg_pathman_insert_into_fdw_options ,
107+ PGC_SUSET ,
108+ 0 ,
109+ NULL ,
110+ NULL ,
111+ NULL );
77112}
78113
79114
@@ -179,6 +214,7 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
179214Index child_rte_idx ;
180215ResultRelInfo * part_result_rel_info ;
181216
217+ /* Lock partition and check if it exists */
182218LockRelationOid (partid ,parts_storage -> head_open_lock_mode );
183219if (!SearchSysCacheExists1 (RELOID ,ObjectIdGetDatum (partid )))
184220{
@@ -236,18 +272,18 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
236272/* ri_ConstraintExprs will be initialized by ExecRelCheck() */
237273part_result_rel_info -> ri_ConstraintExprs = NULL ;
238274
239- /*Now fill the ResultRelInfo holder */
275+ /*Finally fill the ResultRelInfo holder */
240276rri_holder -> partid = partid ;
241277rri_holder -> result_rel_info = part_result_rel_info ;
242278
243- /* Append ResultRelInfo to storage->es_alloc_result_rels */
244- append_rri_to_estate (parts_storage -> estate ,part_result_rel_info );
245-
246279/* Call on_new_rri_holder_callback() if needed */
247280if (parts_storage -> on_new_rri_holder_callback )
248281parts_storage -> on_new_rri_holder_callback (parts_storage -> estate ,
249282rri_holder ,
250283parts_storage -> callback_arg );
284+
285+ /* Append ResultRelInfo to storage->es_alloc_result_rels */
286+ append_rri_to_estate (parts_storage -> estate ,part_result_rel_info );
251287}
252288
253289return rri_holder ;
@@ -351,7 +387,7 @@ partition_filter_begin(CustomScanState *node, EState *estate, int eflags)
351387/* Init ResultRelInfo cache */
352388init_result_parts_storage (& state -> result_parts ,estate ,
353389state -> on_conflict_action != ONCONFLICT_NONE ,
354- ResultPartsStorageStandard ,NULL ,NULL );
390+ ResultPartsStorageStandard ,prepare_rri_fdw_for_insert ,NULL );
355391
356392state -> warning_triggered = false;
357393}
@@ -499,6 +535,148 @@ select_partition_for_insert(const PartRelationInfo *prel,
499535return rri_holder ;
500536}
501537
538+ /*
539+ * Callback to be executed on FDW partitions.
540+ */
541+ static void
542+ prepare_rri_fdw_for_insert (EState * estate ,
543+ ResultRelInfoHolder * rri_holder ,
544+ void * arg )
545+ {
546+ ResultRelInfo * rri = rri_holder -> result_rel_info ;
547+ FdwRoutine * fdw_routine = rri -> ri_FdwRoutine ;
548+ Oid partid ;
549+
550+ /* Nothing to do if not FDW */
551+ if (fdw_routine == NULL )
552+ return ;
553+
554+ partid = RelationGetRelid (rri -> ri_RelationDesc );
555+
556+ /* Perform some checks according to 'pg_pathman_insert_into_fdw' */
557+ switch (pg_pathman_insert_into_fdw )
558+ {
559+ case PF_FDW_INSERT_DISABLED :
560+ elog (ERROR ,"INSERTs into FDW partitions are disabled" );
561+ break ;
562+
563+ case PF_FDW_INSERT_POSTGRES :
564+ {
565+ ForeignDataWrapper * fdw ;
566+ ForeignServer * fserver ;
567+
568+ /* Check if it's PostgreSQL FDW */
569+ fserver = GetForeignServer (GetForeignTable (partid )-> serverid );
570+ fdw = GetForeignDataWrapper (fserver -> fdwid );
571+ if (strcmp ("postgres_fdw" ,fdw -> fdwname )!= 0 )
572+ elog (ERROR ,"FDWs other than postgres_fdw are restricted" );
573+ }
574+ break ;
575+
576+ case PF_FDW_INSERT_ANY_FDW :
577+ {
578+ ForeignDataWrapper * fdw ;
579+ ForeignServer * fserver ;
580+
581+ fserver = GetForeignServer (GetForeignTable (partid )-> serverid );
582+ fdw = GetForeignDataWrapper (fserver -> fdwid );
583+ if (strcmp ("postgres_fdw" ,fdw -> fdwname )!= 0 )
584+ elog (WARNING ,"unrestricted FDW mode may lead to \"%s\" crashes" ,
585+ fdw -> fdwname );
586+ }
587+ break ;/* do nothing */
588+
589+ default :
590+ elog (ERROR ,"Mode is not implemented yet" );
591+ break ;
592+ }
593+
594+ if (fdw_routine -> PlanForeignModify )
595+ {
596+ RangeTblEntry * rte ;
597+ ModifyTableState mtstate ;
598+ List * fdw_private ;
599+ Query query ;
600+ PlannedStmt * plan ;
601+ TupleDesc tupdesc ;
602+ int i ,
603+ target_attr ;
604+
605+ /* Fetch RangeTblEntry for partition */
606+ rte = rt_fetch (rri -> ri_RangeTableIndex ,estate -> es_range_table );
607+
608+ /* Fetch tuple descriptor */
609+ tupdesc = RelationGetDescr (rri -> ri_RelationDesc );
610+
611+ /* Create fake Query node */
612+ memset ((void * )& query ,0 ,sizeof (Query ));
613+ NodeSetTag (& query ,T_Query );
614+
615+ query .commandType = CMD_INSERT ;
616+ query .querySource = QSRC_ORIGINAL ;
617+ query .resultRelation = 1 ;
618+ query .rtable = list_make1 (copyObject (rte ));
619+ query .jointree = makeNode (FromExpr );
620+
621+ query .targetList = NIL ;
622+ query .returningList = NIL ;
623+
624+ /* Generate 'query.targetList' using 'tupdesc' */
625+ target_attr = 1 ;
626+ for (i = 0 ;i < tupdesc -> natts ;i ++ )
627+ {
628+ Form_pg_attribute attr ;
629+ TargetEntry * te ;
630+ Param * param ;
631+
632+ attr = tupdesc -> attrs [i ];
633+
634+ if (attr -> attisdropped )
635+ continue ;
636+
637+ param = makeNode (Param );
638+ param -> paramkind = PARAM_EXTERN ;
639+ param -> paramid = target_attr ;
640+ param -> paramtype = attr -> atttypid ;
641+ param -> paramtypmod = attr -> atttypmod ;
642+ param -> paramcollid = attr -> attcollation ;
643+ param -> location = -1 ;
644+
645+ te = makeTargetEntry ((Expr * )param ,target_attr ,
646+ pstrdup (NameStr (attr -> attname )),
647+ false);
648+
649+ query .targetList = lappend (query .targetList ,te );
650+
651+ target_attr ++ ;
652+ }
653+
654+ /* Create fake ModifyTableState */
655+ memset ((void * )& mtstate ,0 ,sizeof (ModifyTableState ));
656+ NodeSetTag (& mtstate ,T_ModifyTableState );
657+ mtstate .ps .state = estate ;
658+ mtstate .operation = CMD_INSERT ;
659+ mtstate .resultRelInfo = rri ;
660+ mtstate .mt_onconflict = ONCONFLICT_NONE ;
661+
662+ /* Plan fake query in for FDW access to be planned as well */
663+ elog (DEBUG1 ,"FDW(%u): plan fake query for fdw_private" ,partid );
664+ plan = standard_planner (& query ,0 ,NULL );
665+
666+ /* Extract fdw_private from useless plan */
667+ elog (DEBUG1 ,"FDW(%u): extract fdw_private" ,partid );
668+ fdw_private = (List * )
669+ linitial (((ModifyTable * )plan -> planTree )-> fdwPrivLists );
670+
671+ /* call BeginForeignModify on 'rri' */
672+ elog (DEBUG1 ,"FDW(%u): call BeginForeignModify on a fake INSERT node" ,partid );
673+ fdw_routine -> BeginForeignModify (& mtstate ,rri ,fdw_private ,0 ,0 );
674+
675+ /* Report success */
676+ elog (DEBUG1 ,"FDW(%u): success" ,partid );
677+ }
678+ }
679+
502680/*
503681 * Used by fetch_estate_mod_data() to find estate_mod_data.
504682 */
@@ -581,7 +759,11 @@ append_rri_to_estate(EState *estate, ResultRelInfo *rri)
581759estate -> es_num_result_relations * sizeof (ResultRelInfo ));
582760}
583761
584- /* Append ResultRelInfo to 'es_result_relations' array */
762+ /*
763+ * Append ResultRelInfo to 'es_result_relations' array.
764+ * NOTE: this is probably safe since ResultRelInfo
765+ * contains nothing but pointers to various structs.
766+ */
585767estate -> es_result_relations [estate -> es_num_result_relations ]= * rri ;
586768
587769/* Update estate_mod_data */