@@ -328,7 +328,8 @@ static void AlterSeqNamespaces(Relation classRel, Relation rel,
328
328
LOCKMODE lockmode);
329
329
static ObjectAddress ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
330
330
bool recurse, bool recursing, LOCKMODE lockmode);
331
- static ObjectAddress ATExecValidateConstraint(Relation rel, char *constrName,
331
+ static ObjectAddress ATExecValidateConstraint(List **wqueue,
332
+ Relation rel, char *constrName,
332
333
bool recurse, bool recursing, LOCKMODE lockmode);
333
334
static inttransformColumnNameList(Oid relId, List *colList,
334
335
int16 *attnums, Oid *atttypids);
@@ -342,7 +343,6 @@ static OidtransformFkeyCheckAttrs(Relation pkrel,
342
343
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
343
344
static CoercionPathType findFkeyCast(Oid targetTypeId, Oid sourceTypeId,
344
345
Oid *funcid);
345
- static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
346
346
static void validateForeignKeyConstraint(char *conname,
347
347
Relation rel, Relation pkrel,
348
348
Oid pkindOid, Oid constraintOid);
@@ -4500,13 +4500,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
4500
4500
address = ATExecAlterConstraint(rel, cmd, false, false, lockmode);
4501
4501
break;
4502
4502
case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
4503
- address = ATExecValidateConstraint(rel, cmd->name, false , false,
4504
- lockmode);
4503
+ address = ATExecValidateConstraint(wqueue, rel, cmd->name, false,
4504
+ false, lockmode);
4505
4505
break;
4506
4506
case AT_ValidateConstraintRecurse:/* VALIDATE CONSTRAINT with
4507
4507
* recursion */
4508
- address = ATExecValidateConstraint(rel, cmd->name, true, false ,
4509
- lockmode);
4508
+ address = ATExecValidateConstraint(wqueue, rel, cmd->name, true,
4509
+ false, lockmode);
4510
4510
break;
4511
4511
case AT_DropConstraint: /* DROP CONSTRAINT */
4512
4512
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
@@ -9727,8 +9727,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
9727
9727
* was already validated, InvalidObjectAddress is returned.
9728
9728
*/
9729
9729
static ObjectAddress
9730
- ATExecValidateConstraint(Relation rel, char *constrName, bool recurse ,
9731
- bool recursing, LOCKMODE lockmode)
9730
+ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
9731
+ boolrecurse, bool recursing, LOCKMODE lockmode)
9732
9732
{
9733
9733
Relationconrel;
9734
9734
SysScanDesc scan;
@@ -9774,27 +9774,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
9774
9774
9775
9775
if (!con->convalidated)
9776
9776
{
9777
+ AlteredTableInfo *tab;
9777
9778
HeapTuplecopyTuple;
9778
9779
Form_pg_constraint copy_con;
9779
9780
9780
9781
if (con->contype == CONSTRAINT_FOREIGN)
9781
9782
{
9782
- Relationrefrel;
9783
+ NewConstraint *newcon;
9784
+ Constraint *fkconstraint;
9783
9785
9784
- /*
9785
- * Triggers are already in place on both tables, so a concurrent
9786
- * write that alters the result here is not possible. Normally we
9787
- * can run a query here to do the validation, which would only
9788
- * require AccessShareLock. In some cases, it is possible that we
9789
- * might need to fire triggers to perform the check, so we take a
9790
- * lock at RowShareLock level just in case.
9791
- */
9792
- refrel = table_open(con->confrelid, RowShareLock);
9786
+ /* Queue validation for phase 3 */
9787
+ fkconstraint = makeNode(Constraint);
9788
+ /* for now this is all we need */
9789
+ fkconstraint->conname = constrName;
9793
9790
9794
- validateForeignKeyConstraint(constrName, rel, refrel,
9795
- con->conindid,
9796
- con->oid);
9797
- table_close(refrel, NoLock);
9791
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
9792
+ newcon->name = constrName;
9793
+ newcon->contype = CONSTR_FOREIGN;
9794
+ newcon->refrelid = con->confrelid;
9795
+ newcon->refindid = con->conindid;
9796
+ newcon->conid = con->oid;
9797
+ newcon->qual = (Node *) fkconstraint;
9798
+
9799
+ /* Find or create work queue entry for this table */
9800
+ tab = ATGetQueueEntry(wqueue, rel);
9801
+ tab->constraints = lappend(tab->constraints, newcon);
9798
9802
9799
9803
/*
9800
9804
* We disallow creating invalid foreign keys to or from
@@ -9805,6 +9809,10 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
9805
9809
{
9806
9810
List *children = NIL;
9807
9811
ListCell *child;
9812
+ NewConstraint *newcon;
9813
+ boolisnull;
9814
+ Datumval;
9815
+ char *conbin;
9808
9816
9809
9817
/*
9810
9818
* If we're recursing, the parent has already done this, so skip
@@ -9844,12 +9852,30 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
9844
9852
/* find_all_inheritors already got lock */
9845
9853
childrel = table_open(childoid, NoLock);
9846
9854
9847
- ATExecValidateConstraint(childrel, constrName, false,
9855
+ ATExecValidateConstraint(wqueue, childrel, constrName, false,
9848
9856
true, lockmode);
9849
9857
table_close(childrel, NoLock);
9850
9858
}
9851
9859
9852
- validateCheckConstraint(rel, tuple);
9860
+ /* Queue validation for phase 3 */
9861
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
9862
+ newcon->name = constrName;
9863
+ newcon->contype = CONSTR_CHECK;
9864
+ newcon->refrelid = InvalidOid;
9865
+ newcon->refindid = InvalidOid;
9866
+ newcon->conid = con->oid;
9867
+
9868
+ val = SysCacheGetAttr(CONSTROID, tuple,
9869
+ Anum_pg_constraint_conbin, &isnull);
9870
+ if (isnull)
9871
+ elog(ERROR, "null conbin for constraint %u", con->oid);
9872
+
9873
+ conbin = TextDatumGetCString(val);
9874
+ newcon->qual = (Node *) stringToNode(conbin);
9875
+
9876
+ /* Find or create work queue entry for this table */
9877
+ tab = ATGetQueueEntry(wqueue, rel);
9878
+ tab->constraints = lappend(tab->constraints, newcon);
9853
9879
9854
9880
/*
9855
9881
* Invalidate relcache so that others see the new validated
@@ -10223,87 +10249,6 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
10223
10249
}
10224
10250
}
10225
10251
10226
- /*
10227
- * Scan the existing rows in a table to verify they meet a proposed
10228
- * CHECK constraint.
10229
- *
10230
- * The caller must have opened and locked the relation appropriately.
10231
- */
10232
- static void
10233
- validateCheckConstraint(Relation rel, HeapTuple constrtup)
10234
- {
10235
- EState *estate;
10236
- Datumval;
10237
- char *conbin;
10238
- Expr *origexpr;
10239
- ExprState *exprstate;
10240
- TableScanDesc scan;
10241
- ExprContext *econtext;
10242
- MemoryContext oldcxt;
10243
- TupleTableSlot *slot;
10244
- Form_pg_constraint constrForm;
10245
- boolisnull;
10246
- Snapshotsnapshot;
10247
-
10248
- /*
10249
- * VALIDATE CONSTRAINT is a no-op for foreign tables and partitioned
10250
- * tables.
10251
- */
10252
- if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
10253
- rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
10254
- return;
10255
-
10256
- constrForm = (Form_pg_constraint) GETSTRUCT(constrtup);
10257
-
10258
- estate = CreateExecutorState();
10259
-
10260
- /*
10261
- * XXX this tuple doesn't really come from a syscache, but this doesn't
10262
- * matter to SysCacheGetAttr, because it only wants to be able to fetch
10263
- * the tupdesc
10264
- */
10265
- val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin,
10266
- &isnull);
10267
- if (isnull)
10268
- elog(ERROR, "null conbin for constraint %u",
10269
- constrForm->oid);
10270
- conbin = TextDatumGetCString(val);
10271
- origexpr = (Expr *) stringToNode(conbin);
10272
- exprstate = ExecPrepareExpr(origexpr, estate);
10273
-
10274
- econtext = GetPerTupleExprContext(estate);
10275
- slot = table_slot_create(rel, NULL);
10276
- econtext->ecxt_scantuple = slot;
10277
-
10278
- snapshot = RegisterSnapshot(GetLatestSnapshot());
10279
- scan = table_beginscan(rel, snapshot, 0, NULL);
10280
-
10281
- /*
10282
- * Switch to per-tuple memory context and reset it for each tuple
10283
- * produced, so we don't leak memory.
10284
- */
10285
- oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
10286
-
10287
- while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
10288
- {
10289
- if (!ExecCheck(exprstate, econtext))
10290
- ereport(ERROR,
10291
- (errcode(ERRCODE_CHECK_VIOLATION),
10292
- errmsg("check constraint \"%s\" of relation \"%s\" is violated by some row",
10293
- NameStr(constrForm->conname),
10294
- RelationGetRelationName(rel)),
10295
- errtableconstraint(rel, NameStr(constrForm->conname))));
10296
-
10297
- ResetExprContext(econtext);
10298
- }
10299
-
10300
- MemoryContextSwitchTo(oldcxt);
10301
- table_endscan(scan);
10302
- UnregisterSnapshot(snapshot);
10303
- ExecDropSingleTupleTableSlot(slot);
10304
- FreeExecutorState(estate);
10305
- }
10306
-
10307
10252
/*
10308
10253
* Scan the existing rows in a table to verify they meet a proposed FK
10309
10254
* constraint.