Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd1587ff

Browse files
committed
fix check constraints validation mechanism
1 parent4f04cff commitd1587ff

File tree

4 files changed

+179
-89
lines changed

4 files changed

+179
-89
lines changed

‎src/init.c

Lines changed: 174 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ boolinitialization_needed = true;
3131
staticFmgrInfo*qsort_type_cmp_func;
3232
staticboolglobalByVal;
3333

34+
staticboolvalidate_partition_constraints(Oid*children_oids,
35+
uintchildren_count,
36+
Snapshotsnapshot,
37+
PartRelationInfo*prel,
38+
RangeRelation*rangerel);
3439
staticboolvalidate_range_constraint(Expr*,PartRelationInfo*,Datum*,Datum*);
3540
staticboolvalidate_hash_constraint(Expr*expr,PartRelationInfo*prel,int*hash);
3641
staticboolread_opexpr_const(OpExpr*opexpr,intvarattno,Datum*val);
@@ -120,7 +125,7 @@ load_config(void)
120125

121126
/* Load cache */
122127
LWLockAcquire(pmstate->load_config_lock,LW_EXCLUSIVE);
123-
load_relations_hashtable(new_segment_created);
128+
load_relations(new_segment_created);
124129
LWLockRelease(pmstate->load_config_lock);
125130
LWLockRelease(pmstate->dsm_init_lock);
126131
}
@@ -155,7 +160,7 @@ get_extension_schema()
155160
* Loads partitioned tables structure to hashtable
156161
*/
157162
void
158-
load_relations_hashtable(boolreinitialize)
163+
load_relations(boolreinitialize)
159164
{
160165
intret,
161166
i,
@@ -233,15 +238,15 @@ load_relations_hashtable(bool reinitialize)
233238
free_dsm_array(&rangerel->ranges);
234239
prel->children_count=0;
235240
}
236-
load_check_constraints(oid,GetCatalogSnapshot(oid));
241+
load_partitions(oid,GetCatalogSnapshot(oid));
237242
break;
238243
casePT_HASH:
239244
if (reinitialize&&prel->children.length>0)
240245
{
241246
free_dsm_array(&prel->children);
242247
prel->children_count=0;
243248
}
244-
load_check_constraints(oid,GetCatalogSnapshot(oid));
249+
load_partitions(oid,GetCatalogSnapshot(oid));
245250
break;
246251
}
247252
}
@@ -268,18 +273,19 @@ create_relations_hashtable()
268273
* Load and validate CHECK constraints
269274
*/
270275
void
271-
load_check_constraints(Oidparent_oid,Snapshotsnapshot)
276+
load_partitions(Oidparent_oid,Snapshotsnapshot)
272277
{
273278
PartRelationInfo*prel=NULL;
274279
RangeRelation*rangerel=NULL;
275280
SPIPlanPtrplan;
276281
boolfound;
277282
intret,
278283
i,
279-
proc;
284+
children_count=0;
280285
Datumvals[1];
281-
Oidoids[1]= {INT4OID};
286+
Oidtypes[1]= {INT4OID};
282287
boolnulls[1]= {false};
288+
Oid*children_oids;
283289

284290
vals[0]=Int32GetDatum(parent_oid);
285291
prel=get_pathman_relation_info(parent_oid,NULL);
@@ -288,77 +294,144 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
288294
if (prel->children.length>0)
289295
return;
290296

291-
plan=SPI_prepare("select pg_constraint.* "
292-
"from pg_constraint "
293-
"join pg_inherits on inhrelid = conrelid "
294-
"where inhparent = $1 and contype='c';",
295-
1,oids);
296-
ret=SPI_execute_snapshot(plan,vals,nulls,
297-
snapshot,InvalidSnapshot, true, false,0);
298-
299-
proc=SPI_processed;
300-
297+
/* Load children oids */
298+
plan=SPI_prepare("select inhrelid from pg_inherits where inhparent = $1;",1,types);
299+
ret=SPI_execute_snapshot(plan,vals,nulls,snapshot,InvalidSnapshot, true, false,0);
300+
children_count=SPI_processed;
301301
if (ret>0&&SPI_tuptable!=NULL)
302302
{
303-
SPITupleTable*tuptable=SPI_tuptable;
304-
Oid*children;
305-
RangeEntry*ranges=NULL;
306-
Datummin;
307-
Datummax;
308-
inthash;
303+
children_oids=palloc(sizeof(Oid)*children_count);
304+
for(i=0;i<children_count;i++)
305+
{
306+
TupleDesctupdesc=SPI_tuptable->tupdesc;
307+
HeapTupletuple=SPI_tuptable->vals[i];
308+
boolisnull;
309309

310-
alloc_dsm_array(&prel->children,sizeof(Oid),proc);
311-
children= (Oid*)dsm_array_get_pointer(&prel->children);
310+
children_oids[i]= \
311+
DatumGetObjectId(SPI_getbinval(tuple,tupdesc,1,&isnull));
312+
}
313+
}
312314

315+
if (children_count>0)
316+
{
317+
alloc_dsm_array(&prel->children,sizeof(Oid),children_count);
318+
319+
/* allocate ranges array is dsm */
313320
if (prel->parttype==PT_RANGE)
314321
{
315-
TypeCacheEntry*tce;
316-
RelationKeykey;
322+
TypeCacheEntry*tce=lookup_type_cache(prel->atttype,0);
323+
RelationKeykey;
324+
317325
key.dbid=MyDatabaseId;
318326
key.relid=parent_oid;
319-
320327
rangerel= (RangeRelation*)
321328
hash_search(range_restrictions, (void*)&key,HASH_ENTER,&found);
329+
rangerel->by_val=tce->typbyval;
330+
alloc_dsm_array(&rangerel->ranges,sizeof(RangeEntry),children_count);
331+
}
322332

323-
alloc_dsm_array(&rangerel->ranges,sizeof(RangeEntry),proc);
324-
ranges= (RangeEntry*)dsm_array_get_pointer(&rangerel->ranges);
333+
/* Validate partitions constraints */
334+
if (!validate_partition_constraints(children_oids,
335+
children_count,
336+
snapshot,
337+
prel,
338+
rangerel))
339+
{
340+
RelationKeykey;
325341

326-
tce=lookup_type_cache(prel->atttype,0);
327-
rangerel->by_val=tce->typbyval;
342+
/*
343+
* If validation failed then pg_pathman cannot handle this relation.
344+
* Remove it from the cache
345+
*/
346+
key.dbid=MyDatabaseId;
347+
key.relid=parent_oid;
348+
349+
free_dsm_array(&prel->children);
350+
free_dsm_array(&rangerel->ranges);
351+
hash_search(relations, (constvoid*)&key,HASH_REMOVE,&found);
352+
if (prel->parttype==PT_RANGE)
353+
hash_search(range_restrictions, (constvoid*)&key,HASH_REMOVE,&found);
354+
355+
elog(WARNING,"Validation failed for relation '%s'. "
356+
"It will not be handled by pg_pathman",
357+
get_rel_name(parent_oid));
328358
}
359+
else
360+
prel->children_count=children_count;
329361

330-
for (i=0;i<proc;i++)
362+
pfree(children_oids);
363+
}
364+
}
365+
366+
staticbool
367+
validate_partition_constraints(Oid*children_oids,
368+
uintchildren_count,
369+
Snapshotsnapshot,
370+
PartRelationInfo*prel,
371+
RangeRelation*rangerel)
372+
{
373+
RangeEntryre;
374+
boolisnull;
375+
char*conbin;
376+
Expr*expr;
377+
Datumoids[1];
378+
boolnulls[1]= {false};
379+
Oidtypes[1]= {INT4OID};
380+
Datummin,
381+
max;
382+
Datumconbin_datum;
383+
Form_pg_constraintcon;
384+
RangeEntry*ranges=NULL;
385+
Oid*children;
386+
inthash;
387+
inti,j,idx;
388+
intret;
389+
intproc;
390+
SPIPlanPtrplan;
391+
392+
/* Iterate through children */
393+
for (idx=0;idx<children_count;idx++)
394+
{
395+
Oidchild_oid=children_oids[idx];
396+
397+
oids[0]=Int32GetDatum(child_oid);
398+
399+
/* Load constraints */
400+
plan=SPI_prepare("select * from pg_constraint where conrelid = $1 and contype = 'c';",1,types);
401+
ret=SPI_execute_snapshot(plan,oids,nulls,snapshot,InvalidSnapshot, true, false,0);
402+
proc=SPI_processed;
403+
404+
if (ret <=0||SPI_tuptable==NULL)
331405
{
332-
RangeEntryre;
333-
HeapTupletuple=tuptable->vals[i];
334-
boolisnull;
335-
Datumval;
336-
char*conbin;
337-
Expr*expr;
406+
elog(WARNING,"No constraints found for relation '%s'.",
407+
get_rel_name(child_oid));
408+
return false;
409+
}
338410

339-
Form_pg_constraintcon;
411+
children=dsm_array_get_pointer(&prel->children);
412+
if (prel->parttype==PT_RANGE)
413+
ranges= (RangeEntry*)dsm_array_get_pointer(&rangerel->ranges);
414+
415+
/* Iterate through check constraints and try to validate them */
416+
for (j=0;j<proc;j++)
417+
{
418+
HeapTupletuple=SPI_tuptable->vals[j];
340419

341420
con= (Form_pg_constraint)GETSTRUCT(tuple);
421+
conbin_datum=SysCacheGetAttr(CONSTROID,tuple,Anum_pg_constraint_conbin,&isnull);
342422

343-
val=SysCacheGetAttr(CONSTROID,tuple,Anum_pg_constraint_conbin,
344-
&isnull);
423+
/* handle unexpected null value */
345424
if (isnull)
346-
elog(ERROR,"null conbin for constraint %u",
347-
HeapTupleGetOid(tuple));
348-
conbin=TextDatumGetCString(val);
425+
elog(ERROR,"null conbin for constraint %u",HeapTupleGetOid(tuple));
426+
427+
conbin=TextDatumGetCString(conbin_datum);
349428
expr= (Expr*)stringToNode(conbin);
350429

351430
switch(prel->parttype)
352431
{
353432
casePT_RANGE:
354433
if (!validate_range_constraint(expr,prel,&min,&max))
355-
{
356-
elog(WARNING,"Wrong CHECK constraint for relation '%s'. "
357-
"It MUST have exact format: "
358-
"VARIABLE >= CONST AND VARIABLE < CONST. Skipping...",
359-
get_rel_name(con->conrelid));
360434
continue;
361-
}
362435

363436
/* If datum is referenced by val then just assign */
364437
if (rangerel->by_val)
@@ -373,57 +446,74 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
373446
memcpy(&re.max,DatumGetPointer(max),sizeof(re.max));
374447
}
375448
re.child_oid=con->conrelid;
376-
ranges[i]=re;
449+
ranges[idx]=re;
377450
break;
378451

379452
casePT_HASH:
380453
if (!validate_hash_constraint(expr,prel,&hash))
381-
{
382-
elog(WARNING,"Wrong CHECK constraint format for relation '%s'. "
383-
"Skipping...",
384-
get_rel_name(con->conrelid));
385454
continue;
386-
}
387455
children[hash]=con->conrelid;
456+
break;
388457
}
458+
459+
/* Constraint validated successfully. Move on to the next child */
460+
gotovalidate_next_child;
389461
}
390-
prel->children_count=proc;
391462

392-
if (prel->parttype==PT_RANGE)
463+
/* No constraint matches pattern */
464+
switch(prel->parttype)
393465
{
394-
TypeCacheEntry*tce;
395-
boolbyVal=rangerel->by_val;
466+
casePT_RANGE:
467+
elog(WARNING,"Wrong CHECK constraint for relation '%s'. "
468+
"It MUST have exact format: "
469+
"VARIABLE >= CONST AND VARIABLE < CONST.",
470+
get_rel_name(con->conrelid));
471+
break;
472+
casePT_HASH:
473+
elog(WARNING,"Wrong CHECK constraint format for relation '%s'.",
474+
get_rel_name(con->conrelid));
475+
break;
476+
}
477+
return false;
396478

397-
/* Sort ascending */
398-
tce=lookup_type_cache(prel->atttype,TYPECACHE_CMP_PROC |TYPECACHE_CMP_PROC_FINFO);
399-
qsort_type_cmp_func=&tce->cmp_proc_finfo;
400-
globalByVal=byVal;
401-
qsort(ranges,proc,sizeof(RangeEntry),cmp_range_entries);
479+
validate_next_child:
480+
continue;
481+
}
402482

