Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3de241d

Browse files
committed
Foreign keys on partitioned tables
Author: Álvaro HerreraDiscussion:https://postgr.es/m/20171231194359.cvojcour423ulha4@alvherre.pgsqlReviewed-by: Peter Eisentraut
1 parent857f9c3 commit3de241d

File tree

17 files changed

+895
-109
lines changed

17 files changed

+895
-109
lines changed

‎doc/src/sgml/ref/alter_table.sgml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
368368
specified check constraints). But the
369369
database will not assume that the constraint holds for all rows in
370370
the table, until it is validated by using the <literal>VALIDATE
371-
CONSTRAINT</literal> option.
371+
CONSTRAINT</literal> option. Foreign key constraints on partitioned
372+
tables may not be declared <literal>NOT VALID</literal> at present.
372373
</para>
373374

374375
<para>

‎doc/src/sgml/ref/create_table.sgml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,9 +546,12 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
546546
</para>
547547

548548
<para>
549-
Partitioned tables do not support <literal>EXCLUDE</literal> or
550-
<literal>FOREIGN KEY</literal> constraints; however, you can define
551-
these constraints on individual partitions.
549+
Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
550+
however, you can define these constraints on individual partitions.
551+
Also, while it's possible to define <literal>PRIMARY KEY</literal>
552+
constraints on partitioned tables, it is not supported to create foreign
553+
keys cannot that reference them. This restriction will be lifted in a
554+
future release.
552555
</para>
553556

554557
</listitem>
@@ -907,7 +910,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
907910
must have <literal>REFERENCES</literal> permission on the referenced table
908911
(either the whole table, or the specific referenced columns).
909912
Note that foreign key constraints cannot be defined between temporary
910-
tables and permanent tables.
913+
tables and permanent tables. Also note that while it is possible to
914+
define a foreign key on a partitioned table, it is not possible to
915+
declare a foreign key that references a partitioned table.
911916
</para>
912917

913918
<para>

‎src/backend/catalog/pg_constraint.c

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include"catalog/pg_operator.h"
2727
#include"catalog/pg_type.h"
2828
#include"commands/defrem.h"
29+
#include"commands/tablecmds.h"
2930
#include"utils/array.h"
3031
#include"utils/builtins.h"
3132
#include"utils/fmgroids.h"
@@ -377,6 +378,242 @@ CreateConstraintEntry(const char *constraintName,
377378
returnconOid;
378379
}
379380

