@@ -313,8 +313,9 @@ static void AlterSeqNamespaces(Relation classRel, Relation rel,
313
313
LOCKMODE lockmode );
314
314
static ObjectAddress ATExecAlterConstraint (Relation rel ,AlterTableCmd * cmd ,
315
315
bool recurse ,bool recursing ,LOCKMODE lockmode );
316
- static ObjectAddress ATExecValidateConstraint (Relation rel ,char * constrName ,
317
- bool recurse ,bool recursing ,LOCKMODE lockmode );
316
+ static ObjectAddress ATExecValidateConstraint (List * * wqueue ,Relation rel ,
317
+ char * constrName ,bool recurse ,bool recursing ,
318
+ LOCKMODE lockmode );
318
319
static int transformColumnNameList (Oid relId ,List * colList ,
319
320
int16 * attnums ,Oid * atttypids );
320
321
static int transformFkeyGetPrimaryKey (Relation pkrel ,Oid * indexOid ,
@@ -327,7 +328,6 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
327
328
static void checkFkeyPermissions (Relation rel ,int16 * attnums ,int natts );
328
329
static CoercionPathType findFkeyCast (Oid targetTypeId ,Oid sourceTypeId ,
329
330
Oid * funcid );
330
- static void validateCheckConstraint (Relation rel ,HeapTuple constrtup );
331
331
static void validateForeignKeyConstraint (char * conname ,
332
332
Relation rel ,Relation pkrel ,
333
333
Oid pkindOid ,Oid constraintOid );
@@ -3951,13 +3951,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
3951
3951
address = ATExecAlterConstraint (rel ,cmd , false, false,lockmode );
3952
3952
break ;
3953
3953
case AT_ValidateConstraint :/* VALIDATE CONSTRAINT */
3954
- address = ATExecValidateConstraint (rel ,cmd -> name , false , false,
3955
- lockmode );
3954
+ address = ATExecValidateConstraint (wqueue , rel ,cmd -> name , false,
3955
+ false, lockmode );
3956
3956
break ;
3957
3957
case AT_ValidateConstraintRecurse :/* VALIDATE CONSTRAINT with
3958
3958
* recursion */
3959
- address = ATExecValidateConstraint (rel ,cmd -> name , true, false ,
3960
- lockmode );
3959
+ address = ATExecValidateConstraint (wqueue , rel ,cmd -> name , true,
3960
+ false, lockmode );
3961
3961
break ;
3962
3962
case AT_DropConstraint :/* DROP CONSTRAINT */
3963
3963
ATExecDropConstraint (rel ,cmd -> name ,cmd -> behavior ,
@@ -7660,8 +7660,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
7660
7660
* was already validated, InvalidObjectAddress is returned.
7661
7661
*/
7662
7662
static ObjectAddress
7663
- ATExecValidateConstraint (Relation rel ,char * constrName , bool recurse ,
7664
- bool recursing ,LOCKMODE lockmode )
7663
+ ATExecValidateConstraint (List * * wqueue , Relation rel ,char * constrName ,
7664
+ bool recurse , bool recursing ,LOCKMODE lockmode )
7665
7665
{
7666
7666
Relation conrel ;
7667
7667
SysScanDesc scan ;
@@ -7708,27 +7708,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
7708
7708
7709
7709
if (!con -> convalidated )
7710
7710
{
7711
+ AlteredTableInfo * tab ;
7711
7712
HeapTuple copyTuple ;
7712
7713
Form_pg_constraint copy_con ;
7713
7714
7714
7715
if (con -> contype == CONSTRAINT_FOREIGN )
7715
7716
{
7716
- Relation refrel ;
7717
+ NewConstraint * newcon ;
7718
+ Constraint * fkconstraint ;
7717
7719
7718
- /*
7719
- * Triggers are already in place on both tables, so a concurrent
7720
- * write that alters the result here is not possible. Normally we
7721
- * can run a query here to do the validation, which would only
7722
- * require AccessShareLock. In some cases, it is possible that we
7723
- * might need to fire triggers to perform the check, so we take a
7724
- * lock at RowShareLock level just in case.
7725
- */
7726
- refrel = heap_open (con -> confrelid ,RowShareLock );
7720
+ /* Queue validation for phase 3 */
7721
+ fkconstraint = makeNode (Constraint );
7722
+ /* for now this is all we need */
7723
+ fkconstraint -> conname = constrName ;
7727
7724
7728
- validateForeignKeyConstraint (constrName ,rel ,refrel ,
7729
- con -> conindid ,
7730
- HeapTupleGetOid (tuple ));
7731
- heap_close (refrel ,NoLock );
7725
+ newcon = (NewConstraint * )palloc0 (sizeof (NewConstraint ));
7726
+ newcon -> name = constrName ;
7727
+ newcon -> contype = CONSTR_FOREIGN ;
7728
+ newcon -> refrelid = con -> confrelid ;
7729
+ newcon -> refindid = con -> conindid ;
7730
+ newcon -> conid = HeapTupleGetOid (tuple );
7731
+ newcon -> qual = (Node * )fkconstraint ;
7732
+
7733
+ /* Find or create work queue entry for this table */
7734
+ tab = ATGetQueueEntry (wqueue ,rel );
7735
+ tab -> constraints = lappend (tab -> constraints ,newcon );
7732
7736
7733
7737
/*
7734
7738
* Foreign keys do not inherit, so we purposely ignore the
@@ -7739,6 +7743,10 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
7739
7743
{
7740
7744
List * children = NIL ;
7741
7745
ListCell * child ;
7746
+ NewConstraint * newcon ;
7747
+ bool isnull ;
7748
+ Datum val ;
7749
+ char * conbin ;
7742
7750
7743
7751
/*
7744
7752
* If we're recursing, the parent has already done this, so skip
@@ -7778,12 +7786,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
7778
7786
/* find_all_inheritors already got lock */
7779
7787
childrel = heap_open (childoid ,NoLock );
7780
7788
7781
- ATExecValidateConstraint (childrel ,constrName , false,
7789
+ ATExecValidateConstraint (wqueue , childrel ,constrName , false,
7782
7790
true,lockmode );
7783
7791
heap_close (childrel ,NoLock );
7784
7792
}
7785
7793
7786
- validateCheckConstraint (rel ,tuple );
7794
+ /* Queue validation for phase 3 */
7795
+ newcon = (NewConstraint * )palloc0 (sizeof (NewConstraint ));
7796
+ newcon -> name = constrName ;
7797
+ newcon -> contype = CONSTR_CHECK ;
7798
+ newcon -> refrelid = InvalidOid ;
7799
+ newcon -> refindid = InvalidOid ;
7800
+ newcon -> conid = HeapTupleGetOid (tuple );
7801
+
7802
+ val = SysCacheGetAttr (CONSTROID ,tuple ,
7803
+ Anum_pg_constraint_conbin ,& isnull );
7804
+ if (isnull )
7805
+ elog (ERROR ,"null conbin for constraint %u" ,
7806
+ HeapTupleGetOid (tuple ));
7807
+
7808
+ conbin = TextDatumGetCString (val );
7809
+ newcon -> qual = (Node * )stringToNode (conbin );
7810
+
7811
+ /* Find or create work queue entry for this table */
7812
+ tab = ATGetQueueEntry (wqueue ,rel );
7813
+ tab -> constraints = lappend (tab -> constraints ,newcon );
7787
7814
7788
7815
/*
7789
7816
* Invalidate relcache so that others see the new validated
@@ -8159,91 +8186,6 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
8159
8186
}
8160
8187
}
8161
8188
8162
- /*
8163
- * Scan the existing rows in a table to verify they meet a proposed
8164
- * CHECK constraint.
8165
- *
8166
- * The caller must have opened and locked the relation appropriately.
8167
- */
8168
- static void
8169
- validateCheckConstraint (Relation rel ,HeapTuple constrtup )
8170
- {
8171
- EState * estate ;
8172
- Datum val ;
8173
- char * conbin ;
8174
- Expr * origexpr ;
8175
- ExprState * exprstate ;
8176
- TupleDesc tupdesc ;
8177
- HeapScanDesc scan ;
8178
- HeapTuple tuple ;
8179
- ExprContext * econtext ;
8180
- MemoryContext oldcxt ;
8181
- TupleTableSlot * slot ;
8182
- Form_pg_constraint constrForm ;
8183
- bool isnull ;
8184
- Snapshot snapshot ;
8185
-
8186
- /*
8187
- * VALIDATE CONSTRAINT is a no-op for foreign tables and partitioned
8188
- * tables.
8189
- */
8190
- if (rel -> rd_rel -> relkind == RELKIND_FOREIGN_TABLE ||
8191
- rel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
8192
- return ;
8193
-
8194
- constrForm = (Form_pg_constraint )GETSTRUCT (constrtup );
8195
-
8196
- estate = CreateExecutorState ();
8197
-
8198
- /*
8199
- * XXX this tuple doesn't really come from a syscache, but this doesn't
8200
- * matter to SysCacheGetAttr, because it only wants to be able to fetch
8201
- * the tupdesc
8202
- */
8203
- val = SysCacheGetAttr (CONSTROID ,constrtup ,Anum_pg_constraint_conbin ,
8204
- & isnull );
8205
- if (isnull )
8206
- elog (ERROR ,"null conbin for constraint %u" ,
8207
- HeapTupleGetOid (constrtup ));
8208
- conbin = TextDatumGetCString (val );
8209
- origexpr = (Expr * )stringToNode (conbin );
8210
- exprstate = ExecPrepareExpr (origexpr ,estate );
8211
-
8212
- econtext = GetPerTupleExprContext (estate );
8213
- tupdesc = RelationGetDescr (rel );
8214
- slot = MakeSingleTupleTableSlot (tupdesc );
8215
- econtext -> ecxt_scantuple = slot ;
8216
-
8217
- snapshot = RegisterSnapshot (GetLatestSnapshot ());
8218
- scan = heap_beginscan (rel ,snapshot ,0 ,NULL );
8219
-
8220
- /*
8221
- * Switch to per-tuple memory context and reset it for each tuple
8222
- * produced, so we don't leak memory.
8223
- */
8224
- oldcxt = MemoryContextSwitchTo (GetPerTupleMemoryContext (estate ));
8225
-
8226
- while ((tuple = heap_getnext (scan ,ForwardScanDirection ))!= NULL )
8227
- {
8228
- ExecStoreTuple (tuple ,slot ,InvalidBuffer , false);
8229
-
8230
- if (!ExecCheck (exprstate ,econtext ))
8231
- ereport (ERROR ,
8232
- (errcode (ERRCODE_CHECK_VIOLATION ),
8233
- errmsg ("check constraint \"%s\" is violated by some row" ,
8234
- NameStr (constrForm -> conname )),
8235
- errtableconstraint (rel ,NameStr (constrForm -> conname ))));
8236
-
8237
- ResetExprContext (econtext );
8238
- }
8239
-
8240
- MemoryContextSwitchTo (oldcxt );
8241
- heap_endscan (scan );
8242
- UnregisterSnapshot (snapshot );
8243
- ExecDropSingleTupleTableSlot (slot );
8244
- FreeExecutorState (estate );
8245
- }
8246
-
8247
8189
/*
8248
8190
* Scan the existing rows in a table to verify they meet a proposed FK
8249
8191
* constraint.