@@ -574,8 +574,9 @@ static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
574
574
Oid indexOid,
575
575
Oid parentDelTrigger, Oid parentUpdTrigger,
576
576
Oid *deleteTrigOid, Oid *updateTrigOid);
577
- static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
578
- Oid partRelid,
577
+ static bool tryAttachPartitionForeignKey(List **wqueue,
578
+ ForeignKeyCacheInfo *fk,
579
+ Relation partition,
579
580
Oid parentConstrOid, int numfks,
580
581
AttrNumber *mapped_conkey, AttrNumber *confkey,
581
582
Oid *conpfeqop,
@@ -9772,22 +9773,12 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
9772
9773
* Validity checks (permission checks wait till we have the column
9773
9774
* numbers)
9774
9775
*/
9775
- if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
9776
- {
9777
- if (!recurse)
9778
- ereport(ERROR,
9779
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
9780
- errmsg("cannot use ONLY for foreign key on partitioned table \"%s\" referencing relation \"%s\"",
9781
- RelationGetRelationName(rel),
9782
- RelationGetRelationName(pkrel))));
9783
- if (fkconstraint->skip_validation && !fkconstraint->initially_valid)
9784
- ereport(ERROR,
9785
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
9786
- errmsg("cannot add NOT VALID foreign key on partitioned table \"%s\" referencing relation \"%s\"",
9787
- RelationGetRelationName(rel),
9788
- RelationGetRelationName(pkrel)),
9789
- errdetail("This feature is not yet supported on partitioned tables.")));
9790
- }
9776
+ if (!recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
9777
+ ereport(ERROR,
9778
+ errcode(ERRCODE_WRONG_OBJECT_TYPE),
9779
+ errmsg("cannot use ONLY for foreign key on partitioned table \"%s\" referencing relation \"%s\"",
9780
+ RelationGetRelationName(rel),
9781
+ RelationGetRelationName(pkrel)));
9791
9782
9792
9783
if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
9793
9784
pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
@@ -10782,14 +10773,12 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
10782
10773
*/
10783
10774
for (int i = 0; i < pd->nparts; i++)
10784
10775
{
10785
- OidpartitionId = pd->oids[i];
10786
- Relationpartition = table_open(partitionId, lockmode);
10776
+ Relationpartition = table_open(pd->oids[i], lockmode);
10787
10777
List *partFKs;
10788
10778
AttrMap *attmap;
10789
10779
AttrNumbermapped_fkattnum[INDEX_MAX_KEYS];
10790
10780
boolattached;
10791
10781
ObjectAddress address;
10792
- ListCell *cell;
10793
10782
10794
10783
CheckAlterTableIsSafe(partition);
10795
10784
@@ -10802,13 +10791,11 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
10802
10791
/* Check whether an existing constraint can be repurposed */
10803
10792
partFKs = copyObject(RelationGetFKeyList(partition));
10804
10793
attached = false;
10805
- foreach(cell , partFKs)
10794
+ foreach_node(ForeignKeyCacheInfo, fk , partFKs)
10806
10795
{
10807
- ForeignKeyCacheInfo *fk;
10808
-
10809
- fk = lfirst_node(ForeignKeyCacheInfo, cell);
10810
- if (tryAttachPartitionForeignKey(fk,
10811
- partitionId,
10796
+ if (tryAttachPartitionForeignKey(wqueue,
10797
+ fk,
10798
+ partition,
10812
10799
parentConstr,
10813
10800
numfks,
10814
10801
mapped_fkattnum,
@@ -11260,8 +11247,9 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
11260
11247
{
11261
11248
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, lc);
11262
11249
11263
- if (tryAttachPartitionForeignKey(fk,
11264
- RelationGetRelid(partRel),
11250
+ if (tryAttachPartitionForeignKey(wqueue,
11251
+ fk,
11252
+ partRel,
11265
11253
parentConstrOid,
11266
11254
numfks,
11267
11255
mapped_conkey,
@@ -11364,8 +11352,9 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
11364
11352
* return false.
11365
11353
*/
11366
11354
static bool
11367
- tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
11368
- Oid partRelid,
11355
+ tryAttachPartitionForeignKey(List **wqueue,
11356
+ ForeignKeyCacheInfo *fk,
11357
+ Relation partition,
11369
11358
Oid parentConstrOid,
11370
11359
int numfks,
11371
11360
AttrNumber *mapped_conkey,
@@ -11379,6 +11368,7 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
11379
11368
Form_pg_constraint parentConstr;
11380
11369
HeapTuplepartcontup;
11381
11370
Form_pg_constraint partConstr;
11371
+ boolqueueValidation;
11382
11372
ScanKeyData key;
11383
11373
SysScanDesc scan;
11384
11374
HeapTupletrigtup;
@@ -11411,18 +11401,12 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
11411
11401
}
11412
11402
}
11413
11403
11414
- /*
11415
- * Looks good so far; do some more extensive checks. Presumably the check
11416
- * for 'convalidated' could be dropped, since we don't really care about
11417
- * that, but let's be careful for now.
11418
- */
11419
- partcontup = SearchSysCache1(CONSTROID,
11420
- ObjectIdGetDatum(fk->conoid));
11404
+ /* Looks good so far; perform more extensive checks. */
11405
+ partcontup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid));
11421
11406
if (!HeapTupleIsValid(partcontup))
11422
11407
elog(ERROR, "cache lookup failed for constraint %u", fk->conoid);
11423
11408
partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
11424
11409
if (OidIsValid(partConstr->conparentid) ||
11425
- !partConstr->convalidated ||
11426
11410
partConstr->condeferrable != parentConstr->condeferrable ||
11427
11411
partConstr->condeferred != parentConstr->condeferred ||
11428
11412
partConstr->confupdtype != parentConstr->confupdtype ||
@@ -11434,6 +11418,13 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
11434
11418
return false;
11435
11419
}
11436
11420
11421
+ /*
11422
+ * Will we need to validate this constraint? A valid parent constraint
11423
+ * implies that all child constraints have been validated, so if this one
11424
+ * isn't, we must trigger phase 3 validation.
11425
+ */
11426
+ queueValidation = parentConstr->convalidated && !partConstr->convalidated;
11427
+
11437
11428
ReleaseSysCache(partcontup);
11438
11429
ReleaseSysCache(parentConstrTup);
11439
11430
@@ -11481,7 +11472,8 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
11481
11472
11482
11473
systable_endscan(scan);
11483
11474
11484
- ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid);
11475
+ ConstraintSetParentConstraint(fk->conoid, parentConstrOid,
11476
+ RelationGetRelid(partition));
11485
11477
11486
11478
/*
11487
11479
* Like the constraint, attach partition's "check" triggers to the
@@ -11492,10 +11484,10 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
11492
11484
&insertTriggerOid, &updateTriggerOid);
11493
11485
Assert(OidIsValid(insertTriggerOid) && OidIsValid(parentInsTrigger));
11494
11486
TriggerSetParentTrigger(trigrel, insertTriggerOid, parentInsTrigger,
11495
- partRelid );
11487
+ RelationGetRelid(partition) );
11496
11488
Assert(OidIsValid(updateTriggerOid) && OidIsValid(parentUpdTrigger));
11497
11489
TriggerSetParentTrigger(trigrel, updateTriggerOid, parentUpdTrigger,
11498
- partRelid );
11490
+ RelationGetRelid(partition) );
11499
11491
11500
11492
/*
11501
11493
* If the referenced table is partitioned, then the partition we're
@@ -11572,7 +11564,33 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
11572
11564
table_close(pg_constraint, RowShareLock);
11573
11565
}
11574
11566
11575
- CommandCounterIncrement();
11567
+ /* If validation is needed, put it in the queue now. */
11568
+ if (queueValidation)
11569
+ {
11570
+ Relationconrel;
11571
+
11572
+ /*
11573
+ * We updated this pg_constraint row above to set its parent;
11574
+ * validating it will cause its convalidated flag to change, so we
11575
+ * need CCI here. XXX it might work better to effect the convalidated
11576
+ * changes for all constraints together during phase 3, but that
11577
+ * requires more invasive code surgery.
11578
+ */
11579
+ CommandCounterIncrement();
11580
+
11581
+ conrel = table_open(ConstraintRelationId, RowExclusiveLock);
11582
+
11583
+ partcontup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid));
11584
+ if (!HeapTupleIsValid(partcontup))
11585
+ elog(ERROR, "cache lookup failed for constraint %u", fk->conoid);
11586
+
11587
+ /* Use the same lock as for AT_ValidateConstraint */
11588
+ QueueFKConstraintValidation(wqueue, conrel, partition, partcontup,
11589
+ ShareUpdateExclusiveLock);
11590
+ ReleaseSysCache(partcontup);
11591
+ table_close(conrel, RowExclusiveLock);
11592
+ }
11593
+
11576
11594
return true;
11577
11595
}
11578
11596
@@ -12113,7 +12131,7 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
12113
12131
*
12114
12132
* Add an entry to the wqueue to validate the given foreign key constraint in
12115
12133
* Phase 3 and update the convalidated field in the pg_constraint catalog
12116
- * for the specified relation.
12134
+ * for the specified relation and all its children .
12117
12135
*/
12118
12136
static void
12119
12137
QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
@@ -12126,6 +12144,7 @@ QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
12126
12144
12127
12145
con = (Form_pg_constraint) GETSTRUCT(contuple);
12128
12146
Assert(con->contype == CONSTRAINT_FOREIGN);
12147
+ Assert(!con->convalidated);
12129
12148
12130
12149
if (rel->rd_rel->relkind == RELKIND_RELATION)
12131
12150
{
@@ -12151,9 +12170,48 @@ QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
12151
12170
}
12152
12171
12153
12172
/*
12154
- *We disallow creating invalid foreign keys to or from partitioned
12155
- *tables, so ignoring the recursion bit isokay .
12173
+ *If the table at either end of the constraint is partitioned, we need to
12174
+ *recurse and handle every constraint that isa child of this constraint .
12156
12175
*/
12176
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ||
12177
+ get_rel_relkind(con->confrelid) == RELKIND_PARTITIONED_TABLE)
12178
+ {
12179
+ ScanKeyData pkey;
12180
+ SysScanDesc pscan;
12181
+ HeapTuplechildtup;
12182
+
12183
+ ScanKeyInit(&pkey,
12184
+ Anum_pg_constraint_conparentid,
12185
+ BTEqualStrategyNumber, F_OIDEQ,
12186
+ ObjectIdGetDatum(con->oid));
12187
+
12188
+ pscan = systable_beginscan(conrel, ConstraintParentIndexId,
12189
+ true, NULL, 1, &pkey);
12190
+
12191
+ while (HeapTupleIsValid(childtup = systable_getnext(pscan)))
12192
+ {
12193
+ Form_pg_constraint childcon;
12194
+ Relationchildrel;
12195
+
12196
+ childcon = (Form_pg_constraint) GETSTRUCT(childtup);
12197
+
12198
+ /*
12199
+ * If the child constraint has already been validated, no further
12200
+ * action is required for it or its descendants, as they are all
12201
+ * valid.
12202
+ */
12203
+ if (childcon->convalidated)
12204
+ continue;
12205
+
12206
+ childrel = table_open(childcon->conrelid, lockmode);
12207
+
12208
+ QueueFKConstraintValidation(wqueue, conrel, childrel, childtup,
12209
+ lockmode);
12210
+ table_close(childrel, NoLock);
12211
+ }
12212
+
12213
+ systable_endscan(pscan);
12214
+ }
12157
12215
12158
12216
/*
12159
12217
* Now update the catalog, while we have the door open.