index 61bed1d481b0bf465521f53c058d52ce4952f297..5d7e1b184dcbd8141aa0245e9685bcc77d348654 100644 (file)
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
+#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/array.h"
PG_RETURN_BOOL(result);
}
+/*
+ * Gets the relations based on the publication partition option for a specified
+ * relation.
+ */
+List *
+GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
+ Oid relid)
+{
+ if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
+ pub_partopt != PUBLICATION_PART_ROOT)
+ {
+ List *all_parts = find_all_inheritors(relid, NoLock,
+ NULL);
+
+ if (pub_partopt == PUBLICATION_PART_ALL)
+ result = list_concat(result, all_parts);
+ else if (pub_partopt == PUBLICATION_PART_LEAF)
+ {
+ ListCell *lc;
+
+ foreach(lc, all_parts)
+ {
+ Oid partOid = lfirst_oid(lc);
+
+ if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
+ result = lappend_oid(result, partOid);
+ }
+ }
+ else
+ Assert(false);
+ }
+ else
+ result = lappend_oid(result, relid);
+
+ return result;
+}
/*
* Insert new publication / relation mapping.
@@-153,6+190,7 @@ publication_add_relation(Oid pubid, Relation targetrel, Publication *pub = GetPublication(pubid);
ObjectAddress myself,
referenced;
+ List *relids = NIL;
rel = table_open(PublicationRelRelationId, RowExclusiveLock);
@@-208,8+246,18 @@ publication_add_relation(Oid pubid, Relation targetrel, /* Close the table. */
table_close(rel, RowExclusiveLock);
- /* Invalidate relcache so that publication info is rebuilt. */
- CacheInvalidateRelcache(targetrel);
+ /*
+ * Invalidate relcache so that publication info is rebuilt.
+ *
+ * For the partitioned tables, we must invalidate all partitions contained
+ * in the respective partition hierarchies, not just the one explicitly
+ * mentioned in the publication. This is required because we implicitly
+ * publish the child tables when the parent table is published.
+ */
+ relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
+ relid);
+
+ InvalidatePublicationRels(relids);
return myself;
}
/*
* Gets list of relation oids for a publication.
*
- * This should only be used for normal publications, the FOR ALL TABLES
+ * This should only be used FOR TABLE publications, the FOR ALL TABLES
* should use GetAllTablesPublicationRelations().
*/
List *
@@-270,32+318,8 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) Form_pg_publication_rel pubrel;
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
-
- if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
- pub_partopt != PUBLICATION_PART_ROOT)
- {
- List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
- NULL);
-
- if (pub_partopt == PUBLICATION_PART_ALL)
- result = list_concat(result, all_parts);
- else if (pub_partopt == PUBLICATION_PART_LEAF)
- {
- ListCell *lc;
-
- foreach(lc, all_parts)
- {
- Oid partOid = lfirst_oid(lc);
-
- if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
- result = lappend_oid(result, partOid);
- }
- }
- else
- Assert(false);
- }
- else
- result = lappend_oid(result, pubrel->prrelid);
+ result = GetPubPartitionOptionRelations(result, pub_partopt,
+ pubrel->prrelid);
}
systable_endscan(scan);
index 879710143e155c0da36ec488967b6db29a846234..7ee88255224f73275428d676761f88ab1698ba6f 100644 (file)
#include "utils/syscache.h"
#include "utils/varlena.h"
-/* Same as MAXNUMMESSAGES in sinvaladt.c */
-#define MAX_RELCACHE_INVAL_MSGS 4096
-
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
@@-330,23+327,7 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, List *relids = GetPublicationRelations(pubform->oid,
PUBLICATION_PART_ALL);
- /*
- * We don't want to send too many individual messages, at some point
- * it's cheaper to just reset whole relcache.
- */
- if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
- {
- ListCell *lc;
-
- foreach(lc, relids)
- {
- Oid relid = lfirst_oid(lc);
-
- CacheInvalidateRelcacheByRelid(relid);
- }
- }
- else
- CacheInvalidateRelcacheAll();
+ InvalidatePublicationRels(relids);
}
ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
@@-356,6+337,27 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
}
+/*
+ * Invalidate the relations.
+ */
+void
+InvalidatePublicationRels(List *relids)
+{
+ /*
+ * We don't want to send too many individual messages, at some point it's
+ * cheaper to just reset whole relcache.
+ */
+ if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+ {
+ ListCell *lc;
+
+ foreach(lc, relids)
+ CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
+ }
+ else
+ CacheInvalidateRelcacheAll();
+}
+
/*
* Add or remove table to/from publication.
*/
Relation rel;
HeapTuple tup;
Form_pg_publication_rel pubrel;
+ List *relids = NIL;
rel = table_open(PublicationRelRelationId, RowExclusiveLock);
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
- /* Invalidate relcache so that publication info is rebuilt. */
- CacheInvalidateRelcacheByRelid(pubrel->prrelid);
+ /*
+ * Invalidate relcache so that publication info is rebuilt.
+ *
+ * For the partitioned tables, we must invalidate all partitions contained
+ * in the respective partition hierarchies, not just the one explicitly
+ * mentioned in the publication. This is required because we implicitly
+ * publish the child tables when the parent table is published.
+ */
+ relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
+ pubrel->prrelid);
+
+ InvalidatePublicationRels(relids);
CatalogTupleDelete(rel, &tup->t_self);
index 5955ba0cf21f5ea0e3b96cb6e72acd16b81a03b1..c876085a4956ade04d718cf7796fbd30d37c4e64 100644 (file)
@@-107,6+107,9 @@ extern List *GetAllTablesPublicationRelations(bool pubviaroot); extern bool is_publishable_relation(Relation rel);
extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
bool if_not_exists);
+extern List *GetPubPartitionOptionRelations(List *result,
+ PublicationPartOpt pub_partopt,
+ Oid relid);
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
extern char *get_publication_name(Oid pubid, bool missing_ok);
index e713df0c31bf21104836509ab6c8ec548e5a6838..5573bc52efd8918573997046b45e17709c9bbf85 100644 (file)
#include "catalog/objectaddress.h"
#include "nodes/parsenodes.h"
+#include "utils/inval.h"
+
+/* Same as MAXNUMMESSAGES in sinvaladt.c */
+#define MAX_RELCACHE_INVAL_MSGS 4096
extern ObjectAddress CreatePublication(CreatePublicationStmt *stmt);
extern void AlterPublication(AlterPublicationStmt *stmt);
@@-25,5+29,6 @@ extern void RemovePublicationRelById(Oid proid);
extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
+extern void InvalidatePublicationRels(List *relids);
#endif /* PUBLICATIONCMDS_H */
index c299702480270e6a6a0e0586e9c49a78e2003378..f27859373d153312ef58def7c21d097ab7d67885 100644 (file)
CREATE PUBLICATION testpub_forparted1;
RESET client_min_messages;
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
+CREATE TABLE testpub_parted2 (LIKE testpub_parted);
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
-- works despite missing REPLICA IDENTITY, because updates are not replicated
UPDATE testpub_parted1 SET a = 1;
-ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
-- only parent is listed as being in publication, not the partition
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
\dRp+ testpub_forparted
@@-154,7+156,14 @@ ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); Tables:
"public.testpub_parted"
-DROP TABLE testpub_parted1;
+-- still fail, because parent's publication replicates updates
+UPDATE testpub_parted2 SET a = 2;
+ERROR: cannot update table "testpub_parted2" because it does not have a replica identity and publishes updates
+HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
+ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
+-- works again, because update is no longer replicated
+UPDATE testpub_parted2 SET a = 2;
+DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR';
index 04b34ee2998c5a65b50c9a56151cc13238bd998f..e5745d575b018e73c7974c5e5fbfa0e338c2588d 100644 (file)
CREATE PUBLICATION testpub_forparted1;
RESET client_min_messages;
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
+CREATE TABLE testpub_parted2 (LIKE testpub_parted);
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
-- works despite missing REPLICA IDENTITY, because updates are not replicated
UPDATE testpub_parted1 SET a = 1;
-ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
-- only parent is listed as being in publication, not the partition
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
\dRp+ testpub_forparted
@@-90,7+92,12 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; UPDATE testpub_parted1 SET a = 1;
ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
\dRp+ testpub_forparted
-DROP TABLE testpub_parted1;
+-- still fail, because parent's publication replicates updates
+UPDATE testpub_parted2 SET a = 2;
+ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
+-- works again, because update is no longer replicated
+UPDATE testpub_parted2 SET a = 2;
+DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication