@@ -108,11 +108,13 @@ typedef struct RelationSyncEntry
108108{
109109Oid relid ;/* relation oid */
110110
111+ bool replicate_valid ;/* overall validity flag for entry */
112+
111113bool schema_sent ;
112114List * streamed_txns ;/* streamed toplevel transactions with this
113115 * schema */
114116
115- bool replicate_valid ;
117+ /* are we publishing this rel? */
116118PublicationActions pubactions ;
117119
118120/*
@@ -903,7 +905,9 @@ LoadPublications(List *pubnames)
903905}
904906
905907/*
906- * Publication cache invalidation callback.
908+ * Publication syscache invalidation callback.
909+ *
910+ * Called for invalidations on pg_publication.
907911 */
908912static void
909913publication_invalidation_cb (Datum arg ,int cacheid ,uint32 hashvalue )
@@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
11301134HASH_ENTER ,& found );
11311135Assert (entry != NULL );
11321136
1133- /*Not found means schema wasn't sent */
1137+ /*initialize entry, if it's new */
11341138if (!found )
11351139{
1136- /* immediately make a new entry valid enough to satisfy callbacks */
1140+ entry -> replicate_valid = false;
11371141entry -> schema_sent = false;
11381142entry -> streamed_txns = NIL ;
1139- entry -> replicate_valid = false;
11401143entry -> pubactions .pubinsert = entry -> pubactions .pubupdate =
11411144entry -> pubactions .pubdelete = entry -> pubactions .pubtruncate = false;
11421145entry -> publish_as_relid = InvalidOid ;
@@ -1166,13 +1169,40 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
11661169{
11671170oldctx = MemoryContextSwitchTo (CacheMemoryContext );
11681171if (data -> publications )
1172+ {
11691173list_free_deep (data -> publications );
1170-
1174+ data -> publications = NIL ;
1175+ }
11711176data -> publications = LoadPublications (data -> publication_names );
11721177MemoryContextSwitchTo (oldctx );
11731178publications_valid = true;
11741179}
11751180
1181+ /*
1182+ * Reset schema_sent status as the relation definition may have
1183+ * changed. Also reset pubactions to empty in case rel was dropped
1184+ * from a publication. Also free any objects that depended on the
1185+ * earlier definition.
1186+ */
1187+ entry -> schema_sent = false;
1188+ list_free (entry -> streamed_txns );
1189+ entry -> streamed_txns = NIL ;
1190+ entry -> pubactions .pubinsert = false;
1191+ entry -> pubactions .pubupdate = false;
1192+ entry -> pubactions .pubdelete = false;
1193+ entry -> pubactions .pubtruncate = false;
1194+ if (entry -> map )
1195+ {
1196+ /*
1197+ * Must free the TupleDescs contained in the map explicitly,
1198+ * because free_conversion_map() doesn't.
1199+ */
1200+ FreeTupleDesc (entry -> map -> indesc );
1201+ FreeTupleDesc (entry -> map -> outdesc );
1202+ free_conversion_map (entry -> map );
1203+ }
1204+ entry -> map = NULL ;
1205+
11761206/*
11771207 * Build publication cache. We can't use one provided by relcache as
11781208 * relcache considers all publications given relation is in, but here
@@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
12121242foreach (lc2 ,ancestors )
12131243{
12141244Oid ancestor = lfirst_oid (lc2 );
1245+ List * apubids = GetRelationPublications (ancestor );
1246+ List * aschemaPubids = GetSchemaPublications (get_rel_namespace (ancestor ));
12151247
1216- if (list_member_oid (GetRelationPublications (ancestor ),
1217- pub -> oid )||
1218- list_member_oid (GetSchemaPublications (get_rel_namespace (ancestor )),
1219- pub -> oid ))
1248+ if (list_member_oid (apubids ,pub -> oid )||
1249+ list_member_oid (aschemaPubids ,pub -> oid ))
12201250{
12211251ancestor_published = true;
12221252if (pub -> pubviaroot )
12231253publish_as_relid = ancestor ;
12241254}
1255+ list_free (apubids );
1256+ list_free (aschemaPubids );
12251257}
12261258}
12271259
@@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
12511283}
12521284
12531285list_free (pubids );
1286+ list_free (schemaPubids );
12541287
12551288entry -> publish_as_relid = publish_as_relid ;
12561289entry -> replicate_valid = true;
@@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
13221355/*
13231356 * Nobody keeps pointers to entries in this hash table around outside
13241357 * logical decoding callback calls - but invalidation events can come in
1325- * *during* a callback if we access the relcache in the callback. Because
1326- * of that we must mark the cache entry as invalid but not remove it from
1327- * the hash while it could still be referenced, then prune it at a later
1328- * safe point.
1329- *
1330- * Getting invalidations for relations that aren't in the table is
1331- * entirely normal, since there's no way to unregister for an invalidation
1332- * event. So we don't care if it's found or not.
1358+ * *during* a callback if we do any syscache access in the callback.
1359+ * Because of that we must mark the cache entry as invalid but not damage
1360+ * any of its substructure here. The next get_rel_sync_entry() call will
1361+ * rebuild it all.
13331362 */
1334- entry = (RelationSyncEntry * )hash_search (RelationSyncCache ,& relid ,
1335- HASH_FIND ,NULL );
1336-
1337- /*
1338- * Reset schema sent status as the relation definition may have changed.
1339- * Also free any objects that depended on the earlier definition.
1340- */
1341- if (entry != NULL )
1363+ if (OidIsValid (relid ))
13421364{
1343- entry -> schema_sent = false;
1344- list_free (entry -> streamed_txns );
1345- entry -> streamed_txns = NIL ;
1346- if (entry -> map )
1365+ /*
1366+ * Getting invalidations for relations that aren't in the table is
1367+ * entirely normal. So we don't care if it's found or not.
1368+ */
1369+ entry = (RelationSyncEntry * )hash_search (RelationSyncCache ,& relid ,
1370+ HASH_FIND ,NULL );
1371+ if (entry != NULL )
1372+ entry -> replicate_valid = false;
1373+ }
1374+ else
1375+ {
1376+ /* Whole cache must be flushed. */
1377+ HASH_SEQ_STATUS status ;
1378+
1379+ hash_seq_init (& status ,RelationSyncCache );
1380+ while ((entry = (RelationSyncEntry * )hash_seq_search (& status ))!= NULL )
13471381{
1348- /*
1349- * Must free the TupleDescs contained in the map explicitly,
1350- * because free_conversion_map() doesn't.
1351- */
1352- FreeTupleDesc (entry -> map -> indesc );
1353- FreeTupleDesc (entry -> map -> outdesc );
1354- free_conversion_map (entry -> map );
1382+ entry -> replicate_valid = false;
13551383}
1356- entry -> map = NULL ;
13571384}
13581385}
13591386
13601387/*
13611388 * Publication relation/schema map syscache invalidation callback
1389+ *
1390+ * Called for invalidations on pg_publication, pg_publication_rel, and
1391+ * pg_publication_namespace.
13621392 */
13631393static void
13641394rel_sync_cache_publication_cb (Datum arg ,int cacheid ,uint32 hashvalue )
@@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
13821412while ((entry = (RelationSyncEntry * )hash_seq_search (& status ))!= NULL )
13831413{
13841414entry -> replicate_valid = false;
1385-
1386- /*
1387- * There might be some relations dropped from the publication so we
1388- * don't need to publish the changes for them.
1389- */
1390- entry -> pubactions .pubinsert = false;
1391- entry -> pubactions .pubupdate = false;
1392- entry -> pubactions .pubdelete = false;
1393- entry -> pubactions .pubtruncate = false;
13941415}
13951416}
13961417