Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit07cacba

Browse files
committed
Add the notion of REPLICA IDENTITY for a table.
Pending patches for logical replication will use this to determinewhich columns of a tuple ought to be considered as its candidate key.Andres Freund, with minor, mostly cosmetic adjustments by me
1 parentb97ee66 commit07cacba

File tree

23 files changed

+902
-49
lines changed

23 files changed

+902
-49
lines changed

‎doc/src/sgml/catalogs.sgml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,6 +1862,19 @@
18621862
relations other than some materialized views)</entry>
18631863
</row>
18641864

1865+
<row>
1866+
<entry><structfield>relreplident</structfield></entry>
1867+
<entry><type>char</type></entry>
1868+
<entry></entry>
1869+
<entry>
1870+
Columns used to form <quote>replica identity</> for rows:
1871+
<literal>d</> = default (primary key, if any),
1872+
<literal>n</> = nothing,
1873+
<literal>f</> = all columns
1874+
<literal>i</> = index with indisreplident set, or default
1875+
</entry>
1876+
</row>
1877+
18651878
<row>
18661879
<entry><structfield>relfrozenxid</structfield></entry>
18671880
<entry><type>xid</type></entry>
@@ -3657,6 +3670,17 @@
36573670
</entry>
36583671
</row>
36593672

3673+
<row>
3674+
<entry><structfield>indisreplident</structfield></entry>
3675+
<entry><type>bool</type></entry>
3676+
<entry></entry>
3677+
<entry>
3678+
If true this index has been chosen as <quote>replica identity</>
3679+
using <command>ALTER TABLE ... REPLICA IDENTITY USING INDEX
3680+
...</>
3681+
</entry>
3682+
</row>
3683+
36603684
<row>
36613685
<entry><structfield>indkey</structfield></entry>
36623686
<entry><type>int2vector</type></entry>

‎doc/src/sgml/ref/alter_table.sgml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable>
6969
NOT OF
7070
OWNER TO <replaceable class="PARAMETER">new_owner</replaceable>
7171
SET TABLESPACE <replaceable class="PARAMETER">new_tablespace</replaceable>
72+
REPLICA IDENTITY {DEFAULT | USING INDEX <replaceable class="PARAMETER">index_name</replaceable> | FULL | NOTHING}
7273

7374
<phrase>and <replaceable class="PARAMETER">table_constraint_using_index</replaceable> is:</phrase>
7475

@@ -579,6 +580,24 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable>
579580
</listitem>
580581
</varlistentry>
581582

583+
<varlistentry>
584+
<term><literal>REPLICA IDENTITY</literal></term>
585+
<listitem>
586+
<para>
587+
This form changes the information which is written to the write-ahead log
588+
to identify rows which are updated or deleted. This option has no effect
589+
except when logical replication is in use. <literal>DEFAULT</> records the
590+
old values of the columns of the primary key, if any. <literal>USING INDEX</>
591+
records the old values of the columns covered by the named index, which
592+
must be unique, not partial, not deferrable, and include only columns marked
593+
<literal>NOT NULL</>. <literal>FULL</> records the old values of all columns
594+
in the row. <literal>NOTHING</> records no information about the old row.
595+
In all cases, no old values are logged unless at least one of the columns
596+
that would be logged differs between the old and new versions of the row.
597+
</para>
598+
</listitem>
599+
</varlistentry>
600+
582601
<varlistentry>
583602
<term><literal>RENAME</literal></term>
584603
<listitem>

