Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit355684e

Browse files
committed
Correct attach/detach logic for FKs in partitions
There was no code to handle foreign key constraints on partitionedtables in the case of ALTER TABLE DETACH; and if you happened to ATTACHa partition that already had an equivalent constraint, that one wasignored and a new constraint was created. Adding this to the fact thatforeign key cloning reuses the constraint name on the partition insteadof generating a new name (as it probably should, to cater to SQLstandard rules about constraint naming within schemas), the result was apretty poor user experience -- the most visible failure was that justdetaching a partition and re-attaching it failed with an error such as ERROR: duplicate key value violates unique constraint "pg_constraint_conrelid_contypid_conname_index" DETAIL: Key (conrelid, contypid, conname)=(26702, 0, test_result_asset_id_fkey) already exists.because it would try to create an identically-named constraint in thepartition. To make matters worse, if you tried to drop the constraintin the now-independent partition, that would fail because the constraintwas still seen as dependent on the constraint in its former parentpartitioned table: ERROR: cannot drop inherited constraint "test_result_asset_id_fkey" of relation "test_result_cbsystem_0001_0050_monthly_2018_09"This fix attacks the problem from two angles: first, when the partitionis detached, the constraint is also marked as independent, so the dropnow works. Second, when the partition is re-attached, we scan existingconstraints searching for one matching the FK in the parent, and if oneexists, we link that one to the parent constraint. So we don't end upwith a duplicate -- and better yet, we don't need to scan the referencedtable to verify that the constraint holds.To implement this I made a small change to previously planner-onlystruct ForeignKeyCacheInfo to contain the constraint OID; also relcachenow maintains the list of FKs for partitioned tables too.Backpatch to 11.Reported-by: Michael Vitale (bug #15425)Discussion:https://postgr.es/m/15425-2dbc9d2aa999f816@postgresql.org
1 parent88670a4 commit355684e

File tree

8 files changed

+404
-39
lines changed

8 files changed

+404
-39
lines changed

‎src/backend/catalog/pg_constraint.c

Lines changed: 203 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include"access/htup_details.h"
2020
#include"access/sysattr.h"
2121
#include"access/tupconvert.h"
22+
#include"access/xact.h"
2223
#include"catalog/dependency.h"
2324
#include"catalog/indexing.h"
2425
#include"catalog/objectaccess.h"
@@ -37,6 +38,10 @@
3738
#include"utils/tqual.h"
3839

3940

41+
staticvoidclone_fk_constraints(Relationpg_constraint,RelationparentRel,
42+
RelationpartRel,List*clone,List**cloned);
43+
44+
4045
/*
4146
* CreateConstraintEntry
4247
*Create a constraint table entry.
@@ -400,57 +405,106 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
400405
Relationrel;
401406
ScanKeyDatakey;
402407
SysScanDescscan;
403-
TupleDesctupdesc;
404408
HeapTupletuple;
405-
AttrNumber*attmap;
409+
List*clone=NIL;
406410

407411
parentRel=heap_open(parentId,NoLock);/* already got lock */
408412
/* see ATAddForeignKeyConstraint about lock level */
409413
rel=heap_open(relationId,AccessExclusiveLock);
410-
411414
pg_constraint=heap_open(ConstraintRelationId,RowShareLock);
415+
416+
/* Obtain the list of constraints to clone or attach */
417+
ScanKeyInit(&key,
418+
Anum_pg_constraint_conrelid,BTEqualStrategyNumber,
419+
F_OIDEQ,ObjectIdGetDatum(parentId));
420+
scan=systable_beginscan(pg_constraint,ConstraintRelidTypidNameIndexId, true,
421+
NULL,1,&key);
422+
while ((tuple=systable_getnext(scan))!=NULL)
423+
clone=lappend_oid(clone,HeapTupleGetOid(tuple));
424+
systable_endscan(scan);
425+
426+
/* Do the actual work, recursing to partitions as needed */
427+
clone_fk_constraints(pg_constraint,parentRel,rel,clone,cloned);
428+
429+
/* We're done. Clean up */
430+
heap_close(parentRel,NoLock);
431+
heap_close(rel,NoLock);/* keep lock till commit */
432+
heap_close(pg_constraint,RowShareLock);
433+
}
434+
435+
/*
436+
* clone_fk_constraints
437+
*Recursive subroutine for CloneForeignKeyConstraints
438+
*
439+
* Clone the given list of FK constraints when a partition is attached.
440+
*
441+
* When cloning foreign keys to a partition, it may happen that equivalent
442+
* constraints already exist in the partition for some of them. We can skip
443+
* creating a clone in that case, and instead just attach the existing
444+
* constraint to the one in the parent.
445+
*
446+
* This function recurses to partitions, if the new partition is partitioned;
447+
* of course, only do this for FKs that were actually cloned.
448+
*/
449+
staticvoid
450+
clone_fk_constraints(Relationpg_constraint,RelationparentRel,
451+
RelationpartRel,List*clone,List**cloned)
452+
{
453+
TupleDesctupdesc;
454+
AttrNumber*attmap;
455+
List*partFKs;
456+
List*subclone=NIL;
457+
ListCell*cell;
458+
412459
tupdesc=RelationGetDescr(pg_constraint);
413460

414461
/*
415462
* The constraint key may differ, if the columns in the partition are
416463
* different. This map is used to convert them.
417464
*/
418-
attmap=convert_tuples_by_name_map(RelationGetDescr(rel),
465+
attmap=convert_tuples_by_name_map(RelationGetDescr(partRel),
419466
RelationGetDescr(parentRel),
420467
gettext_noop("could not convert row type"));
421468

422-
ScanKeyInit(&key,
423-
Anum_pg_constraint_conrelid,BTEqualStrategyNumber,
424-
F_OIDEQ,ObjectIdGetDatum(parentId));
425-
scan=systable_beginscan(pg_constraint,ConstraintRelidTypidNameIndexId, true,
426-
NULL,1,&key);
469+
partFKs=copyObject(RelationGetFKeyList(partRel));
427470

428-
while ((tuple=systable_getnext(scan))!=NULL)
471+
foreach(cell,clone)
429472
{
430-
Form_pg_constraintconstrForm= (Form_pg_constraint)GETSTRUCT(tuple);
473+
OidparentConstrOid=lfirst_oid(cell);
474+
Form_pg_constraintconstrForm;
475+
HeapTupletuple;
431476
AttrNumberconkey[INDEX_MAX_KEYS];
432477
AttrNumbermapped_conkey[INDEX_MAX_KEYS];
433478
AttrNumberconfkey[INDEX_MAX_KEYS];
434479
Oidconpfeqop[INDEX_MAX_KEYS];
435480
Oidconppeqop[INDEX_MAX_KEYS];
436481
Oidconffeqop[INDEX_MAX_KEYS];
437482
Constraint*fkconstraint;
438-
ClonedConstraint*newc;
483+
boolattach_it;
439484
OidconstrOid;
440485
ObjectAddressparentAddr,
441486
childAddr;
442487
intnelem;
488+
ListCell*cell;
443489
inti;
444490
ArrayType*arr;
445491
Datumdatum;
446492
boolisnull;
447493

494+
tuple=SearchSysCache1(CONSTROID,parentConstrOid);
495+
if (!tuple)
496+
elog(ERROR,"cache lookup failed for constraint %u",
497+
parentConstrOid);
498+
constrForm= (Form_pg_constraint)GETSTRUCT(tuple);
499+
448500
/* only foreign keys */
449501
if (constrForm->contype!=CONSTRAINT_FOREIGN)
502+
{
503+
ReleaseSysCache(tuple);
450504
continue;
505+
}
451506

452-
ObjectAddressSet(parentAddr,ConstraintRelationId,
453-
HeapTupleGetOid(tuple));
507+
ObjectAddressSet(parentAddr,ConstraintRelationId,parentConstrOid);
454508

455509
datum=fastgetattr(tuple,Anum_pg_constraint_conkey,
456510
tupdesc,&isnull);
@@ -539,6 +593,90 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
539593
elog(ERROR,"conffeqop is not a 1-D OID array");
540594
memcpy(conffeqop,ARR_DATA_PTR(arr),nelem*sizeof(Oid));
541595

596+
/*
597+
* Before creating a new constraint, see whether any existing FKs are
598+
* fit for the purpose. If one is, attach the parent constraint to it,
599+
* and don't clone anything. This way we avoid the expensive
600+
* verification step and don't end up with a duplicate FK. This also
601+
* means we don't consider this constraint when recursing to
602+
* partitions.
603+
*/
604+
attach_it= false;
605+
foreach(cell,partFKs)
606+
{
607+
ForeignKeyCacheInfo*fk=lfirst_node(ForeignKeyCacheInfo,cell);
608+
Form_pg_constraintpartConstr;
609+
HeapTuplepartcontup;
610+
611+
attach_it= true;
612+
613+
/*
614+
* Do some quick & easy initial checks. If any of these fail, we
615+
* cannot use this constraint, but keep looking.
616+
*/
617+
if (fk->confrelid!=constrForm->confrelid||fk->nkeys!=nelem)
618+
{
619+
attach_it= false;
620+
continue;
621+
}
622+
for (i=0;i<nelem;i++)
623+
{
624+
if (fk->conkey[i]!=mapped_conkey[i]||
625+
fk->confkey[i]!=confkey[i]||
626+
fk->conpfeqop[i]!=conpfeqop[i])
627+
{
628+
attach_it= false;
629+
break;
630+
}
631+
}
632+
if (!attach_it)
633+
continue;
634+
635+
/*
636+
* Looks good so far; do some more extensive checks. Presumably
637+
* the check for 'convalidated' could be dropped, since we don't
638+
* really care about that, but let's be careful for now.
639+
*/
640+
partcontup=SearchSysCache1(CONSTROID,
641+
ObjectIdGetDatum(fk->conoid));
642+
if (!partcontup)
643+
elog(ERROR,"cache lookup failed for constraint %u",
644+
fk->conoid);
645+
partConstr= (Form_pg_constraint)GETSTRUCT(partcontup);
646+
if (OidIsValid(partConstr->conparentid)||
647+
!partConstr->convalidated||
648+
partConstr->condeferrable!=constrForm->condeferrable||
649+
partConstr->condeferred!=constrForm->condeferred||
650+
partConstr->confupdtype!=constrForm->confupdtype||
651+
partConstr->confdeltype!=constrForm->confdeltype||
652+
partConstr->confmatchtype!=constrForm->confmatchtype)
653+
{
654+
ReleaseSysCache(partcontup);
655+
attach_it= false;
656+
continue;
657+
}
658+
659+
ReleaseSysCache(partcontup);
660+
661+
/* looks good! Attach this constraint */
662+
ConstraintSetParentConstraint(fk->conoid,
663+
HeapTupleGetOid(tuple));
664+
CommandCounterIncrement();
665+
attach_it= true;
666+
break;
667+
}
668+
669+
/*
670+
* If we attached to an existing constraint, there is no need to
671+
* create a new one. In fact, there's no need to recurse for this
672+
* constraint to partitions, either.
673+
*/
674+
if (attach_it)
675+
{
676+
ReleaseSysCache(tuple);
677+
continue;
678+
}
679+
542680
constrOid=
543681
CreateConstraintEntry(NameStr(constrForm->conname),
544682
constrForm->connamespace,
@@ -547,7 +685,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
547685
constrForm->condeferred,
548686
constrForm->convalidated,
549687
HeapTupleGetOid(tuple),
550-
relationId,
688+
RelationGetRelid(partRel),
551689
mapped_conkey,
552690
nelem,
553691
nelem,
@@ -568,6 +706,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
568706
NULL,
569707
false,
570708
1, false, true);
709+
subclone=lappend_oid(subclone,constrOid);
571710

572711
ObjectAddressSet(childAddr,ConstraintRelationId,constrOid);
573712
recordDependencyOn(&childAddr,&parentAddr,DEPENDENCY_INTERNAL_AUTO);
@@ -580,43 +719,56 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
580719
fkconstraint->deferrable=constrForm->condeferrable;
581720
fkconstraint->initdeferred=constrForm->condeferred;
582721

583-
createForeignKeyTriggers(rel,constrForm->confrelid,fkconstraint,
722+
createForeignKeyTriggers(partRel,constrForm->confrelid,fkconstraint,
584723
constrOid,constrForm->conindid, false);
585724

586725
if (cloned)
587726
{
727+
ClonedConstraint*newc;
728+
588729
/*
589730
* Feed back caller about the constraints we created, so that they
590731
* can set up constraint verification.
591732
*/
592733
newc=palloc(sizeof(ClonedConstraint));
593-
newc->relid=relationId;
734+
newc->relid=RelationGetRelid(partRel);
594735
newc->refrelid=constrForm->confrelid;
595736
newc->conindid=constrForm->conindid;
596737
newc->conid=constrOid;
597738
newc->constraint=fkconstraint;
598739

599740
*cloned=lappend(*cloned,newc);
600741
}
742+
743+
ReleaseSysCache(tuple);
601744
}
602-
systable_endscan(scan);
603745

604746
pfree(attmap);
747+
list_free_deep(partFKs);
605748

606-
if (rel->rd_rel->relkind==RELKIND_PARTITIONED_TABLE)
749+
/*
750+
* If the partition is partitioned, recurse to handle any constraints that
751+
* were cloned.
752+
*/
753+
if (partRel->rd_rel->relkind==RELKIND_PARTITIONED_TABLE&&
754+
subclone!=NIL)
607755
{
608-
PartitionDescpartdesc=RelationGetPartitionDesc(rel);
756+
PartitionDescpartdesc=RelationGetPartitionDesc(partRel);
609757
inti;
610758

611759
for (i=0;i<partdesc->nparts;i++)
612-
CloneForeignKeyConstraints(RelationGetRelid(rel),
613-
partdesc->oids[i],
614-
cloned);
760+
{
761+
RelationchildRel;
762+
763+
childRel=heap_open(partdesc->oids[i],AccessExclusiveLock);
764+
clone_fk_constraints(pg_constraint,
765+
partRel,
766+
childRel,
767+
subclone,
768+
cloned);
769+
heap_close(childRel,NoLock);/* keep lock till commit */
770+
}
615771
}
616-
617-
heap_close(rel,NoLock);/* keep lock till commit */
618-
heap_close(parentRel,NoLock);
619-
heap_close(pg_constraint,RowShareLock);
620772
}
621773

622774
/*
@@ -1028,17 +1180,33 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId)
10281180
elog(ERROR,"cache lookup failed for constraint %u",childConstrId);
10291181
newtup=heap_copytuple(tuple);
10301182
constrForm= (Form_pg_constraint)GETSTRUCT(newtup);
1031-
constrForm->conislocal= false;
1032-
constrForm->coninhcount++;
1033-
constrForm->conparentid=parentConstrId;
1034-
CatalogTupleUpdate(constrRel,&tuple->t_self,newtup);
1035-
ReleaseSysCache(tuple);
1183+
if (OidIsValid(parentConstrId))
1184+
{
1185+
constrForm->conislocal= false;
1186+
constrForm->coninhcount++;
1187+
constrForm->conparentid=parentConstrId;
1188+
1189+
CatalogTupleUpdate(constrRel,&tuple->t_self,newtup);
10361190

1037-
ObjectAddressSet(referenced,ConstraintRelationId,parentConstrId);
1038-
ObjectAddressSet(depender,ConstraintRelationId,childConstrId);
1191+
ObjectAddressSet(referenced,ConstraintRelationId,parentConstrId);
1192+
ObjectAddressSet(depender,ConstraintRelationId,childConstrId);
10391193

1040-
recordDependencyOn(&depender,&referenced,DEPENDENCY_INTERNAL_AUTO);
1194+
recordDependencyOn(&depender,&referenced,DEPENDENCY_INTERNAL_AUTO);
1195+
}
1196+
else
1197+
{
1198+
constrForm->coninhcount--;
1199+
if (constrForm->coninhcount <=0)
1200+
constrForm->conislocal= true;
1201+
constrForm->conparentid=InvalidOid;
1202+
1203+
deleteDependencyRecordsForClass(ConstraintRelationId,childConstrId,
1204+
ConstraintRelationId,
1205+
DEPENDENCY_INTERNAL_AUTO);
1206+
CatalogTupleUpdate(constrRel,&tuple->t_self,newtup);
1207+
}
10411208

1209+
ReleaseSysCache(tuple);
10421210
heap_close(constrRel,RowExclusiveLock);
10431211
}
10441212

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp