8
8
* ------------------------------------------------------------------------
9
9
*/
10
10
11
- #include "partition_filter .h"
11
+ #include "init .h"
12
12
#include "nodes_common.h"
13
+ #include "partition_filter.h"
13
14
#include "utils.h"
14
- #include "init.h"
15
15
16
+ #include "foreign/fdwapi.h"
17
+ #include "foreign/foreign.h"
18
+ #include "nodes/nodeFuncs.h"
16
19
#include "utils/guc.h"
17
20
#include "utils/memutils.h"
18
- #include "nodes/nodeFuncs.h"
19
21
#include "utils/lsyscache.h"
20
22
#include "utils/syscache.h"
21
23
@@ -35,8 +37,26 @@ typedef struct
35
37
bool estate_not_modified ;/* did we modify EState somehow? */
36
38
}estate_mod_data ;
37
39
40
+ /*
41
+ * Allow INSERTs into any FDW \ postgres_fdw \ no FDWs at all.
42
+ */
43
+ typedef enum
44
+ {
45
+ PF_FDW_INSERT_DISABLED = 0 ,/* INSERTs into FDWs are prohibited */
46
+ PF_FDW_INSERT_POSTGRES ,/* INSERTs into postgres_fdw are OK */
47
+ PF_FDW_INSERT_ANY_FDW /* INSERTs into any FDWs are OK */
48
+ }PF_insert_fdw_mode ;
49
+
50
+ static const struct config_enum_entry pg_pathman_insert_into_fdw_options []= {
51
+ {"disabled" ,PF_FDW_INSERT_DISABLED ,false },
52
+ {"postgres" ,PF_FDW_INSERT_POSTGRES ,false },
53
+ {"any_fdw" ,PF_FDW_INSERT_ANY_FDW ,false },
54
+ {NULL ,0 ,false }
55
+ };
56
+
38
57
39
58
bool pg_pathman_enable_partition_filter = true;
59
+ int pg_pathman_insert_into_fdw = PF_FDW_INSERT_POSTGRES ;
40
60
41
61
CustomScanMethods partition_filter_plan_methods ;
42
62
CustomExecMethods partition_filter_exec_methods ;
@@ -47,6 +67,9 @@ static void partition_filter_visitor(Plan *plan, void *context);
47
67
static List * pfilter_build_tlist (List * tlist );
48
68
static Index append_rte_to_estate (EState * estate ,RangeTblEntry * rte );
49
69
static int append_rri_to_estate (EState * estate ,ResultRelInfo * rri );
70
+ static void prepare_rri_fdw_for_insert (EState * estate ,
71
+ ResultRelInfoHolder * rri_holder ,
72
+ void * arg );
50
73
51
74
52
75
void
@@ -74,6 +97,18 @@ init_partition_filter_static_data(void)
74
97
NULL ,
75
98
NULL ,
76
99
NULL );
100
+
101
+ DefineCustomEnumVariable ("pg_pathman.insert_into_fdw" ,
102
+ "Allow INSERTS into FDW partitions." ,
103
+ NULL ,
104
+ & pg_pathman_insert_into_fdw ,
105
+ PF_FDW_INSERT_POSTGRES ,
106
+ pg_pathman_insert_into_fdw_options ,
107
+ PGC_SUSET ,
108
+ 0 ,
109
+ NULL ,
110
+ NULL ,
111
+ NULL );
77
112
}
78
113
79
114
@@ -179,6 +214,7 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
179
214
Index child_rte_idx ;
180
215
ResultRelInfo * part_result_rel_info ;
181
216
217
+ /* Lock partition and check if it exists */
182
218
LockRelationOid (partid ,parts_storage -> head_open_lock_mode );
183
219
if (!SearchSysCacheExists1 (RELOID ,ObjectIdGetDatum (partid )))
184
220
{
@@ -236,18 +272,18 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
236
272
/* ri_ConstraintExprs will be initialized by ExecRelCheck() */
237
273
part_result_rel_info -> ri_ConstraintExprs = NULL ;
238
274
239
- /*Now fill the ResultRelInfo holder */
275
+ /*Finally fill the ResultRelInfo holder */
240
276
rri_holder -> partid = partid ;
241
277
rri_holder -> result_rel_info = part_result_rel_info ;
242
278
243
- /* Append ResultRelInfo to storage->es_alloc_result_rels */
244
- append_rri_to_estate (parts_storage -> estate ,part_result_rel_info );
245
-
246
279
/* Call on_new_rri_holder_callback() if needed */
247
280
if (parts_storage -> on_new_rri_holder_callback )
248
281
parts_storage -> on_new_rri_holder_callback (parts_storage -> estate ,
249
282
rri_holder ,
250
283
parts_storage -> callback_arg );
284
+
285
+ /* Append ResultRelInfo to storage->es_alloc_result_rels */
286
+ append_rri_to_estate (parts_storage -> estate ,part_result_rel_info );
251
287
}
252
288
253
289
return rri_holder ;
@@ -351,7 +387,7 @@ partition_filter_begin(CustomScanState *node, EState *estate, int eflags)
351
387
/* Init ResultRelInfo cache */
352
388
init_result_parts_storage (& state -> result_parts ,estate ,
353
389
state -> on_conflict_action != ONCONFLICT_NONE ,
354
- ResultPartsStorageStandard ,NULL ,NULL );
390
+ ResultPartsStorageStandard ,prepare_rri_fdw_for_insert ,NULL );
355
391
356
392
state -> warning_triggered = false;
357
393
}
@@ -499,6 +535,148 @@ select_partition_for_insert(const PartRelationInfo *prel,
499
535
return rri_holder ;
500
536
}
501
537
538
+ /*
539
+ * Callback to be executed on FDW partitions.
540
+ */
541
+ static void
542
+ prepare_rri_fdw_for_insert (EState * estate ,
543
+ ResultRelInfoHolder * rri_holder ,
544
+ void * arg )
545
+ {
546
+ ResultRelInfo * rri = rri_holder -> result_rel_info ;
547
+ FdwRoutine * fdw_routine = rri -> ri_FdwRoutine ;
548
+ Oid partid ;
549
+
550
+ /* Nothing to do if not FDW */
551
+ if (fdw_routine == NULL )
552
+ return ;
553
+
554
+ partid = RelationGetRelid (rri -> ri_RelationDesc );
555
+
556
+ /* Perform some checks according to 'pg_pathman_insert_into_fdw' */
557
+ switch (pg_pathman_insert_into_fdw )
558
+ {
559
+ case PF_FDW_INSERT_DISABLED :
560
+ elog (ERROR ,"INSERTs into FDW partitions are disabled" );
561
+ break ;
562
+
563
+ case PF_FDW_INSERT_POSTGRES :
564
+ {
565
+ ForeignDataWrapper * fdw ;
566
+ ForeignServer * fserver ;
567
+
568
+ /* Check if it's PostgreSQL FDW */
569
+ fserver = GetForeignServer (GetForeignTable (partid )-> serverid );
570
+ fdw = GetForeignDataWrapper (fserver -> fdwid );
571
+ if (strcmp ("postgres_fdw" ,fdw -> fdwname )!= 0 )
572
+ elog (ERROR ,"FDWs other than postgres_fdw are restricted" );
573
+ }
574
+ break ;
575
+
576
+ case PF_FDW_INSERT_ANY_FDW :
577
+ {
578
+ ForeignDataWrapper * fdw ;
579
+ ForeignServer * fserver ;
580
+
581
+ fserver = GetForeignServer (GetForeignTable (partid )-> serverid );
582
+ fdw = GetForeignDataWrapper (fserver -> fdwid );
583
+ if (strcmp ("postgres_fdw" ,fdw -> fdwname )!= 0 )
584
+ elog (WARNING ,"unrestricted FDW mode may lead to \"%s\" crashes" ,
585
+ fdw -> fdwname );
586
+ }
587
+ break ;/* do nothing */
588
+
589
+ default :
590
+ elog (ERROR ,"Mode is not implemented yet" );
591
+ break ;
592
+ }
593
+
594
+ if (fdw_routine -> PlanForeignModify )
595
+ {
596
+ RangeTblEntry * rte ;
597
+ ModifyTableState mtstate ;
598
+ List * fdw_private ;
599
+ Query query ;
600
+ PlannedStmt * plan ;
601
+ TupleDesc tupdesc ;
602
+ int i ,
603
+ target_attr ;
604
+
605
+ /* Fetch RangeTblEntry for partition */
606
+ rte = rt_fetch (rri -> ri_RangeTableIndex ,estate -> es_range_table );
607
+
608
+ /* Fetch tuple descriptor */
609
+ tupdesc = RelationGetDescr (rri -> ri_RelationDesc );
610
+
611
+ /* Create fake Query node */
612
+ memset ((void * )& query ,0 ,sizeof (Query ));
613
+ NodeSetTag (& query ,T_Query );
614
+
615
+ query .commandType = CMD_INSERT ;
616
+ query .querySource = QSRC_ORIGINAL ;
617
+ query .resultRelation = 1 ;
618
+ query .rtable = list_make1 (copyObject (rte ));
619
+ query .jointree = makeNode (FromExpr );
620
+
621
+ query .targetList = NIL ;
622
+ query .returningList = NIL ;
623
+
624
+ /* Generate 'query.targetList' using 'tupdesc' */
625
+ target_attr = 1 ;
626
+ for (i = 0 ;i < tupdesc -> natts ;i ++ )
627
+ {
628
+ Form_pg_attribute attr ;
629
+ TargetEntry * te ;
630
+ Param * param ;
631
+
632
+ attr = tupdesc -> attrs [i ];
633
+
634
+ if (attr -> attisdropped )
635
+ continue ;
636
+
637
+ param = makeNode (Param );
638
+ param -> paramkind = PARAM_EXTERN ;
639
+ param -> paramid = target_attr ;
640
+ param -> paramtype = attr -> atttypid ;
641
+ param -> paramtypmod = attr -> atttypmod ;
642
+ param -> paramcollid = attr -> attcollation ;
643
+ param -> location = -1 ;
644
+
645
+ te = makeTargetEntry ((Expr * )param ,target_attr ,
646
+ pstrdup (NameStr (attr -> attname )),
647
+ false);
648
+
649
+ query .targetList = lappend (query .targetList ,te );
650
+
651
+ target_attr ++ ;
652
+ }
653
+
654
+ /* Create fake ModifyTableState */
655
+ memset ((void * )& mtstate ,0 ,sizeof (ModifyTableState ));
656
+ NodeSetTag (& mtstate ,T_ModifyTableState );
657
+ mtstate .ps .state = estate ;
658
+ mtstate .operation = CMD_INSERT ;
659
+ mtstate .resultRelInfo = rri ;
660
+ mtstate .mt_onconflict = ONCONFLICT_NONE ;
661
+
662
+ /* Plan fake query in for FDW access to be planned as well */
663
+ elog (DEBUG1 ,"FDW(%u): plan fake query for fdw_private" ,partid );
664
+ plan = standard_planner (& query ,0 ,NULL );
665
+
666
+ /* Extract fdw_private from useless plan */
667
+ elog (DEBUG1 ,"FDW(%u): extract fdw_private" ,partid );
668
+ fdw_private = (List * )
669
+ linitial (((ModifyTable * )plan -> planTree )-> fdwPrivLists );
670
+
671
+ /* call BeginForeignModify on 'rri' */
672
+ elog (DEBUG1 ,"FDW(%u): call BeginForeignModify on a fake INSERT node" ,partid );
673
+ fdw_routine -> BeginForeignModify (& mtstate ,rri ,fdw_private ,0 ,0 );
674
+
675
+ /* Report success */
676
+ elog (DEBUG1 ,"FDW(%u): success" ,partid );
677
+ }
678
+ }
679
+
502
680
/*
503
681
* Used by fetch_estate_mod_data() to find estate_mod_data.
504
682
*/
@@ -581,7 +759,11 @@ append_rri_to_estate(EState *estate, ResultRelInfo *rri)
581
759
estate -> es_num_result_relations * sizeof (ResultRelInfo ));
582
760
}
583
761
584
- /* Append ResultRelInfo to 'es_result_relations' array */
762
+ /*
763
+ * Append ResultRelInfo to 'es_result_relations' array.
764
+ * NOTE: this is probably safe since ResultRelInfo
765
+ * contains nothing but pointers to various structs.
766
+ */
585
767
estate -> es_result_relations [estate -> es_num_result_relations ]= * rri ;
586
768
587
769
/* Update estate_mod_data */