22#include "executor/spi.h"
33#include "catalog/pg_type.h"
44
5+ #include "catalog/pg_class.h"
6+ #include "catalog/pg_constraint.h"
7+ #include "utils/syscache.h"
8+ #include "access/htup_details.h"
9+ #include "utils/builtins.h"
10+ #include "utils/typcache.h"
11+
12+
513HTAB * relations = NULL ;
614HTAB * hash_restrictions = NULL ;
715HTAB * range_restrictions = NULL ;
816bool initialization_needed = true;
917
18+
19+ static bool validate_range_constraint (Expr * ,PartRelationInfo * ,Datum * ,Datum * );
20+ static int cmp_range_entries (const void * p1 ,const void * p2 );
21+
22+
1023/*
1124 * Initialize hashtables
1225 */
@@ -63,8 +76,8 @@ load_part_relations_hashtable()
6376for (i = 0 ;i < proc ;i ++ )
6477{
6578HeapTuple tuple = tuptable -> vals [i ];
66-
6779int oid = DatumGetObjectId (SPI_getbinval (tuple ,tupdesc ,1 ,& isnull ));
80+
6881prinfo = (PartRelationInfo * )
6982hash_search (relations , (const void * )& oid ,HASH_ENTER ,NULL );
7083prinfo -> oid = oid ;
@@ -87,10 +100,13 @@ load_part_relations_hashtable()
87100prinfo = (PartRelationInfo * )
88101hash_search (relations , (const void * )& oid ,HASH_FIND ,NULL );
89102
103+ // load_check_constraints(oid);
104+
90105switch (prinfo -> parttype )
91106{
92107case PT_RANGE :
93- load_range_restrictions (oid );
108+ // load_range_restrictions(oid);
109+ load_check_constraints (oid );
94110break ;
95111case PT_HASH :
96112load_hash_restrictions (oid );
@@ -203,14 +219,16 @@ create_hash_restrictions_hashtable()
203219& ctl ,HASH_ELEM |HASH_BLOBS );
204220}
205221
222+ /*
223+ * Load and validate constraints
224+ * TODO: make it work for HASH partitioning
225+ */
206226void
207- load_range_restrictions (Oid parent_oid )
227+ load_check_constraints (Oid parent_oid )
208228{
209229bool found ;
210230PartRelationInfo * prel ;
211231RangeRelation * rangerel ;
212- // HashRelation *hashrel;
213- // HashRelationKey key;
214232int ret ;
215233int i ;
216234int proc ;
@@ -224,24 +242,15 @@ load_range_restrictions(Oid parent_oid)
224242prel = (PartRelationInfo * )
225243hash_search (relations , (const void * )& parent_oid ,HASH_FIND ,& found );
226244
227- /* if already loaded then quit */
228- if (prel -> children_count > 0 )
229- return ;
245+ // / * if already loaded then quit */
246+ // if (prel->children_count > 0)
247+ // return;
230248
231249// SPI_connect();
232- ret = SPI_execute_with_args ("SELECT p.relfilenode, c.relfilenode, "
233- "rr.min_num, rr.max_num, "
234- "rr.min_dt, "
235- "rr.max_dt, "
236- "rr.min_dt::DATE, "
237- "rr.max_dt::DATE, "
238- "rr.min_num::INTEGER, "
239- "rr.max_num::INTEGER "
240- "FROM pg_pathman_range_rels rr "
241- "JOIN pg_class p ON p.relname = rr.parent "
242- "JOIN pg_class c ON c.relname = rr.child "
243- "WHERE p.relfilenode = $1 "
244- "ORDER BY rr.parent, rr.min_num, rr.min_dt" ,
250+ ret = SPI_execute_with_args ("select pg_constraint.* "
251+ "from pg_constraint "
252+ "join pg_inherits on inhrelid = conrelid "
253+ "where inhparent = $1 and contype='c';" ,
2452541 ,oids ,vals ,nulls , true,0 );
246255proc = SPI_processed ;
247256
@@ -251,6 +260,8 @@ load_range_restrictions(Oid parent_oid)
251260SPITupleTable * tuptable = SPI_tuptable ;
252261Oid * children ;
253262RangeEntry * ranges ;
263+ Datum min ;
264+ Datum max ;
254265
255266rangerel = (RangeRelation * )
256267hash_search (range_restrictions , (void * )& parent_oid ,HASH_ENTER ,& found );
@@ -264,60 +275,115 @@ load_range_restrictions(Oid parent_oid)
264275
265276for (i = 0 ;i < proc ;i ++ )
266277 {
267- Datum min ;
268- Datum max ;
269- RangeEntry re ;
270- HeapTuple tuple = tuptable -> vals [i ];
271-
272- // int parent_oid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 1, &arg1_isnull));
273- re .child_oid = DatumGetObjectId (SPI_getbinval (tuple ,tupdesc ,2 ,& arg1_isnull ));
274-
275- /* date */
276- // switch (prinfo->atttype)
277- // {
278- // case AT_INT:
279- // min = SPI_getbinval(tuple, tupdesc, 3, &isnull);
280- // max = SPI_getbinval(tuple, tupdesc, 4, &isnull);
281- // re.min.integer = DatumGetInt32(min);
282- // re.max.integer = DatumGetInt32(max);
283- // break;
284- // case AT_DATE:
285- // min = SPI_getbinval(tuple, tupdesc, 5, &isnull);
286- // max = SPI_getbinval(tuple, tupdesc, 6, &isnull);
287- // re.min.date = DatumGetDateADT(min);
288- // re.max.date = DatumGetDateADT(max);
289- // break;
290- // }
291-
292- switch (prel -> atttype )
293- {
294- case DATEOID :
295- re .min = SPI_getbinval (tuple ,tupdesc ,7 ,& arg1_isnull );
296- re .max = SPI_getbinval (tuple ,tupdesc ,8 ,& arg2_isnull );
297- break ;
298- case TIMESTAMPOID :
299- re .min = SPI_getbinval (tuple ,tupdesc ,5 ,& arg1_isnull );
300- re .max = SPI_getbinval (tuple ,tupdesc ,6 ,& arg2_isnull );
301- break ;
302- case INT2OID :
303- case INT4OID :
304- case INT8OID :
305- re .min = SPI_getbinval (tuple ,tupdesc ,9 ,& arg1_isnull );
306- re .max = SPI_getbinval (tuple ,tupdesc ,10 ,& arg2_isnull );
307- break ;
308- default :
309- re .min = SPI_getbinval (tuple ,tupdesc ,3 ,& arg1_isnull );
310- re .max = SPI_getbinval (tuple ,tupdesc ,4 ,& arg2_isnull );
311- break ;
312- }
278+ RangeEntry re ;
279+ HeapTuple tuple = tuptable -> vals [i ];
280+ bool isnull ;
281+ Datum val ;
282+ char * conbin ;
283+ Expr * expr ;
284+
285+ // HeapTuplereltuple;
286+ // Form_pg_class pg_class_tuple;
287+ Form_pg_constraint con ;
288+
289+ con = (Form_pg_constraint )GETSTRUCT (tuple );
290+
291+ val = SysCacheGetAttr (CONSTROID ,tuple ,Anum_pg_constraint_conbin ,
292+ & isnull );
293+ if (isnull )
294+ elog (ERROR ,"null conbin for constraint %u" ,
295+ HeapTupleGetOid (tuple ));
296+ conbin = TextDatumGetCString (val );
297+ expr = (Expr * )stringToNode (conbin );
298+
299+ if (prel -> parttype == PT_RANGE )
300+ validate_range_constraint (expr ,prel ,& min ,& max );
301+
302+ // re.child_oid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 2, &arg1_isnull));
303+ re .child_oid = con -> conrelid ;
304+ re .min = min ;
305+ re .max = max ;
313306
314307ranges [rangerel -> nranges ++ ]= re ;
315- // prel->children[prel->children_count++] = re.child_oid;
316- children [prel -> children_count ++ ]= re .child_oid ;
317- }
308+ // children[prel->children_count++] = re.child_oid;
309+ }
310+
311+ /* sort ascending */
312+ qsort (ranges ,rangerel -> nranges ,sizeof (RangeEntry ),cmp_range_entries );
313+
314+ /* copy oids to prel */
315+ for (i = 0 ;i < rangerel -> nranges ;i ++ ,prel -> children_count ++ )
316+ children [i ]= ranges [i ].child_oid ;
317+
318+ /* TODO: check if some ranges overlap! */
318319 }
320+ }
319321
320- // SPI_finish();
322+
323+ /* qsort comparison function for oids */
324+ static int
325+ cmp_range_entries (const void * p1 ,const void * p2 )
326+ {
327+ RangeEntry * v1 = (const RangeEntry * )p1 ;
328+ RangeEntry * v2 = (const RangeEntry * )p2 ;
329+
330+ if (v1 -> min < v2 -> min )
331+ return -1 ;
332+ if (v1 -> min > v2 -> min )
333+ return 1 ;
334+ return 0 ;
335+ }
336+
337+
338+ static bool
339+ validate_range_constraint (Expr * expr ,PartRelationInfo * prel ,Datum * min ,Datum * max )
340+ {
341+ TypeCacheEntry * tce ;
342+ int strategy ;
343+ BoolExpr * boolexpr = (BoolExpr * )expr ;
344+ OpExpr * opexpr ;
345+
346+ /* it should be an AND operator on top */
347+ if ( !(IsA (expr ,BoolExpr )&& boolexpr -> boolop == AND_EXPR ) )
348+ return false;
349+
350+ /* and it should have exactly two operands */
351+ if (list_length (boolexpr -> args )!= 2 )
352+ return false;
353+
354+ tce = lookup_type_cache (prel -> atttype ,TYPECACHE_EQ_OPR |TYPECACHE_LT_OPR |TYPECACHE_GT_OPR );
355+ // strategy = get_op_opfamily_strategy(boolexpr->opno, tce->btree_opf);
356+
357+ /* check that left operand is >= operator */
358+ opexpr = (OpExpr * )linitial (boolexpr -> args );
359+ if (get_op_opfamily_strategy (opexpr -> opno ,tce -> btree_opf )== BTGreaterEqualStrategyNumber )
360+ {
361+ Node * left = linitial (opexpr -> args );
362+ Node * right = lsecond (opexpr -> args );
363+ if ( !IsA (left ,Var )|| !IsA (right ,Const ) )
364+ return false;
365+ if ( ((Var * )left )-> varattno != prel -> attnum )
366+ return false;
367+ * min = ((Const * )right )-> constvalue ;
368+ }
369+ else
370+ return false;
371+
372+ /* TODO: rewrite this */
373+ /* check that right operand is < operator */
374+ opexpr = (OpExpr * )lsecond (boolexpr -> args );
375+ if (get_op_opfamily_strategy (opexpr -> opno ,tce -> btree_opf )== BTLessStrategyNumber )
376+ {
377+ Node * left = linitial (opexpr -> args );
378+ Node * right = lsecond (opexpr -> args );
379+ if ( !IsA (left ,Var )|| !IsA (right ,Const ) )
380+ return false;
381+ if ( ((Var * )left )-> varattno != prel -> attnum )
382+ return false;
383+ * max = ((Const * )right )-> constvalue ;
384+ }
385+ else
386+ return false;
321387}
322388
323389/*
@@ -376,4 +442,4 @@ remove_relation_info(Oid relid)
376442}
377443prel -> children_count = 0 ;
378444hash_search (relations , (const void * )& relid ,HASH_REMOVE ,0 );
379- }
445+ }