‎src/backend/catalog/heap.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,7 @@ InsertPgClassTuple(Relation pg_class_desc,
793793
values[Anum_pg_class_relhastriggers-1]=BoolGetDatum(rd_rel->relhastriggers);
794794
values[Anum_pg_class_relhassubclass-1]=BoolGetDatum(rd_rel->relhassubclass);
795795
values[Anum_pg_class_relispopulated-1]=BoolGetDatum(rd_rel->relispopulated);
796+
values[Anum_pg_class_relreplident-1]=CharGetDatum(rd_rel->relreplident);
796797
values[Anum_pg_class_relfrozenxid-1]=TransactionIdGetDatum(rd_rel->relfrozenxid);
797798
values[Anum_pg_class_relminmxid-1]=MultiXactIdGetDatum(rd_rel->relminmxid);
798799
if (relacl!= (Datum)0)

‎src/backend/catalog/index.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ UpdateIndexRelation(Oid indexoid,
614614
/* we set isvalid and isready the same way */
615615
values[Anum_pg_index_indisready-1]=BoolGetDatum(isvalid);
616616
values[Anum_pg_index_indislive-1]=BoolGetDatum(true);
617+
values[Anum_pg_index_indisreplident-1]=BoolGetDatum(false);
617618
values[Anum_pg_index_indkey-1]=PointerGetDatum(indkey);
618619
values[Anum_pg_index_indcollation-1]=PointerGetDatum(indcollation);
619620
values[Anum_pg_index_indclass-1]=PointerGetDatum(indclass);

‎src/backend/commands/tablecmds.c

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
399399
staticvoiddrop_parent_dependency(Oidrelid,Oidrefclassid,Oidrefobjid);
400400
staticvoidATExecAddOf(Relationrel,constTypeName*ofTypename,LOCKMODElockmode);
401401
staticvoidATExecDropOf(Relationrel,LOCKMODElockmode);
402+
staticvoidATExecReplicaIdentity(Relationrel,ReplicaIdentityStmt*stmt,LOCKMODElockmode);
402403
staticvoidATExecGenericOptions(Relationrel,List*options);
403404

404405
staticvoidcopy_relation_data(SMgrRelationrel,SMgrRelationdst,
@@ -2809,6 +2810,7 @@ AlterTableGetLockLevel(List *cmds)
28092810
caseAT_DisableTrigUser:
28102811
caseAT_AddIndex:/* from ADD CONSTRAINT */
28112812
caseAT_AddIndexConstraint:
2813+
caseAT_ReplicaIdentity:
28122814
cmd_lockmode=ShareRowExclusiveLock;
28132815
break;
28142816

@@ -3140,6 +3142,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
31403142
cmd->subtype=AT_ValidateConstraintRecurse;
31413143
pass=AT_PASS_MISC;
31423144
break;
3145+
caseAT_ReplicaIdentity:/* REPLICA IDENTITY ... */
3146+
ATSimplePermissions(rel,ATT_TABLE |ATT_MATVIEW);
3147+
pass=AT_PASS_MISC;
3148+
/* This command never recurses */
3149+
/* No command-specific prep needed */
3150+
break;
31433151
caseAT_EnableTrig:/* ENABLE TRIGGER variants */
31443152
caseAT_EnableAlwaysTrig:
31453153
caseAT_EnableReplicaTrig:
@@ -3440,6 +3448,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
34403448
caseAT_DropOf:
34413449
ATExecDropOf(rel,lockmode);
34423450
break;
3451+
caseAT_ReplicaIdentity:
3452+
ATExecReplicaIdentity(rel, (ReplicaIdentityStmt*)cmd->def,lockmode);
3453+
break;
34433454
caseAT_GenericOptions:
34443455
ATExecGenericOptions(rel, (List*)cmd->def);
34453456
break;
@@ -10009,6 +10020,217 @@ ATExecDropOf(Relation rel, LOCKMODE lockmode)
1000910020
heap_close(relationRelation,RowExclusiveLock);
1001010021
}
1001110022

10023+
/*
10024+
* relation_mark_replica_identity: Update a table's replica identity
10025+
*
10026+
* Iff ri_type = REPLICA_IDENTITY_INDEX, indexOid must be the Oid of a suitable
10027+
* index. Otherwise, it should be InvalidOid.
10028+
*/
10029+
staticvoid
10030+
relation_mark_replica_identity(Relationrel,charri_type,OidindexOid,
10031+
boolis_internal)
10032+
{
10033+
Relationpg_index;
10034+
Relationpg_class;
10035+
HeapTuplepg_class_tuple;
10036+
HeapTuplepg_index_tuple;
10037+
Form_pg_classpg_class_form;
10038+
Form_pg_indexpg_index_form;
10039+
10040+
ListCell*index;
10041+
10042+
/*
10043+
* Check whether relreplident has changed, and update it if so.
10044+
*/
10045+
pg_class=heap_open(RelationRelationId,RowExclusiveLock);
10046+
pg_class_tuple=SearchSysCacheCopy1(RELOID,
10047+
ObjectIdGetDatum(RelationGetRelid(rel)));
10048+
if (!HeapTupleIsValid(pg_class_tuple))
10049+
elog(ERROR,"cache lookup failed for relation \"%s\"",
10050+
RelationGetRelationName(rel));
10051+
pg_class_form= (Form_pg_class)GETSTRUCT(pg_class_tuple);
10052+
if (pg_class_form->relreplident!=ri_type)
10053+
{
10054+
pg_class_form->relreplident=ri_type;
10055+
simple_heap_update(pg_class,&pg_class_tuple->t_self,pg_class_tuple);
10056+
CatalogUpdateIndexes(pg_class,pg_class_tuple);
10057+
}
10058+
heap_close(pg_class,RowExclusiveLock);
10059+
heap_freetuple(pg_class_tuple);
10060+
10061+
/*
10062+
* Check whether the correct index is marked indisreplident; if so, we're
10063+
* done.
10064+
*/
10065+
if (OidIsValid(indexOid))
10066+
{
10067+
Assert(ri_type==REPLICA_IDENTITY_INDEX);
10068+
10069+
pg_index_tuple=SearchSysCache1(INDEXRELID,ObjectIdGetDatum(indexOid));
10070+
if (!HeapTupleIsValid(pg_index_tuple))
10071+
elog(ERROR,"cache lookup failed for index %u",indexOid);
10072+
pg_index_form= (Form_pg_index)GETSTRUCT(pg_index_tuple);
10073+
10074+
if (pg_index_form->indisreplident)
10075+
{
10076+
ReleaseSysCache(pg_index_tuple);
10077+
return;
10078+
}
10079+
ReleaseSysCache(pg_index_tuple);
10080+
}
10081+
10082+
/*
10083+
* Clear the indisreplident flag from any index that had it previously, and
10084+
* set it for any index that should have it now.
10085+
*/
10086+
pg_index=heap_open(IndexRelationId,RowExclusiveLock);
10087+
foreach(index,RelationGetIndexList(rel))
10088+
{
10089+
OidthisIndexOid=lfirst_oid(index);
10090+
booldirty= false;
10091+
10092+
pg_index_tuple=SearchSysCacheCopy1(INDEXRELID,
10093+
ObjectIdGetDatum(thisIndexOid));
10094+
if (!HeapTupleIsValid(pg_index_tuple))
10095+
elog(ERROR,"cache lookup failed for index %u",thisIndexOid);
10096+
pg_index_form= (Form_pg_index)GETSTRUCT(pg_index_tuple);
10097+
10098+
/*
10099+
* Unset the bit if set. We know it's wrong because we checked this
10100+
* earlier.
10101+
*/
10102+
if (pg_index_form->indisreplident)
10103+
{
10104+
dirty= true;
10105+
pg_index_form->indisreplident= false;
10106+
}
10107+
elseif (thisIndexOid==indexOid)
10108+
{
10109+
dirty= true;
10110+
pg_index_form->indisreplident= true;
10111+
}
10112+
10113+
if (dirty)
10114+
{
10115+
simple_heap_update(pg_index,&pg_index_tuple->t_self,pg_index_tuple);
10116+
CatalogUpdateIndexes(pg_index,pg_index_tuple);
10117+
InvokeObjectPostAlterHookArg(IndexRelationId,thisIndexOid,0,
10118+
InvalidOid,is_internal);
10119+
}
10120+
heap_freetuple(pg_index_tuple);
10121+
}
10122+
10123+
heap_close(pg_index,RowExclusiveLock);
10124+
}
10125+
10126+
/*
10127+
* ALTER TABLE <name> REPLICA IDENTITY ...
10128+
*/
10129+
staticvoid
10130+
ATExecReplicaIdentity(Relationrel,ReplicaIdentityStmt*stmt,LOCKMODElockmode)
10131+
{
10132+
OidindexOid;
10133+
RelationindexRel;
10134+
intkey;
10135+
10136+
if (stmt->identity_type==REPLICA_IDENTITY_DEFAULT)
10137+
{
10138+
relation_mark_replica_identity(rel,stmt->identity_type,InvalidOid, true);
10139+
return;
10140+
}
10141+
elseif (stmt->identity_type==REPLICA_IDENTITY_FULL)
10142+
{
10143+
relation_mark_replica_identity(rel,stmt->identity_type,InvalidOid, true);
10144+
return;
10145+
}
10146+
elseif (stmt->identity_type==REPLICA_IDENTITY_NOTHING)
10147+
{
10148+
relation_mark_replica_identity(rel,stmt->identity_type,InvalidOid, true);
10149+
return;
10150+
}
10151+
elseif (stmt->identity_type==REPLICA_IDENTITY_INDEX)
10152+
{
10153+
/* fallthrough */;
10154+
}
10155+
else
10156+
elog(ERROR,"unexpected identity type %u",stmt->identity_type);
10157+
10158+
10159+
/* Check that the index exists */
10160+
indexOid=get_relname_relid(stmt->name,rel->rd_rel->relnamespace);
10161+
if (!OidIsValid(indexOid))
10162+
ereport(ERROR,
10163+
(errcode(ERRCODE_UNDEFINED_OBJECT),
10164+
errmsg("index \"%s\" for table \"%s\" does not exist",
10165+
stmt->name,RelationGetRelationName(rel))));
10166+
10167+
indexRel=index_open(indexOid,ShareLock);
10168+
10169+
/* Check that the index is on the relation we're altering. */
10170+
if (indexRel->rd_index==NULL||
10171+
indexRel->rd_index->indrelid!=RelationGetRelid(rel))
10172+
ereport(ERROR,
10173+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
10174+
errmsg("\"%s\" is not an index for table \"%s\"",
10175+
RelationGetRelationName(indexRel),
10176+
RelationGetRelationName(rel))));
10177+
/* The AM must support uniqueness, and the index must in fact be unique. */
10178+
if (!indexRel->rd_am->amcanunique|| !indexRel->rd_index->indisunique)
10179+
ereport(ERROR,
10180+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
10181+
errmsg("cannot use non-unique index \"%s\" as replica identity",
10182+
RelationGetRelationName(indexRel))));
10183+
/* Deferred indexes are not guaranteed to be always unique. */
10184+
if (!indexRel->rd_index->indimmediate)
10185+
ereport(ERROR,
10186+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
10187+
errmsg("cannot use non-immediate index \"%s\" as replica identity",
10188+
RelationGetRelationName(indexRel))));
10189+
/* Expression indexes aren't supported. */
10190+
if (RelationGetIndexExpressions(indexRel)!=NIL)
10191+
ereport(ERROR,
10192+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
10193+
errmsg("cannot use expression index \"%s\" as replica identity",
10194+
RelationGetRelationName(indexRel))));
10195+
/* Predicate indexes aren't supported. */
10196+
if (RelationGetIndexPredicate(indexRel)!=NIL)
10197+
ereport(ERROR,
10198+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
10199+
errmsg("cannot use partial index \"%s\" as replica identity",
10200+
RelationGetRelationName(indexRel))));
10201+
/* And neither are invalid indexes. */
10202+
if (!IndexIsValid(indexRel->rd_index))
10203+
ereport(ERROR,
10204+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
10205+
errmsg("cannot use invalid index \"%s\" as replica identity",
10206+
RelationGetRelationName(indexRel))));
10207+
10208+
/* Check index for nullable columns. */
10209+
for (key=0;key<indexRel->rd_index->indnatts;key++)
10210+
{
10211+
int16attno=indexRel->rd_index->indkey.values[key];
10212+
Form_pg_attributeattr;
10213+
10214+
/* Of the system columns, only oid is indexable. */
10215+
if (attno <=0&&attno!=ObjectIdAttributeNumber)
10216+
elog(ERROR,"internal column %u in unique index \"%s\"",
10217+
attno,RelationGetRelationName(indexRel));
10218+
10219+
attr=rel->rd_att->attrs[attno-1];
10220+
if (!attr->attnotnull)
10221+
ereport(ERROR,
10222+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
10223+
errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable",
10224+
RelationGetRelationName(indexRel),
10225+
NameStr(attr->attname))));
10226+
}
10227+
10228+
/* This index is suitable for use as a replica identity. Mark it. */
10229+
relation_mark_replica_identity(rel,stmt->identity_type,indexOid, true);
10230+
10231+
index_close(indexRel,NoLock);
10232+
}
10233+
1001210234
/*
1001310235
* ALTER FOREIGN TABLE <name> OPTIONS (...)
1001410236
*/

‎src/backend/nodes/copyfuncs.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3270,6 +3270,17 @@ _copyRefreshMatViewStmt(const RefreshMatViewStmt *from)
32703270
returnnewnode;
32713271
}
32723272

3273+
staticReplicaIdentityStmt*
3274+
_copyReplicaIdentityStmt(constReplicaIdentityStmt*from)
3275+
{
3276+
ReplicaIdentityStmt*newnode=makeNode(ReplicaIdentityStmt);
3277+
3278+
COPY_SCALAR_FIELD(identity_type);
3279+
COPY_STRING_FIELD(name);
3280+
3281+
returnnewnode;
3282+
}
3283+
32733284
staticCreateSeqStmt*
32743285
_copyCreateSeqStmt(constCreateSeqStmt*from)
32753286
{
@@ -4343,6 +4354,9 @@ copyObject(const void *from)
43434354
caseT_RefreshMatViewStmt:
43444355
retval=_copyRefreshMatViewStmt(from);
43454356
break;
4357+
caseT_ReplicaIdentityStmt:
4358+
retval=_copyReplicaIdentityStmt(from);
4359+
break;
43464360
caseT_CreateSeqStmt:
43474361
retval=_copyCreateSeqStmt(from);
43484362
break;

‎src/backend/nodes/equalfuncs.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,6 +1537,15 @@ _equalRefreshMatViewStmt(const RefreshMatViewStmt *a, const RefreshMatViewStmt *
15371537
return true;
15381538
}
15391539

1540+
staticbool
1541+
_equalReplicaIdentityStmt(constReplicaIdentityStmt*a,constReplicaIdentityStmt*b)
1542+
{
1543+
COMPARE_SCALAR_FIELD(identity_type);
1544+
COMPARE_STRING_FIELD(name);
1545+
1546+
return true;
1547+
}
1548+
15401549
staticbool
15411550
_equalCreateSeqStmt(constCreateSeqStmt*a,constCreateSeqStmt*b)
15421551
{
@@ -2813,6 +2822,9 @@ equal(const void *a, const void *b)
28132822
caseT_RefreshMatViewStmt:
28142823
retval=_equalRefreshMatViewStmt(a,b);
28152824
break;
2825+
caseT_ReplicaIdentityStmt:
2826+
retval=_equalReplicaIdentityStmt(a,b);
2827+
break;
28162828
caseT_CreateSeqStmt:
28172829
retval=_equalCreateSeqStmt(a,b);
28182830
break;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp