44
55#include "catalog/pg_class.h"
66#include "catalog/pg_constraint.h"
7+ #include "catalog/pg_operator.h"
78#include "utils/syscache.h"
89#include "access/htup_details.h"
910#include "utils/builtins.h"
@@ -17,6 +18,7 @@ boolinitialization_needed = true;
1718
1819
1920static bool validate_range_constraint (Expr * ,PartRelationInfo * ,Datum * ,Datum * );
21+ static bool validate_hash_constraint (Expr * expr ,PartRelationInfo * prel ,int * hash );
2022static int cmp_range_entries (const void * p1 ,const void * p2 );
2123
2224
@@ -124,7 +126,8 @@ load_part_relations_hashtable(bool reinitialize)
124126free_dsm_array (& prel -> children );
125127prel -> children_count = 0 ;
126128}
127- load_hash_restrictions (oid );
129+ load_check_constraints (oid );
130+ // load_hash_restrictions(oid);
128131break ;
129132}
130133}
@@ -281,16 +284,20 @@ load_check_constraints(Oid parent_oid)
281284RangeEntry * ranges ;
282285Datum min ;
283286Datum max ;
284-
285- rangerel = (RangeRelation * )
286- hash_search (range_restrictions , (void * )& parent_oid ,HASH_ENTER ,& found );
287- // rangerel->nranges = 0;
287+ int hash ;
288+ HashRelation * hashrel ;
288289
289290alloc_dsm_array (& prel -> children ,sizeof (Oid ),proc );
290291children = (Oid * )dsm_array_get_pointer (& prel -> children );
291292
292- alloc_dsm_array (& rangerel -> ranges ,sizeof (RangeEntry ),proc );
293- ranges = (RangeEntry * )dsm_array_get_pointer (& rangerel -> ranges );
293+ if (prel -> parttype == PT_RANGE )
294+ {
295+ rangerel = (RangeRelation * )
296+ hash_search (range_restrictions , (void * )& parent_oid ,HASH_ENTER ,& found );
297+
298+ alloc_dsm_array (& rangerel -> ranges ,sizeof (RangeEntry ),proc );
299+ ranges = (RangeEntry * )dsm_array_get_pointer (& rangerel -> ranges );
300+ }
294301
295302for (i = 0 ;i < proc ;i ++ )
296303 {
@@ -315,24 +322,42 @@ load_check_constraints(Oid parent_oid)
315322conbin = TextDatumGetCString (val );
316323expr = (Expr * )stringToNode (conbin );
317324
318- if (prel -> parttype == PT_RANGE )
319- validate_range_constraint (expr ,prel ,& min ,& max );
325+ switch (prel -> parttype )
326+ {
327+ case PT_RANGE :
328+ if (!validate_range_constraint (expr ,prel ,& min ,& max ))
329+ /* TODO: elog() */
330+ continue ;
320331
321- // re.child_oid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 2, &arg1_isnull));
322- re .child_oid = con -> conrelid ;
323- re .min = min ;
324- re .max = max ;
332+ re .child_oid = con -> conrelid ;
333+ re .min = min ;
334+ re .max = max ;
325335
326- ranges [i ]= re ;
327- // children[prel->children_count++] = re.child_oid;
336+ ranges [i ]= re ;
337+ break ;
338+
339+ case PT_HASH :
340+ if (!validate_hash_constraint (expr ,prel ,& hash ))
341+ /* TODO: elog() */
342+ continue ;
343+
344+ hashrel = (HashRelation * )
345+ hash_search (hash_restrictions , (void * )& hash ,HASH_ENTER ,& found );
346+ hashrel -> child_oid = con -> conrelid ;
347+ children [hash ]= con -> conrelid ;
348+ }
328349}
350+ prel -> children_count = proc ;
329351
330- /* sort ascending */
331- qsort (ranges ,proc ,sizeof (RangeEntry ),cmp_range_entries );
352+ if (prel -> parttype == PT_RANGE )
353+ {
354+ /* sort ascending */
355+ qsort (ranges ,proc ,sizeof (RangeEntry ),cmp_range_entries );
332356
333- /* copy oids to prel */
334- for (i = 0 ;i < proc ;i ++ ,prel -> children_count ++ )
335- children [i ]= ranges [i ].child_oid ;
357+ /* copy oids to prel */
358+ for (i = 0 ;i < proc ;i ++ )
359+ children [i ]= ranges [i ].child_oid ;
360+ }
336361
337362/* TODO: check if some ranges overlap! */
338363 }
@@ -402,6 +427,55 @@ validate_range_constraint(Expr *expr, PartRelationInfo *prel, Datum *min, Datum
402427* max = ((Const * )right )-> constvalue ;
403428}
404429
430+ return true;
431+ }
432+
433+ /*
434+ * Validate hash constraint. It should look like "Var % Const = Const"
435+ */
436+ static bool
437+ validate_hash_constraint (Expr * expr ,PartRelationInfo * prel ,int * hash )
438+ {
439+ OpExpr * eqexpr ;
440+ OpExpr * modexpr ;
441+
442+ if (!IsA (expr ,OpExpr ))
443+ return false;
444+ eqexpr = (OpExpr * )expr ;
445+
446+ /* is this an equality operator? */
447+ if (eqexpr -> opno != Int4EqualOperator )
448+ return false;
449+
450+ if (!IsA (linitial (eqexpr -> args ),OpExpr ))
451+ return false;
452+
453+ /* is this a modulus operator? */
454+ modexpr = (OpExpr * )linitial (eqexpr -> args );
455+ if (modexpr -> opno != 530 )
456+ return false;
457+
458+ if (list_length (modexpr -> args )== 2 )
459+ {
460+ Node * left = linitial (modexpr -> args );
461+ Node * right = lsecond (modexpr -> args );
462+ Const * mod_result ;
463+
464+ if ( !IsA (left ,Var )|| !IsA (right ,Const ) )
465+ return false;
466+ if ( ((Var * )left )-> varattno != prel -> attnum )
467+ return false;
468+ if (DatumGetInt32 (((Const * )right )-> constvalue )!= prel -> children .length )
469+ return false;
470+
471+ if ( !IsA (lsecond (eqexpr -> args ),Const ) )
472+ return false;
473+
474+ mod_result = lsecond (eqexpr -> args );
475+ * hash = DatumGetInt32 (mod_result -> constvalue );
476+ return true;
477+ }
478+
405479return false;
406480}
407481