@@ -31,6 +31,11 @@ boolinitialization_needed = true;
31
31
static FmgrInfo * qsort_type_cmp_func ;
32
32
static bool globalByVal ;
33
33
34
+ static bool validate_partition_constraints (Oid * children_oids ,
35
+ uint children_count ,
36
+ Snapshot snapshot ,
37
+ PartRelationInfo * prel ,
38
+ RangeRelation * rangerel );
34
39
static bool validate_range_constraint (Expr * ,PartRelationInfo * ,Datum * ,Datum * );
35
40
static bool validate_hash_constraint (Expr * expr ,PartRelationInfo * prel ,int * hash );
36
41
static bool read_opexpr_const (OpExpr * opexpr ,int varattno ,Datum * val );
@@ -120,7 +125,7 @@ load_config(void)
120
125
121
126
/* Load cache */
122
127
LWLockAcquire (pmstate -> load_config_lock ,LW_EXCLUSIVE );
123
- load_relations_hashtable (new_segment_created );
128
+ load_relations (new_segment_created );
124
129
LWLockRelease (pmstate -> load_config_lock );
125
130
LWLockRelease (pmstate -> dsm_init_lock );
126
131
}
@@ -155,7 +160,7 @@ get_extension_schema()
155
160
* Loads partitioned tables structure to hashtable
156
161
*/
157
162
void
158
- load_relations_hashtable (bool reinitialize )
163
+ load_relations (bool reinitialize )
159
164
{
160
165
int ret ,
161
166
i ,
@@ -233,15 +238,15 @@ load_relations_hashtable(bool reinitialize)
233
238
free_dsm_array (& rangerel -> ranges );
234
239
prel -> children_count = 0 ;
235
240
}
236
- load_check_constraints (oid ,GetCatalogSnapshot (oid ));
241
+ load_partitions (oid ,GetCatalogSnapshot (oid ));
237
242
break ;
238
243
case PT_HASH :
239
244
if (reinitialize && prel -> children .length > 0 )
240
245
{
241
246
free_dsm_array (& prel -> children );
242
247
prel -> children_count = 0 ;
243
248
}
244
- load_check_constraints (oid ,GetCatalogSnapshot (oid ));
249
+ load_partitions (oid ,GetCatalogSnapshot (oid ));
245
250
break ;
246
251
}
247
252
}
@@ -268,18 +273,19 @@ create_relations_hashtable()
268
273
* Load and validate CHECK constraints
269
274
*/
270
275
void
271
- load_check_constraints (Oid parent_oid ,Snapshot snapshot )
276
+ load_partitions (Oid parent_oid ,Snapshot snapshot )
272
277
{
273
278
PartRelationInfo * prel = NULL ;
274
279
RangeRelation * rangerel = NULL ;
275
280
SPIPlanPtr plan ;
276
281
bool found ;
277
282
int ret ,
278
283
i ,
279
- proc ;
284
+ children_count = 0 ;
280
285
Datum vals [1 ];
281
- Oid oids [1 ]= {INT4OID };
286
+ Oid types [1 ]= {INT4OID };
282
287
bool nulls [1 ]= {false};
288
+ Oid * children_oids ;
283
289
284
290
vals [0 ]= Int32GetDatum (parent_oid );
285
291
prel = get_pathman_relation_info (parent_oid ,NULL );
@@ -288,77 +294,144 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
288
294
if (prel -> children .length > 0 )
289
295
return ;
290
296
291
- plan = SPI_prepare ("select pg_constraint.* "
292
- "from pg_constraint "
293
- "join pg_inherits on inhrelid = conrelid "
294
- "where inhparent = $1 and contype='c';" ,
295
- 1 ,oids );
296
- ret = SPI_execute_snapshot (plan ,vals ,nulls ,
297
- snapshot ,InvalidSnapshot , true, false,0 );
298
-
299
- proc = SPI_processed ;
300
-
297
+ /* Load children oids */
298
+ plan = SPI_prepare ("select inhrelid from pg_inherits where inhparent = $1;" ,1 ,types );
299
+ ret = SPI_execute_snapshot (plan ,vals ,nulls ,snapshot ,InvalidSnapshot , true, false,0 );
300
+ children_count = SPI_processed ;
301
301
if (ret > 0 && SPI_tuptable != NULL )
302
302
{
303
- SPITupleTable * tuptable = SPI_tuptable ;
304
- Oid * children ;
305
- RangeEntry * ranges = NULL ;
306
- Datum min ;
307
- Datum max ;
308
- int hash ;
303
+ children_oids = palloc ( sizeof ( Oid ) * children_count ) ;
304
+ for ( i = 0 ; i < children_count ; i ++ )
305
+ {
306
+ TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
307
+ HeapTuple tuple = SPI_tuptable -> vals [ i ] ;
308
+ bool isnull ;
309
309
310
- alloc_dsm_array (& prel -> children ,sizeof (Oid ),proc );
311
- children = (Oid * )dsm_array_get_pointer (& prel -> children );
310
+ children_oids [i ]= \
311
+ DatumGetObjectId (SPI_getbinval (tuple ,tupdesc ,1 ,& isnull ));
312
+ }
313
+ }
312
314
315
+ if (children_count > 0 )
316
+ {
317
+ alloc_dsm_array (& prel -> children ,sizeof (Oid ),children_count );
318
+
319
+ /* allocate ranges array is dsm */
313
320
if (prel -> parttype == PT_RANGE )
314
321
{
315
- TypeCacheEntry * tce ;
316
- RelationKey key ;
322
+ TypeCacheEntry * tce = lookup_type_cache (prel -> atttype ,0 );
323
+ RelationKey key ;
324
+
317
325
key .dbid = MyDatabaseId ;
318
326
key .relid = parent_oid ;
319
-
320
327
rangerel = (RangeRelation * )
321
328
hash_search (range_restrictions , (void * )& key ,HASH_ENTER ,& found );
329
+ rangerel -> by_val = tce -> typbyval ;
330
+ alloc_dsm_array (& rangerel -> ranges ,sizeof (RangeEntry ),children_count );
331
+ }
322
332
323
- alloc_dsm_array (& rangerel -> ranges ,sizeof (RangeEntry ),proc );
324
- ranges = (RangeEntry * )dsm_array_get_pointer (& rangerel -> ranges );
333
+ /* Validate partitions constraints */
334
+ if (!validate_partition_constraints (children_oids ,
335
+ children_count ,
336
+ snapshot ,
337
+ prel ,
338
+ rangerel ))
339
+ {
340
+ RelationKey key ;
325
341
326
- tce = lookup_type_cache (prel -> atttype ,0 );
327
- rangerel -> by_val = tce -> typbyval ;
342
+ /*
343
+ * If validation failed then pg_pathman cannot handle this relation.
344
+ * Remove it from the cache
345
+ */
346
+ key .dbid = MyDatabaseId ;
347
+ key .relid = parent_oid ;
348
+
349
+ free_dsm_array (& prel -> children );
350
+ free_dsm_array (& rangerel -> ranges );
351
+ hash_search (relations , (const void * )& key ,HASH_REMOVE ,& found );
352
+ if (prel -> parttype == PT_RANGE )
353
+ hash_search (range_restrictions , (const void * )& key ,HASH_REMOVE ,& found );
354
+
355
+ elog (WARNING ,"Validation failed for relation '%s'. "
356
+ "It will not be handled by pg_pathman" ,
357
+ get_rel_name (parent_oid ));
328
358
}
359
+ else
360
+ prel -> children_count = children_count ;
329
361
330
- for (i = 0 ;i < proc ;i ++ )
362
+ pfree (children_oids );
363
+ }
364
+ }
365
+
366
+ static bool
367
+ validate_partition_constraints (Oid * children_oids ,
368
+ uint children_count ,
369
+ Snapshot snapshot ,
370
+ PartRelationInfo * prel ,
371
+ RangeRelation * rangerel )
372
+ {
373
+ RangeEntry re ;
374
+ bool isnull ;
375
+ char * conbin ;
376
+ Expr * expr ;
377
+ Datum oids [1 ];
378
+ bool nulls [1 ]= {false};
379
+ Oid types [1 ]= {INT4OID };
380
+ Datum min ,
381
+ max ;
382
+ Datum conbin_datum ;
383
+ Form_pg_constraint con ;
384
+ RangeEntry * ranges = NULL ;
385
+ Oid * children ;
386
+ int hash ;
387
+ int i ,j ,idx ;
388
+ int ret ;
389
+ int proc ;
390
+ SPIPlanPtr plan ;
391
+
392
+ /* Iterate through children */
393
+ for (idx = 0 ;idx < children_count ;idx ++ )
394
+ {
395
+ Oid child_oid = children_oids [idx ];
396
+
397
+ oids [0 ]= Int32GetDatum (child_oid );
398
+
399
+ /* Load constraints */
400
+ plan = SPI_prepare ("select * from pg_constraint where conrelid = $1 and contype = 'c';" ,1 ,types );
401
+ ret = SPI_execute_snapshot (plan ,oids ,nulls ,snapshot ,InvalidSnapshot , true, false,0 );
402
+ proc = SPI_processed ;
403
+
404
+ if (ret <=0 || SPI_tuptable == NULL )
331
405
{
332
- RangeEntry re ;
333
- HeapTuple tuple = tuptable -> vals [i ];
334
- bool isnull ;
335
- Datum val ;
336
- char * conbin ;
337
- Expr * expr ;
406
+ elog (WARNING ,"No constraints found for relation '%s'." ,
407
+ get_rel_name (child_oid ));
408
+ return false;
409
+ }
338
410
339
- Form_pg_constraint con ;
411
+ children = dsm_array_get_pointer (& prel -> children );
412
+ if (prel -> parttype == PT_RANGE )
413
+ ranges = (RangeEntry * )dsm_array_get_pointer (& rangerel -> ranges );
414
+
415
+ /* Iterate through check constraints and try to validate them */
416
+ for (j = 0 ;j < proc ;j ++ )
417
+ {
418
+ HeapTuple tuple = SPI_tuptable -> vals [j ];
340
419
341
420
con = (Form_pg_constraint )GETSTRUCT (tuple );
421
+ conbin_datum = SysCacheGetAttr (CONSTROID ,tuple ,Anum_pg_constraint_conbin ,& isnull );
342
422
343
- val = SysCacheGetAttr (CONSTROID ,tuple ,Anum_pg_constraint_conbin ,
344
- & isnull );
423
+ /* handle unexpected null value */
345
424
if (isnull )
346
- elog (ERROR ,"null conbin for constraint %u" ,
347
- HeapTupleGetOid ( tuple ));
348
- conbin = TextDatumGetCString (val );
425
+ elog (ERROR ,"null conbin for constraint %u" ,HeapTupleGetOid ( tuple ));
426
+
427
+ conbin = TextDatumGetCString (conbin_datum );
349
428
expr = (Expr * )stringToNode (conbin );
350
429
351
430
switch (prel -> parttype )
352
431
{
353
432
case PT_RANGE :
354
433
if (!validate_range_constraint (expr ,prel ,& min ,& max ))
355
- {
356
- elog (WARNING ,"Wrong CHECK constraint for relation '%s'. "
357
- "It MUST have exact format: "
358
- "VARIABLE >= CONST AND VARIABLE < CONST. Skipping..." ,
359
- get_rel_name (con -> conrelid ));
360
434
continue ;
361
- }
362
435
363
436
/* If datum is referenced by val then just assign */
364
437
if (rangerel -> by_val )
@@ -373,57 +446,74 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
373
446
memcpy (& re .max ,DatumGetPointer (max ),sizeof (re .max ));
374
447
}
375
448
re .child_oid = con -> conrelid ;
376
- ranges [i ]= re ;
449
+ ranges [idx ]= re ;
377
450
break ;
378
451
379
452
case PT_HASH :
380
453
if (!validate_hash_constraint (expr ,prel ,& hash ))
381
- {
382
- elog (WARNING ,"Wrong CHECK constraint format for relation '%s'. "
383
- "Skipping..." ,
384
- get_rel_name (con -> conrelid ));
385
454
continue ;
386
- }
387
455
children [hash ]= con -> conrelid ;
456
+ break ;
388
457
}
458
+
459
+ /* Constraint validated successfully. Move on to the next child */
460
+ gotovalidate_next_child ;
389
461
}
390
- prel -> children_count = proc ;
391
462
392
- if (prel -> parttype == PT_RANGE )
463
+ /* No constraint matches pattern */
464
+ switch (prel -> parttype )
393
465
{
394
- TypeCacheEntry * tce ;
395
- bool byVal = rangerel -> by_val ;
466
+ case PT_RANGE :
467
+ elog (WARNING ,"Wrong CHECK constraint for relation '%s'. "
468
+ "It MUST have exact format: "
469
+ "VARIABLE >= CONST AND VARIABLE < CONST." ,
470
+ get_rel_name (con -> conrelid ));
471
+ break ;
472
+ case PT_HASH :
473
+ elog (WARNING ,"Wrong CHECK constraint format for relation '%s'." ,
474
+ get_rel_name (con -> conrelid ));
475
+ break ;
476
+ }
477
+ return false;
396
478
397
- /* Sort ascending */
398
- tce = lookup_type_cache (prel -> atttype ,TYPECACHE_CMP_PROC |TYPECACHE_CMP_PROC_FINFO );
399
- qsort_type_cmp_func = & tce -> cmp_proc_finfo ;
400
- globalByVal = byVal ;
401
- qsort (ranges ,proc ,sizeof (RangeEntry ),cmp_range_entries );
479
+ validate_next_child :
480
+ continue ;
481
+ }
402
482
403
- /* Copy oids to prel */
404
- for (i = 0 ;i < proc ;i ++ )
405
- children [i ]= ranges [i ].child_oid ;
483
+ /*
484
+ * Sort range partitions and check for overlaps
485
+ */
486
+ if (prel -> parttype == PT_RANGE )
487
+ {
488
+ TypeCacheEntry * tce ;
489
+ bool byVal = rangerel -> by_val ;
406
490
407
- /* Check if some ranges overlap */
408
- for (i = 0 ;i < proc - 1 ;i ++ )
409
- {
410
- Datum cur_upper = PATHMAN_GET_DATUM (ranges [i ].max ,byVal );
411
- Datum next_lower = PATHMAN_GET_DATUM (ranges [i + 1 ].min ,byVal );
412
- bool overlap = DatumGetInt32 (FunctionCall2 (qsort_type_cmp_func ,next_lower ,cur_upper ))< 0 ;
491
+ /* Sort ascending */
492
+ tce = lookup_type_cache (prel -> atttype ,TYPECACHE_CMP_PROC |TYPECACHE_CMP_PROC_FINFO );
493
+ qsort_type_cmp_func = & tce -> cmp_proc_finfo ;
494
+ globalByVal = byVal ;
495
+ qsort (ranges ,children_count ,sizeof (RangeEntry ),cmp_range_entries );
413
496
414
- if (overlap )
415
- {
416
- RelationKey key ;
417
- key .dbid = MyDatabaseId ;
418
- key .relid = parent_oid ;
497
+ /* Copy oids to prel */
498
+ for (i = 0 ;i < children_count ;i ++ )
499
+ children [i ]= ranges [i ].child_oid ;
419
500
420
- elog (WARNING ,"Partitions %u and %u overlap. Disabling pathman for relation %u..." ,
421
- ranges [i ].child_oid ,ranges [i + 1 ].child_oid ,parent_oid );
422
- hash_search (relations , (const void * )& key ,HASH_REMOVE ,& found );
423
- }
501
+ /* Check if some ranges overlap */
502
+ for (i = 0 ;i < children_count - 1 ;i ++ )
503
+ {
504
+ Datum cur_upper = PATHMAN_GET_DATUM (ranges [i ].max ,byVal );
505
+ Datum next_lower = PATHMAN_GET_DATUM (ranges [i + 1 ].min ,byVal );
506
+ bool overlap = DatumGetInt32 (FunctionCall2 (qsort_type_cmp_func ,next_lower ,cur_upper ))< 0 ;
507
+
508
+ if (overlap )
509
+ {
510
+ elog (WARNING ,"Partitions %u and %u overlap." ,
511
+ ranges [i ].child_oid ,ranges [i + 1 ].child_oid );
512
+ return false;
424
513
}
425
514
}
426
515
}
516
+ return true;
427
517
}
428
518
429
519