403-
/* Copy oids to prel */
404-
for(i=0;i<proc;i++)
405-
children[i]=ranges[i].child_oid;
483+
/*
484+
* Sort range partitions and check for overlaps
485+
*/
486+
if (prel->parttype==PT_RANGE)
487+
{
488+
TypeCacheEntry*tce;
489+
boolbyVal=rangerel->by_val;
406490

407-
/* Check if some ranges overlap */
408-
for(i=0;i<proc-1;i++)
409-
{
410-
Datumcur_upper=PATHMAN_GET_DATUM(ranges[i].max,byVal);
411-
Datumnext_lower=PATHMAN_GET_DATUM(ranges[i+1].min,byVal);
412-
booloverlap=DatumGetInt32(FunctionCall2(qsort_type_cmp_func,next_lower,cur_upper))<0;
491+
/* Sort ascending */
492+
tce=lookup_type_cache(prel->atttype,TYPECACHE_CMP_PROC |TYPECACHE_CMP_PROC_FINFO);
493+
qsort_type_cmp_func=&tce->cmp_proc_finfo;
494+
globalByVal=byVal;
495+
qsort(ranges,children_count,sizeof(RangeEntry),cmp_range_entries);
413496

414-
if (overlap)
415-
{
416-
RelationKeykey;
417-
key.dbid=MyDatabaseId;
418-
key.relid=parent_oid;
497+
/* Copy oids to prel */
498+
for(i=0;i<children_count;i++)
499+
children[i]=ranges[i].child_oid;
419500

420-
elog(WARNING,"Partitions %u and %u overlap. Disabling pathman for relation %u...",
421-
ranges[i].child_oid,ranges[i+1].child_oid,parent_oid);
422-
hash_search(relations, (constvoid*)&key,HASH_REMOVE,&found);
423-
}
501+
/* Check if some ranges overlap */
502+
for(i=0;i<children_count-1;i++)
503+
{
504+
Datumcur_upper=PATHMAN_GET_DATUM(ranges[i].max,byVal);
505+
Datumnext_lower=PATHMAN_GET_DATUM(ranges[i+1].min,byVal);
506+
booloverlap=DatumGetInt32(FunctionCall2(qsort_type_cmp_func,next_lower,cur_upper))<0;
507+
508+
if (overlap)
509+
{
510+
elog(WARNING,"Partitions %u and %u overlap.",
511+
ranges[i].child_oid,ranges[i+1].child_oid);
512+
return false;
424513
}
425514
}
426515
}
516+
return true;
427517
}
428518