381+
/*
382+
* CloneForeignKeyConstraints
383+
*Clone foreign keys from a partitioned table to a newly acquired
384+
*partition.
385+
*
386+
* relationId is a partition of parentId, so we can be certain that it has the
387+
* same columns with the same datatypes. The columns may be in different
388+
* order, though.
389+
*
390+
* The *cloned list is appended ClonedConstraint elements describing what was
391+
* created.
392+
*/
393+
void
394+
CloneForeignKeyConstraints(OidparentId,OidrelationId,List**cloned)
395+
{
396+
Relationpg_constraint;
397+
RelationparentRel;
398+
Relationrel;
399+
ScanKeyDatakey;
400+
SysScanDescscan;
401+
TupleDesctupdesc;
402+
HeapTupletuple;
403+
AttrNumber*attmap;
404+
405+
parentRel=heap_open(parentId,NoLock);/* already got lock */
406+
/* see ATAddForeignKeyConstraint about lock level */
407+
rel=heap_open(relationId,AccessExclusiveLock);
408+
409+
pg_constraint=heap_open(ConstraintRelationId,RowShareLock);
410+
tupdesc=RelationGetDescr(pg_constraint);
411+
412+
/*
413+
* The constraint key may differ, if the columns in the partition are
414+
* different. This map is used to convert them.
415+
*/
416+
attmap=convert_tuples_by_name_map(RelationGetDescr(rel),
417+
RelationGetDescr(parentRel),
418+
gettext_noop("could not convert row type"));
419+
420+
ScanKeyInit(&key,
421+
Anum_pg_constraint_conrelid,BTEqualStrategyNumber,
422+
F_OIDEQ,ObjectIdGetDatum(parentId));
423+
scan=systable_beginscan(pg_constraint,ConstraintRelidIndexId, true,
424+
NULL,1,&key);
425+
426+
while ((tuple=systable_getnext(scan))!=NULL)
427+
{
428+
Form_pg_constraintconstrForm= (Form_pg_constraint)GETSTRUCT(tuple);
429+
AttrNumberconkey[INDEX_MAX_KEYS];
430+
AttrNumbermapped_conkey[INDEX_MAX_KEYS];
431+
AttrNumberconfkey[INDEX_MAX_KEYS];
432+
Oidconpfeqop[INDEX_MAX_KEYS];
433+
Oidconppeqop[INDEX_MAX_KEYS];
434+
Oidconffeqop[INDEX_MAX_KEYS];
435+
Constraint*fkconstraint;
436+
ClonedConstraint*newc;
437+
OidconstrOid;
438+
ObjectAddressparentAddr,
439+
childAddr;
440+
intnelem;
441+
inti;
442+
ArrayType*arr;
443+
Datumdatum;
444+
boolisnull;
445+
446+
/* only foreign keys */
447+
if (constrForm->contype!=CONSTRAINT_FOREIGN)
448+
continue;
449+
450+
ObjectAddressSet(parentAddr,ConstraintRelationId,
451+
HeapTupleGetOid(tuple));
452+
453+
datum=fastgetattr(tuple,Anum_pg_constraint_conkey,
454+
tupdesc,&isnull);
455+
if (isnull)
456+
elog(ERROR,"null conkey");
457+
arr=DatumGetArrayTypeP(datum);
458+
nelem=ARR_DIMS(arr)[0];
459+
if (ARR_NDIM(arr)!=1||
460+
nelem<1||
461+
nelem>INDEX_MAX_KEYS||
462+
ARR_HASNULL(arr)||
463+
ARR_ELEMTYPE(arr)!=INT2OID)
464+
elog(ERROR,"conkey is not a 1-D smallint array");
465+
memcpy(conkey,ARR_DATA_PTR(arr),nelem*sizeof(AttrNumber));
466+
467+
for (i=0;i<nelem;i++)
468+
mapped_conkey[i]=attmap[conkey[i]-1];
469+
470+
datum=fastgetattr(tuple,Anum_pg_constraint_confkey,
471+
tupdesc,&isnull);
472+
if (isnull)
473+
elog(ERROR,"null confkey");
474+
arr=DatumGetArrayTypeP(datum);
475+
nelem=ARR_DIMS(arr)[0];
476+
if (ARR_NDIM(arr)!=1||
477+
nelem<1||
478+
nelem>INDEX_MAX_KEYS||
479+
ARR_HASNULL(arr)||
480+
ARR_ELEMTYPE(arr)!=INT2OID)
481+
elog(ERROR,"confkey is not a 1-D smallint array");
482+
memcpy(confkey,ARR_DATA_PTR(arr),nelem*sizeof(AttrNumber));
483+
484+
datum=fastgetattr(tuple,Anum_pg_constraint_conpfeqop,
485+
tupdesc,&isnull);
486+
if (isnull)
487+
elog(ERROR,"null conpfeqop");
488+
arr=DatumGetArrayTypeP(datum);
489+
nelem=ARR_DIMS(arr)[0];
490+
if (ARR_NDIM(arr)!=1||
491+
nelem<1||
492+
nelem>INDEX_MAX_KEYS||
493+
ARR_HASNULL(arr)||
494+
ARR_ELEMTYPE(arr)!=OIDOID)
495+
elog(ERROR,"conpfeqop is not a 1-D OID array");
496+
memcpy(conpfeqop,ARR_DATA_PTR(arr),nelem*sizeof(Oid));
497+
498+
datum=fastgetattr(tuple,Anum_pg_constraint_conpfeqop,
499+
tupdesc,&isnull);
500+
if (isnull)
501+
elog(ERROR,"null conpfeqop");
502+
arr=DatumGetArrayTypeP(datum);
503+
nelem=ARR_DIMS(arr)[0];
504+
if (ARR_NDIM(arr)!=1||
505+
nelem<1||
506+
nelem>INDEX_MAX_KEYS||
507+
ARR_HASNULL(arr)||
508+
ARR_ELEMTYPE(arr)!=OIDOID)
509+
elog(ERROR,"conpfeqop is not a 1-D OID array");
510+
memcpy(conpfeqop,ARR_DATA_PTR(arr),nelem*sizeof(Oid));
511+
512+
datum=fastgetattr(tuple,Anum_pg_constraint_conppeqop,
513+
tupdesc,&isnull);
514+
if (isnull)
515+
elog(ERROR,"null conppeqop");
516+
arr=DatumGetArrayTypeP(datum);
517+
nelem=ARR_DIMS(arr)[0];
518+
if (ARR_NDIM(arr)!=1||
519+
nelem<1||
520+
nelem>INDEX_MAX_KEYS||
521+
ARR_HASNULL(arr)||
522+
ARR_ELEMTYPE(arr)!=OIDOID)
523+
elog(ERROR,"conppeqop is not a 1-D OID array");
524+
memcpy(conppeqop,ARR_DATA_PTR(arr),nelem*sizeof(Oid));
525+
526+
datum=fastgetattr(tuple,Anum_pg_constraint_conffeqop,
527+
tupdesc,&isnull);
528+
if (isnull)
529+
elog(ERROR,"null conffeqop");
530+
arr=DatumGetArrayTypeP(datum);
531+
nelem=ARR_DIMS(arr)[0];
532+
if (ARR_NDIM(arr)!=1||
533+
nelem<1||
534+
nelem>INDEX_MAX_KEYS||
535+
ARR_HASNULL(arr)||
536+
ARR_ELEMTYPE(arr)!=OIDOID)
537+
elog(ERROR,"conffeqop is not a 1-D OID array");
538+
memcpy(conffeqop,ARR_DATA_PTR(arr),nelem*sizeof(Oid));
539+
540+
constrOid=
541+
CreateConstraintEntry(NameStr(constrForm->conname),
542+
constrForm->connamespace,
543+
CONSTRAINT_FOREIGN,
544+
constrForm->condeferrable,
545+
constrForm->condeferred,
546+
constrForm->convalidated,
547+
HeapTupleGetOid(tuple),
548+
relationId,
549+
mapped_conkey,
550+
nelem,
551+
InvalidOid,/* not a domain constraint */
552+
constrForm->conindid,/* same index */
553+
constrForm->confrelid,/* same foreign rel */
554+
confkey,
555+
conpfeqop,
556+
conppeqop,
557+
conffeqop,
558+
nelem,
559+
constrForm->confupdtype,
560+
constrForm->confdeltype,
561+
constrForm->confmatchtype,
562+
NULL,
563+
NULL,
564+
NULL,
565+
NULL,
566+
false,
567+
1, false, true);
568+
569+
ObjectAddressSet(childAddr,ConstraintRelationId,constrOid);
570+
recordDependencyOn(&childAddr,&parentAddr,DEPENDENCY_INTERNAL_AUTO);
571+
572+
fkconstraint=makeNode(Constraint);
573+
/* for now this is all we need */
574+
fkconstraint->fk_upd_action=constrForm->confupdtype;
575+
fkconstraint->fk_del_action=constrForm->confdeltype;
576+
fkconstraint->deferrable=constrForm->condeferrable;
577+
fkconstraint->initdeferred=constrForm->condeferred;
578+
579+
createForeignKeyTriggers(rel,constrForm->confrelid,fkconstraint,
580+
constrOid,constrForm->conindid, false);
581+
582+
if (cloned)
583+
{
584+
/*
585+
* Feed back caller about the constraints we created, so that they can
586+
* set up constraint verification.
587+
*/
588+
newc=palloc(sizeof(ClonedConstraint));
589+
newc->relid=relationId;
590+
newc->refrelid=constrForm->confrelid;
591+
newc->conindid=constrForm->conindid;
592+
newc->conid=constrOid;
593+
newc->constraint=fkconstraint;
594+
595+
*cloned=lappend(*cloned,newc);
596+
}
597+
}
598+
systable_endscan(scan);
599+
600+
pfree(attmap);
601+
602+
if (rel->rd_rel->relkind==RELKIND_PARTITIONED_TABLE)
603+
{
604+
PartitionDescpartdesc=RelationGetPartitionDesc(rel);
605+
inti;
606+
607+
for (i=0;i<partdesc->nparts;i++)
608+
CloneForeignKeyConstraints(RelationGetRelid(rel),
609+
partdesc->oids[i],
610+
cloned);
611+
}
612+
613+
heap_close(rel,NoLock);/* keep lock till commit */
614+
heap_close(parentRel,NoLock);
615+
heap_close(pg_constraint,RowShareLock);
616+
}
380617

381618
/*
382619
* Test whether given name is currently used as a constraint name

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp