1919#include "access/htup_details.h"
2020#include "access/sysattr.h"
2121#include "access/tupconvert.h"
22+ #include "access/xact.h"
2223#include "catalog/dependency.h"
2324#include "catalog/indexing.h"
2425#include "catalog/objectaccess.h"
3738#include "utils/tqual.h"
3839
3940
41+ static void clone_fk_constraints (Relation pg_constraint ,Relation parentRel ,
42+ Relation partRel ,List * clone ,List * * cloned );
43+
44+
4045/*
4146 * CreateConstraintEntry
4247 *Create a constraint table entry.
@@ -400,57 +405,106 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
400405Relation rel ;
401406ScanKeyData key ;
402407SysScanDesc scan ;
403- TupleDesc tupdesc ;
404408HeapTuple tuple ;
405- AttrNumber * attmap ;
409+ List * clone = NIL ;
406410
407411parentRel = heap_open (parentId ,NoLock );/* already got lock */
408412/* see ATAddForeignKeyConstraint about lock level */
409413rel = heap_open (relationId ,AccessExclusiveLock );
410-
411414pg_constraint = heap_open (ConstraintRelationId ,RowShareLock );
415+
416+ /* Obtain the list of constraints to clone or attach */
417+ ScanKeyInit (& key ,
418+ Anum_pg_constraint_conrelid ,BTEqualStrategyNumber ,
419+ F_OIDEQ ,ObjectIdGetDatum (parentId ));
420+ scan = systable_beginscan (pg_constraint ,ConstraintRelidTypidNameIndexId , true,
421+ NULL ,1 ,& key );
422+ while ((tuple = systable_getnext (scan ))!= NULL )
423+ clone = lappend_oid (clone ,HeapTupleGetOid (tuple ));
424+ systable_endscan (scan );
425+
426+ /* Do the actual work, recursing to partitions as needed */
427+ clone_fk_constraints (pg_constraint ,parentRel ,rel ,clone ,cloned );
428+
429+ /* We're done. Clean up */
430+ heap_close (parentRel ,NoLock );
431+ heap_close (rel ,NoLock );/* keep lock till commit */
432+ heap_close (pg_constraint ,RowShareLock );
433+ }
434+
435+ /*
436+ * clone_fk_constraints
437+ *Recursive subroutine for CloneForeignKeyConstraints
438+ *
439+ * Clone the given list of FK constraints when a partition is attached.
440+ *
441+ * When cloning foreign keys to a partition, it may happen that equivalent
442+ * constraints already exist in the partition for some of them. We can skip
443+ * creating a clone in that case, and instead just attach the existing
444+ * constraint to the one in the parent.
445+ *
446+ * This function recurses to partitions, if the new partition is partitioned;
447+ * of course, only do this for FKs that were actually cloned.
448+ */
449+ static void
450+ clone_fk_constraints (Relation pg_constraint ,Relation parentRel ,
451+ Relation partRel ,List * clone ,List * * cloned )
452+ {
453+ TupleDesc tupdesc ;
454+ AttrNumber * attmap ;
455+ List * partFKs ;
456+ List * subclone = NIL ;
457+ ListCell * cell ;
458+
412459tupdesc = RelationGetDescr (pg_constraint );
413460
414461/*
415462 * The constraint key may differ, if the columns in the partition are
416463 * different. This map is used to convert them.
417464 */
418- attmap = convert_tuples_by_name_map (RelationGetDescr (rel ),
465+ attmap = convert_tuples_by_name_map (RelationGetDescr (partRel ),
419466RelationGetDescr (parentRel ),
420467gettext_noop ("could not convert row type" ));
421468
422- ScanKeyInit (& key ,
423- Anum_pg_constraint_conrelid ,BTEqualStrategyNumber ,
424- F_OIDEQ ,ObjectIdGetDatum (parentId ));
425- scan = systable_beginscan (pg_constraint ,ConstraintRelidTypidNameIndexId , true,
426- NULL ,1 ,& key );
469+ partFKs = copyObject (RelationGetFKeyList (partRel ));
427470
428- while (( tuple = systable_getnext ( scan )) != NULL )
471+ foreach ( cell , clone )
429472{
430- Form_pg_constraint constrForm = (Form_pg_constraint )GETSTRUCT (tuple );
473+ Oid parentConstrOid = lfirst_oid (cell );
474+ Form_pg_constraint constrForm ;
475+ HeapTuple tuple ;
431476AttrNumber conkey [INDEX_MAX_KEYS ];
432477AttrNumber mapped_conkey [INDEX_MAX_KEYS ];
433478AttrNumber confkey [INDEX_MAX_KEYS ];
434479Oid conpfeqop [INDEX_MAX_KEYS ];
435480Oid conppeqop [INDEX_MAX_KEYS ];
436481Oid conffeqop [INDEX_MAX_KEYS ];
437482Constraint * fkconstraint ;
438- ClonedConstraint * newc ;
483+ bool attach_it ;
439484Oid constrOid ;
440485ObjectAddress parentAddr ,
441486childAddr ;
442487int nelem ;
488+ ListCell * cell ;
443489int i ;
444490ArrayType * arr ;
445491Datum datum ;
446492bool isnull ;
447493
494+ tuple = SearchSysCache1 (CONSTROID ,parentConstrOid );
495+ if (!tuple )
496+ elog (ERROR ,"cache lookup failed for constraint %u" ,
497+ parentConstrOid );
498+ constrForm = (Form_pg_constraint )GETSTRUCT (tuple );
499+
448500/* only foreign keys */
449501if (constrForm -> contype != CONSTRAINT_FOREIGN )
502+ {
503+ ReleaseSysCache (tuple );
450504continue ;
505+ }
451506
452- ObjectAddressSet (parentAddr ,ConstraintRelationId ,
453- HeapTupleGetOid (tuple ));
507+ ObjectAddressSet (parentAddr ,ConstraintRelationId ,parentConstrOid );
454508
455509datum = fastgetattr (tuple ,Anum_pg_constraint_conkey ,
456510tupdesc ,& isnull );
@@ -539,6 +593,90 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
539593elog (ERROR ,"conffeqop is not a 1-D OID array" );
540594memcpy (conffeqop ,ARR_DATA_PTR (arr ),nelem * sizeof (Oid ));
541595
596+ /*
597+ * Before creating a new constraint, see whether any existing FKs are
598+ * fit for the purpose. If one is, attach the parent constraint to it,
599+ * and don't clone anything. This way we avoid the expensive
600+ * verification step and don't end up with a duplicate FK. This also
601+ * means we don't consider this constraint when recursing to
602+ * partitions.
603+ */
604+ attach_it = false;
605+ foreach (cell ,partFKs )
606+ {
607+ ForeignKeyCacheInfo * fk = lfirst_node (ForeignKeyCacheInfo ,cell );
608+ Form_pg_constraint partConstr ;
609+ HeapTuple partcontup ;
610+
611+ attach_it = true;
612+
613+ /*
614+ * Do some quick & easy initial checks. If any of these fail, we
615+ * cannot use this constraint, but keep looking.
616+ */
617+ if (fk -> confrelid != constrForm -> confrelid || fk -> nkeys != nelem )
618+ {
619+ attach_it = false;
620+ continue ;
621+ }
622+ for (i = 0 ;i < nelem ;i ++ )
623+ {
624+ if (fk -> conkey [i ]!= mapped_conkey [i ]||
625+ fk -> confkey [i ]!= confkey [i ]||
626+ fk -> conpfeqop [i ]!= conpfeqop [i ])
627+ {
628+ attach_it = false;
629+ break ;
630+ }
631+ }
632+ if (!attach_it )
633+ continue ;
634+
635+ /*
636+ * Looks good so far; do some more extensive checks. Presumably
637+ * the check for 'convalidated' could be dropped, since we don't
638+ * really care about that, but let's be careful for now.
639+ */
640+ partcontup = SearchSysCache1 (CONSTROID ,
641+ ObjectIdGetDatum (fk -> conoid ));
642+ if (!partcontup )
643+ elog (ERROR ,"cache lookup failed for constraint %u" ,
644+ fk -> conoid );
645+ partConstr = (Form_pg_constraint )GETSTRUCT (partcontup );
646+ if (OidIsValid (partConstr -> conparentid )||
647+ !partConstr -> convalidated ||
648+ partConstr -> condeferrable != constrForm -> condeferrable ||
649+ partConstr -> condeferred != constrForm -> condeferred ||
650+ partConstr -> confupdtype != constrForm -> confupdtype ||
651+ partConstr -> confdeltype != constrForm -> confdeltype ||
652+ partConstr -> confmatchtype != constrForm -> confmatchtype )
653+ {
654+ ReleaseSysCache (partcontup );
655+ attach_it = false;
656+ continue ;
657+ }
658+
659+ ReleaseSysCache (partcontup );
660+
661+ /* looks good! Attach this constraint */
662+ ConstraintSetParentConstraint (fk -> conoid ,
663+ HeapTupleGetOid (tuple ));
664+ CommandCounterIncrement ();
665+ attach_it = true;
666+ break ;
667+ }
668+
669+ /*
670+ * If we attached to an existing constraint, there is no need to
671+ * create a new one. In fact, there's no need to recurse for this
672+ * constraint to partitions, either.
673+ */
674+ if (attach_it )
675+ {
676+ ReleaseSysCache (tuple );
677+ continue ;
678+ }
679+
542680constrOid =
543681CreateConstraintEntry (NameStr (constrForm -> conname ),
544682constrForm -> connamespace ,
@@ -547,7 +685,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
547685constrForm -> condeferred ,
548686constrForm -> convalidated ,
549687HeapTupleGetOid (tuple ),
550- relationId ,
688+ RelationGetRelid ( partRel ) ,
551689mapped_conkey ,
552690nelem ,
553691nelem ,
@@ -568,6 +706,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
568706NULL ,
569707 false,
5707081 , false, true);
709+ subclone = lappend_oid (subclone ,constrOid );
571710
572711ObjectAddressSet (childAddr ,ConstraintRelationId ,constrOid );
573712recordDependencyOn (& childAddr ,& parentAddr ,DEPENDENCY_INTERNAL_AUTO );
@@ -580,43 +719,56 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
580719fkconstraint -> deferrable = constrForm -> condeferrable ;
581720fkconstraint -> initdeferred = constrForm -> condeferred ;
582721
583- createForeignKeyTriggers (rel ,constrForm -> confrelid ,fkconstraint ,
722+ createForeignKeyTriggers (partRel ,constrForm -> confrelid ,fkconstraint ,
584723constrOid ,constrForm -> conindid , false);
585724
586725if (cloned )
587726{
727+ ClonedConstraint * newc ;
728+
588729/*
589730 * Feed back caller about the constraints we created, so that they
590731 * can set up constraint verification.
591732 */
592733newc = palloc (sizeof (ClonedConstraint ));
593- newc -> relid = relationId ;
734+ newc -> relid = RelationGetRelid ( partRel ) ;
594735newc -> refrelid = constrForm -> confrelid ;
595736newc -> conindid = constrForm -> conindid ;
596737newc -> conid = constrOid ;
597738newc -> constraint = fkconstraint ;
598739
599740* cloned = lappend (* cloned ,newc );
600741}
742+
743+ ReleaseSysCache (tuple );
601744}
602- systable_endscan (scan );
603745
604746pfree (attmap );
747+ list_free_deep (partFKs );
605748
606- if (rel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
749+ /*
750+ * If the partition is partitioned, recurse to handle any constraints that
751+ * were cloned.
752+ */
753+ if (partRel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE &&
754+ subclone != NIL )
607755{
608- PartitionDesc partdesc = RelationGetPartitionDesc (rel );
756+ PartitionDesc partdesc = RelationGetPartitionDesc (partRel );
609757int i ;
610758
611759for (i = 0 ;i < partdesc -> nparts ;i ++ )
612- CloneForeignKeyConstraints (RelationGetRelid (rel ),
613- partdesc -> oids [i ],
614- cloned );
760+ {
761+ Relation childRel ;
762+
763+ childRel = heap_open (partdesc -> oids [i ],AccessExclusiveLock );
764+ clone_fk_constraints (pg_constraint ,
765+ partRel ,
766+ childRel ,
767+ subclone ,
768+ cloned );
769+ heap_close (childRel ,NoLock );/* keep lock till commit */
770+ }
615771}
616-
617- heap_close (rel ,NoLock );/* keep lock till commit */
618- heap_close (parentRel ,NoLock );
619- heap_close (pg_constraint ,RowShareLock );
620772}
621773
622774/*
@@ -1028,17 +1180,33 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId)
10281180elog (ERROR ,"cache lookup failed for constraint %u" ,childConstrId );
10291181newtup = heap_copytuple (tuple );
10301182constrForm = (Form_pg_constraint )GETSTRUCT (newtup );
1031- constrForm -> conislocal = false;
1032- constrForm -> coninhcount ++ ;
1033- constrForm -> conparentid = parentConstrId ;
1034- CatalogTupleUpdate (constrRel ,& tuple -> t_self ,newtup );
1035- ReleaseSysCache (tuple );
1183+ if (OidIsValid (parentConstrId ))
1184+ {
1185+ constrForm -> conislocal = false;
1186+ constrForm -> coninhcount ++ ;
1187+ constrForm -> conparentid = parentConstrId ;
1188+
1189+ CatalogTupleUpdate (constrRel ,& tuple -> t_self ,newtup );
10361190
1037- ObjectAddressSet (referenced ,ConstraintRelationId ,parentConstrId );
1038- ObjectAddressSet (depender ,ConstraintRelationId ,childConstrId );
1191+ ObjectAddressSet (referenced ,ConstraintRelationId ,parentConstrId );
1192+ ObjectAddressSet (depender ,ConstraintRelationId ,childConstrId );
10391193
1040- recordDependencyOn (& depender ,& referenced ,DEPENDENCY_INTERNAL_AUTO );
1194+ recordDependencyOn (& depender ,& referenced ,DEPENDENCY_INTERNAL_AUTO );
1195+ }
1196+ else
1197+ {
1198+ constrForm -> coninhcount -- ;
1199+ if (constrForm -> coninhcount <=0 )
1200+ constrForm -> conislocal = true;
1201+ constrForm -> conparentid = InvalidOid ;
1202+
1203+ deleteDependencyRecordsForClass (ConstraintRelationId ,childConstrId ,
1204+ ConstraintRelationId ,
1205+ DEPENDENCY_INTERNAL_AUTO );
1206+ CatalogTupleUpdate (constrRel ,& tuple -> t_self ,newtup );
1207+ }
10411208
1209+ ReleaseSysCache (tuple );
10421210heap_close (constrRel ,RowExclusiveLock );
10431211}
10441212