Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf1ac27b

Browse files
committed
Add logical replication support to replicate into partitioned tables
Mainly, this adds support code in logical/worker.c for applyingreplicated operations whose target is a partitioned table to itsrelevant partitions.Author: Amit Langote <amitlangote09@gmail.com>Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>Discussion:https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
1 parentb7ce6de commitf1ac27b

File tree

7 files changed

+636
-69
lines changed

7 files changed

+636
-69
lines changed

‎doc/src/sgml/logical-replication.sgml

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -402,16 +402,19 @@
402402

403403
<listitem>
404404
<para>
405-
Replication is only supported by tables, partitioned or not, although a
406-
given table must either be partitioned on both servers or not partitioned
407-
at all. Also, when replicating between partitioned tables, the actual
408-
replication occurs between leaf partitions, so partitions on the two
409-
servers must match one-to-one.
405+
Replication is only supported by tables, including partitioned tables.
406+
Attempts to replicate other types of relations such as views, materialized
407+
views, or foreign tables, will result in an error.
410408
</para>
409+
</listitem>
411410

411+
<listitem>
412412
<para>
413-
Attempts to replicate other types of relations such as views, materialized
414-
views, or foreign tables, will result in an error.
413+
When replicating between partitioned tables, the actual replication
414+
originates from the leaf partitions on the publisher, so partitions on
415+
the publisher must also exist on the subscriber as valid target tables.
416+
(They could either be leaf partitions themselves, or they could be
417+
further subpartitioned, or they could even be independent tables.)
415418
</para>
416419
</listitem>
417420
</itemizedlist>

‎src/backend/executor/execReplication.c

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -594,25 +594,17 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
594594
constchar*relname)
595595
{
596596
/*
597-
* We currently only support writing to regular tables. However, give a
598-
* more specific error for partitioned and foreign tables.
597+
* Give a more specific error for foreign tables.
599598
*/
600-
if (relkind==RELKIND_PARTITIONED_TABLE)
601-
ereport(ERROR,
602-
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
603-
errmsg("cannot use relation \"%s.%s\" as logical replication target",
604-
nspname,relname),
605-
errdetail("\"%s.%s\" is a partitioned table.",
606-
nspname,relname)));
607-
elseif (relkind==RELKIND_FOREIGN_TABLE)
599+
if (relkind==RELKIND_FOREIGN_TABLE)
608600
ereport(ERROR,
609601
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
610602
errmsg("cannot use relation \"%s.%s\" as logical replication target",
611603
nspname,relname),
612604
errdetail("\"%s.%s\" is a foreign table.",
613605
nspname,relname)));
614606