429519

‎src/pathman.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ void load_config(void);
196196
voidcreate_relations_hashtable(void);
197197
voidcreate_hash_restrictions_hashtable(void);
198198
voidcreate_range_restrictions_hashtable(void);
199-
voidload_relations_hashtable(boolreinitialize);
200-
voidload_check_constraints(Oidparent_oid,Snapshotsnapshot);
199+
voidload_relations(boolreinitialize);
200+
voidload_partitions(Oidparent_oid,Snapshotsnapshot);
201201
voidremove_relation_info(Oidrelid);
202202

203203
/* utility functions */

‎src/pl_funcs.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ on_partitions_created(PG_FUNCTION_ARGS)
4444

4545
/* Reload config */
4646
/* TODO: reload just the specified relation */
47-
load_relations_hashtable(false);
47+
load_relations(false);
4848

4949
LWLockRelease(pmstate->load_config_lock);
5050

@@ -64,7 +64,7 @@ on_partitions_updated(PG_FUNCTION_ARGS)
6464
{
6565
LWLockAcquire(pmstate->load_config_lock,LW_EXCLUSIVE);
6666
remove_relation_info(relid);
67-
load_relations_hashtable(false);
67+
load_relations(false);
6868
LWLockRelease(pmstate->load_config_lock);
6969
}
7070

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp