4
4
5
5
#include "catalog/pg_class.h"
6
6
#include "catalog/pg_constraint.h"
7
+ #include "catalog/pg_operator.h"
7
8
#include "utils/syscache.h"
8
9
#include "access/htup_details.h"
9
10
#include "utils/builtins.h"
@@ -17,6 +18,7 @@ boolinitialization_needed = true;
17
18
18
19
19
20
static bool validate_range_constraint (Expr * ,PartRelationInfo * ,Datum * ,Datum * );
21
+ static bool validate_hash_constraint (Expr * expr ,PartRelationInfo * prel ,int * hash );
20
22
static int cmp_range_entries (const void * p1 ,const void * p2 );
21
23
22
24
@@ -124,7 +126,8 @@ load_part_relations_hashtable(bool reinitialize)
124
126
free_dsm_array (& prel -> children );
125
127
prel -> children_count = 0 ;
126
128
}
127
- load_hash_restrictions (oid );
129
+ load_check_constraints (oid );
130
+ // load_hash_restrictions(oid);
128
131
break ;
129
132
}
130
133
}
@@ -281,16 +284,20 @@ load_check_constraints(Oid parent_oid)
281
284
RangeEntry * ranges ;
282
285
Datum min ;
283
286
Datum max ;
284
-
285
- rangerel = (RangeRelation * )
286
- hash_search (range_restrictions , (void * )& parent_oid ,HASH_ENTER ,& found );
287
- // rangerel->nranges = 0;
287
+ int hash ;
288
+ HashRelation * hashrel ;
288
289
289
290
alloc_dsm_array (& prel -> children ,sizeof (Oid ),proc );
290
291
children = (Oid * )dsm_array_get_pointer (& prel -> children );
291
292
292
- alloc_dsm_array (& rangerel -> ranges ,sizeof (RangeEntry ),proc );
293
- ranges = (RangeEntry * )dsm_array_get_pointer (& rangerel -> ranges );
293
+ if (prel -> parttype == PT_RANGE )
294
+ {
295
+ rangerel = (RangeRelation * )
296
+ hash_search (range_restrictions , (void * )& parent_oid ,HASH_ENTER ,& found );
297
+
298
+ alloc_dsm_array (& rangerel -> ranges ,sizeof (RangeEntry ),proc );
299
+ ranges = (RangeEntry * )dsm_array_get_pointer (& rangerel -> ranges );
300
+ }
294
301
295
302
for (i = 0 ;i < proc ;i ++ )
296
303
{
@@ -315,24 +322,42 @@ load_check_constraints(Oid parent_oid)
315
322
conbin = TextDatumGetCString (val );
316
323
expr = (Expr * )stringToNode (conbin );
317
324
318
- if (prel -> parttype == PT_RANGE )
319
- validate_range_constraint (expr ,prel ,& min ,& max );
325
+ switch (prel -> parttype )
326
+ {
327
+ case PT_RANGE :
328
+ if (!validate_range_constraint (expr ,prel ,& min ,& max ))
329
+ /* TODO: elog() */
330
+ continue ;
320
331
321
- // re.child_oid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 2, &arg1_isnull));
322
- re .child_oid = con -> conrelid ;
323
- re .min = min ;
324
- re .max = max ;
332
+ re .child_oid = con -> conrelid ;
333
+ re .min = min ;
334
+ re .max = max ;
325
335
326
- ranges [i ]= re ;
327
- // children[prel->children_count++] = re.child_oid;
336
+ ranges [i ]= re ;
337
+ break ;
338
+
339
+ case PT_HASH :
340
+ if (!validate_hash_constraint (expr ,prel ,& hash ))
341
+ /* TODO: elog() */
342
+ continue ;
343
+
344
+ hashrel = (HashRelation * )
345
+ hash_search (hash_restrictions , (void * )& hash ,HASH_ENTER ,& found );
346
+ hashrel -> child_oid = con -> conrelid ;
347
+ children [hash ]= con -> conrelid ;
348
+ }
328
349
}
350
+ prel -> children_count = proc ;
329
351
330
- /* sort ascending */
331
- qsort (ranges ,proc ,sizeof (RangeEntry ),cmp_range_entries );
352
+ if (prel -> parttype == PT_RANGE )
353
+ {
354
+ /* sort ascending */
355
+ qsort (ranges ,proc ,sizeof (RangeEntry ),cmp_range_entries );
332
356
333
- /* copy oids to prel */
334
- for (i = 0 ;i < proc ;i ++ ,prel -> children_count ++ )
335
- children [i ]= ranges [i ].child_oid ;
357
+ /* copy oids to prel */
358
+ for (i = 0 ;i < proc ;i ++ )
359
+ children [i ]= ranges [i ].child_oid ;
360
+ }
336
361
337
362
/* TODO: check if some ranges overlap! */
338
363
}
@@ -402,6 +427,55 @@ validate_range_constraint(Expr *expr, PartRelationInfo *prel, Datum *min, Datum
402
427
* max = ((Const * )right )-> constvalue ;
403
428
}
404
429
430
+ return true;
431
+ }
432
+
433
+ /*
434
+ * Validate hash constraint. It should look like "Var % Const = Const"
435
+ */
436
+ static bool
437
+ validate_hash_constraint (Expr * expr ,PartRelationInfo * prel ,int * hash )
438
+ {
439
+ OpExpr * eqexpr ;
440
+ OpExpr * modexpr ;
441
+
442
+ if (!IsA (expr ,OpExpr ))
443
+ return false;
444
+ eqexpr = (OpExpr * )expr ;
445
+
446
+ /* is this an equality operator? */
447
+ if (eqexpr -> opno != Int4EqualOperator )
448
+ return false;
449
+
450
+ if (!IsA (linitial (eqexpr -> args ),OpExpr ))
451
+ return false;
452
+
453
+ /* is this a modulus operator? */
454
+ modexpr = (OpExpr * )linitial (eqexpr -> args );
455
+ if (modexpr -> opno != 530 )
456
+ return false;
457
+
458
+ if (list_length (modexpr -> args )== 2 )
459
+ {
460
+ Node * left = linitial (modexpr -> args );
461
+ Node * right = lsecond (modexpr -> args );
462
+ Const * mod_result ;
463
+
464
+ if ( !IsA (left ,Var )|| !IsA (right ,Const ) )
465
+ return false;
466
+ if ( ((Var * )left )-> varattno != prel -> attnum )
467
+ return false;
468
+ if (DatumGetInt32 (((Const * )right )-> constvalue )!= prel -> children .length )
469
+ return false;
470
+
471
+ if ( !IsA (lsecond (eqexpr -> args ),Const ) )
472
+ return false;
473
+
474
+ mod_result = lsecond (eqexpr -> args );
475
+ * hash = DatumGetInt32 (mod_result -> constvalue );
476
+ return true;
477
+ }
478
+
405
479
return false;
406
480
}
407
481