615-
if (relkind!=RELKIND_RELATION)
607+
if (relkind!=RELKIND_RELATION&&relkind!=RELKIND_PARTITIONED_TABLE)
616608
ereport(ERROR,
617609
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
618610
errmsg("cannot use relation \"%s.%s\" as logical replication target",

‎src/backend/replication/logical/relation.c

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,24 @@ static MemoryContext LogicalRepRelMapContext = NULL;
3535
staticHTAB*LogicalRepRelMap=NULL;
3636
staticHTAB*LogicalRepTypMap=NULL;
3737

38+
/*
39+
* Partition map (LogicalRepPartMap)
40+
*
41+
* When a partitioned table is used as replication target, replicated
42+
* operations are actually performed on its leaf partitions, which requires
43+
* the partitions to also be mapped to the remote relation. Parent's entry
44+
* (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
45+
* individual partitions may have different attribute numbers, which means
46+
* attribute mappings to remote relation's attributes must be maintained
47+
* separately for each partition.
48+
*/
49+
staticMemoryContextLogicalRepPartMapContext=NULL;
50+
staticHTAB*LogicalRepPartMap=NULL;
51+
typedefstructLogicalRepPartMapEntry
52+
{
53+
Oidpartoid;/* LogicalRepPartMap's key */
54+
LogicalRepRelMapEntryrelmapentry;
55+
}LogicalRepPartMapEntry;
3856

3957
/*
4058
* Relcache invalidation callback for our relation map cache.
@@ -472,3 +490,174 @@ logicalrep_typmap_gettypname(Oid remoteid)
472490
Assert(OidIsValid(entry->remoteid));
473491
returnpsprintf("%s.%s",entry->nspname,entry->typname);
474492
}
493+
494+
/*
495+
* Partition cache: look up partition LogicalRepRelMapEntry's
496+
*
497+
* Unlike relation map cache, this is keyed by partition OID, not remote
498+
* relation OID, because we only have to use this cache in the case where
499+
* partitions are not directly mapped to any remote relation, such as when
500+
* replication is occurring with one of their ancestors as target.
501+
*/
502+
503+
/*
504+
* Relcache invalidation callback
505+
*/
506+
staticvoid
507+
logicalrep_partmap_invalidate_cb(Datumarg,Oidreloid)
508+
{
509+
LogicalRepRelMapEntry*entry;
510+
511+
/* Just to be sure. */
512+
if (LogicalRepPartMap==NULL)
513+
return;
514+
515+
if (reloid!=InvalidOid)
516+
{
517+
HASH_SEQ_STATUSstatus;
518+
519+
hash_seq_init(&status,LogicalRepPartMap);
520+
521+
/* TODO, use inverse lookup hashtable? */
522+
while ((entry= (LogicalRepRelMapEntry*)hash_seq_search(&status))!=NULL)
523+
{
524+
if (entry->localreloid==reloid)
525+
{
526+
entry->localreloid=InvalidOid;
527+
hash_seq_term(&status);
528+
break;
529+
}
530+
}
531+
}
532+
else
533+
{
534+
/* invalidate all cache entries */
535+
HASH_SEQ_STATUSstatus;
536+
537+
hash_seq_init(&status,LogicalRepPartMap);
538+
539+
while ((entry= (LogicalRepRelMapEntry*)hash_seq_search(&status))!=NULL)
540+
entry->localreloid=InvalidOid;
541+
}
542+
}
543+
544+
/*
545+
* Initialize the partition map cache.
546+
*/
547+
staticvoid
548+
logicalrep_partmap_init(void)
549+
{
550+
HASHCTLctl;
551+
552+
if (!LogicalRepPartMapContext)
553+
LogicalRepPartMapContext=
554+
AllocSetContextCreate(CacheMemoryContext,
555+
"LogicalRepPartMapContext",
556+
ALLOCSET_DEFAULT_SIZES);
557+
558+
/* Initialize the relation hash table. */
559+
MemSet(&ctl,0,sizeof(ctl));
560+
ctl.keysize=sizeof(Oid);/* partition OID */
561+
ctl.entrysize=sizeof(LogicalRepPartMapEntry);
562+
ctl.hcxt=LogicalRepPartMapContext;
563+
564+
LogicalRepPartMap=hash_create("logicalrep partition map cache",64,&ctl,
565+
HASH_ELEM |HASH_BLOBS |HASH_CONTEXT);
566+
567+
/* Watch for invalidation events. */
568+
CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
569+
(Datum)0);
570+
}
571+
572+
/*
573+
* logicalrep_partition_open
574+
*
575+
* Returned entry reuses most of the values of the root table's entry, save
576+
* the attribute map, which can be different for the partition.
577+
*
578+
* Note there's no logialrep_partition_close, because the caller closes the
579+
* the component relation.
580+
*/
581+
LogicalRepRelMapEntry*
582+
logicalrep_partition_open(LogicalRepRelMapEntry*root,
583+
Relationpartrel,AttrMap*map)
584+
{
585+
LogicalRepRelMapEntry*entry;
586+
LogicalRepPartMapEntry*part_entry;
587+
LogicalRepRelation*remoterel=&root->remoterel;
588+
OidpartOid=RelationGetRelid(partrel);
589+
AttrMap*attrmap=root->attrmap;
590+
boolfound;
591+
inti;
592+
MemoryContextoldctx;
593+
594+
if (LogicalRepPartMap==NULL)
595+
logicalrep_partmap_init();
596+
597+
/* Search for existing entry. */
598+
part_entry= (LogicalRepPartMapEntry*)hash_search(LogicalRepPartMap,
599+
(void*)&partOid,
600+
HASH_ENTER,&found);
601+
602+
if (found)
603+
return&part_entry->relmapentry;
604+
605+
memset(part_entry,0,sizeof(LogicalRepPartMapEntry));
606+
607+
/* Switch to longer-lived context. */
608+
oldctx=MemoryContextSwitchTo(LogicalRepPartMapContext);
609+
610+
part_entry->partoid=partOid;
611+
612+
/* Remote relation is used as-is from the root entry. */
613+
entry=&part_entry->relmapentry;
614+
entry->remoterel.remoteid=remoterel->remoteid;
615+
entry->remoterel.nspname=pstrdup(remoterel->nspname);
616+
entry->remoterel.relname=pstrdup(remoterel->relname);
617+
entry->remoterel.natts=remoterel->natts;
618+
entry->remoterel.attnames=palloc(remoterel->natts*sizeof(char*));
619+
entry->remoterel.atttyps=palloc(remoterel->natts*sizeof(Oid));
620+
for (i=0;i<remoterel->natts;i++)
621+
{
622+
entry->remoterel.attnames[i]=pstrdup(remoterel->attnames[i]);
623+
entry->remoterel.atttyps[i]=remoterel->atttyps[i];
624+
}
625+
entry->remoterel.replident=remoterel->replident;
626+
entry->remoterel.attkeys=bms_copy(remoterel->attkeys);
627+
628+
entry->localrel=partrel;
629+
entry->localreloid=partOid;
630+
631+
/*
632+
* If the partition's attributes don't match the root relation's, we'll
633+
* need to make a new attrmap which maps partition attribute numbers to
634+
* remoterel's, instead the original which maps root relation's attribute
635+
* numbers to remoterel's.
636+
*
637+
* Note that 'map' which comes from the tuple routing data structure
638+
* contains 1-based attribute numbers (of the parent relation). However,
639+
* the map in 'entry', a logical replication data structure, contains
640+
* 0-based attribute numbers (of the remote relation).
641+
*/
642+
if (map)
643+
{
644+
AttrNumberattno;
645+
646+
entry->attrmap=make_attrmap(map->maplen);
647+
for (attno=0;attno<entry->attrmap->maplen;attno++)
648+
{
649+
AttrNumberroot_attno=map->attnums[attno];
650+
651+
entry->attrmap->attnums[attno]=attrmap->attnums[root_attno-1];
652+
}
653+
}
654+
else
655+
entry->attrmap=attrmap;
656+
657+
entry->updatable=root->updatable;
658+
659+
/* state and statelsn are left set to 0. */
660+
MemoryContextSwitchTo(oldctx);
661+
662+
returnentry;
663+
}

‎src/backend/replication/logical/tablesync.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,6 @@ copy_table(Relation rel)
762762
/* Map the publisher relation to local one. */
763763
relmapentry=logicalrep_rel_open(lrel.remoteid,NoLock);
764764
Assert(rel==relmapentry->localrel);
765-
Assert(relmapentry->localrel->rd_rel->relkind==RELKIND_RELATION);
766765

767766
/* Start copy on the publisher. */
768767
initStringInfo(&cmd);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp