Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3abe9dc

Browse files
author
Amit Kapila
committed
Avoid invalidating all RelationSyncCache entries on publication rename.
On Publication rename, we need to only invalidate the RelationSyncCacheentries corresponding to relations that are part of the publication beingrenamed.As part of this patch, we introduce a new invalidation message toinvalidate the cache maintained by the logical decoding output plugin. Wecan't use existing relcache invalidation for this purpose, as that wouldunnecessarily cause relcache invalidations in other backends.This will improve performance by building fewer relation cache entriesduring logical replication.Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>Author: Shlok Kyal <shlok.kyal.oss@gmail.com>Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>Discussion:https://postgr.es/m/OSCPR01MB14966C09AA201EFFA706576A7F5C92@OSCPR01MB14966.jpnprd01.prod.outlook.com
1 parent75da2be commit3abe9dc

File tree

10 files changed

+302
-16
lines changed

10 files changed

+302
-16
lines changed

‎src/backend/access/rmgrdesc/standbydesc.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
132132
appendStringInfo(buf," relmap db %u",msg->rm.dbId);
133133
elseif (msg->id==SHAREDINVALSNAPSHOT_ID)
134134
appendStringInfo(buf," snapshot %u",msg->sn.relId);
135+
elseif (msg->id==SHAREDINVALRELSYNC_ID)
136+
appendStringInfo(buf," relsync %u",msg->rs.relid);
135137
else
136138
appendStringInfo(buf," unrecognized id %d",msg->id);
137139
}

‎src/backend/commands/alter.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
338338

339339
InvokeObjectPostAlterHook(classId,objectId,0);
340340

341+
/* Do post catalog-update tasks */
342+
if (classId==PublicationRelationId)
343+
{
344+
Form_pg_publicationpub= (Form_pg_publication)GETSTRUCT(oldtup);
345+
346+
/*
347+
* Invalidate relsynccache entries.
348+
*
349+
* Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
350+
* publication does not impact the publication status of tables. So,
351+
* we don't need to invalidate relcache to rebuild the rd_pubdesc.
352+
* Instead, we invalidate only the relsyncache.
353+
*/
354+
InvalidatePubRelSyncCache(pub->oid,pub->puballtables);
355+
}
356+
341357
/* Release memory */
342358
pfree(values);
343359
pfree(nulls);

‎src/backend/commands/publicationcmds.c

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
491491
return*invalid_column_list||*invalid_gen_col;
492492
}
493493

494+
/*
495+
* Invalidate entries in the RelationSyncCache for relations included in the
496+
* specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
497+
*
498+
* If 'puballtables' is true, invalidate all cache entries.
499+
*/
500+
void
501+
InvalidatePubRelSyncCache(Oidpubid,boolpuballtables)
502+
{
503+
if (puballtables)
504+
{
505+
CacheInvalidateRelSyncAll();
506+
}
507+
else
508+
{
509+
List*relids=NIL;
510+
List*schemarelids=NIL;
511+
512+
/*
513+
* For partitioned tables, we must invalidate all partitions and
514+
* itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
515+
* a target. However, WAL records for TRUNCATE specify both a root and
516+
* its leaves.
517+
*/
518+
relids=GetPublicationRelations(pubid,
519+
PUBLICATION_PART_ALL);
520+
schemarelids=GetAllSchemaPublicationRelations(pubid,
521+
PUBLICATION_PART_ALL);
522+
523+
relids=list_concat_unique_oid(relids,schemarelids);
524+
525+
/* Invalidate the relsyncache */
526+
foreach_oid(relid,relids)
527+
CacheInvalidateRelSync(relid);
528+
}
529+
530+
return;
531+
}
532+
494533
/* check_functions_in_node callback */
495534
staticbool
496535
contain_mutable_or_user_functions_checker(Oidfunc_id,void*context)

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
531531
CacheRegisterSyscacheCallback(PUBLICATIONOID,
532532
publication_invalidation_cb,
533533
(Datum)0);
534+
CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
535+
(Datum)0);
534536
publication_callback_registered= true;
535537
}
536538

@@ -1789,12 +1791,6 @@ static void
17891791
publication_invalidation_cb(Datumarg,intcacheid,uint32hashvalue)
17901792
{
17911793
publications_valid= false;
1792-
1793-
/*
1794-
* Also invalidate per-relation cache so that next time the filtering info
1795-
* is checked it will be updated with the new publication settings.
1796-
*/
1797-
rel_sync_cache_publication_cb(arg,cacheid,hashvalue);
17981794
}
17991795

18001796
/*

‎src/backend/utils/cache/inval.c

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ intdebug_discard_caches = 0;
271271

272272
#defineMAX_SYSCACHE_CALLBACKS 64
273273
#defineMAX_RELCACHE_CALLBACKS 10
274+
#defineMAX_RELSYNC_CALLBACKS 10
274275

275276
staticstructSYSCACHECALLBACK
276277
{
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
292293

293294
staticintrelcache_callback_count=0;
294295

296+
staticstructRELSYNCCALLBACK
297+
{
298+
RelSyncCallbackFunctionfunction;
299+
Datumarg;
300+
}relsync_callback_list[MAX_RELSYNC_CALLBACKS];
301+
302+
staticintrelsync_callback_count=0;
303+
304+
295305
/* ----------------------------------------------------------------
296306
*Invalidation subgroup support functions
297307
* ----------------------------------------------------------------
@@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
484494
AddInvalidationMessage(group,RelCacheMsgs,&msg);
485495
}
486496

497+
/*
498+
* Add a relsync inval entry
499+
*
500+
* We put these into the relcache subgroup for simplicity. This message is the
501+
* same as AddRelcacheInvalidationMessage() except that it is for
502+
* RelationSyncCache maintained by decoding plugin pgoutput.
503+
*/
504+
staticvoid
505+
AddRelsyncInvalidationMessage(InvalidationMsgsGroup*group,
506+
OiddbId,OidrelId)
507+
{
508+
SharedInvalidationMessagemsg;
509+
510+
/* Don't add a duplicate item. */
511+
ProcessMessageSubGroup(group,RelCacheMsgs,
512+
if (msg->rc.id==SHAREDINVALRELSYNC_ID&&
513+
(msg->rc.relId==relId||
514+
msg->rc.relId==InvalidOid))
515+
return);
516+
517+
/* OK, add the item */
518+
msg.rc.id=SHAREDINVALRELSYNC_ID;
519+
msg.rc.dbId=dbId;
520+
msg.rc.relId=relId;
521+
/* check AddCatcacheInvalidationMessage() for an explanation */
522+
VALGRIND_MAKE_MEM_DEFINED(&msg,sizeof(msg));
523+
524+
AddInvalidationMessage(group,RelCacheMsgs,&msg);
525+
}
526+
487527
/*
488528
* Add a snapshot inval entry
489529
*
@@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
611651
info->RelcacheInitFileInval= true;
612652
}
613653

654+
/*
655+
* RegisterRelsyncInvalidation
656+
*
657+
* As above, but register a relsynccache invalidation event.
658+
*/
659+
staticvoid
660+
RegisterRelsyncInvalidation(InvalidationInfo*info,OiddbId,OidrelId)
661+
{
662+
AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs,dbId,relId);
663+
}
664+
614665
/*
615666
* RegisterSnapshotInvalidation
616667
*
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
751802

752803
ccitem->function(ccitem->arg,InvalidOid);
753804
}
805+
806+
for (i=0;i<relsync_callback_count;i++)
807+
{
808+
structRELSYNCCALLBACK*ccitem=relsync_callback_list+i;
809+
810+
ccitem->function(ccitem->arg,InvalidOid);
811+
}
754812
}
755813

756814
/*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
832890
elseif (msg->sn.dbId==MyDatabaseId)
833891
InvalidateCatalogSnapshot();
834892
}
893+
elseif (msg->id==SHAREDINVALRELSYNC_ID)
894+
{
895+
/* We only care about our own database */
896+
if (msg->rs.dbId==MyDatabaseId)
897+
CallRelSyncCallbacks(msg->rs.relid);
898+
}
835899
else
836900
elog(FATAL,"unrecognized SI message ID: %d",msg->id);
837901
}
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
16211685
ReleaseSysCache(tup);
16221686
}
16231687

1688+
/*
1689+
* CacheInvalidateRelSync
1690+
*Register invalidation of the cache in logical decoding output plugin
1691+
*for a database.
1692+
*
1693+
* This type of invalidation message is used for the specific purpose of output
1694+
* plugins. Processes which do not decode WALs would do nothing even when it
1695+
* receives the message.
1696+
*/
1697+
void
1698+
CacheInvalidateRelSync(Oidrelid)
1699+
{
1700+
RegisterRelsyncInvalidation(PrepareInvalidationState(),
1701+
MyDatabaseId,relid);
1702+
}
1703+
1704+
/*
1705+
* CacheInvalidateRelSyncAll
1706+
*Register invalidation of the whole cache in logical decoding output
1707+
*plugin.
1708+
*/
1709+
void
1710+
CacheInvalidateRelSyncAll(void)
1711+
{
1712+
CacheInvalidateRelSync(InvalidOid);
1713+
}
16241714

16251715
/*
16261716
* CacheInvalidateSmgr
@@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
17631853
++relcache_callback_count;
17641854
}
17651855

1856+
/*
1857+
* CacheRegisterRelSyncCallback
1858+
*Register the specified function to be called for all future
1859+
*relsynccache invalidation events.
1860+
*
1861+
* This function is intended to be call from the logical decoding output
1862+
* plugins.
1863+
*/
1864+
void
1865+
CacheRegisterRelSyncCallback(RelSyncCallbackFunctionfunc,
1866+
Datumarg)
1867+
{
1868+
if (relsync_callback_count >=MAX_RELSYNC_CALLBACKS)
1869+
elog(FATAL,"out of relsync_callback_list slots");
1870+
1871+
relsync_callback_list[relsync_callback_count].function=func;
1872+
relsync_callback_list[relsync_callback_count].arg=arg;
1873+
1874+
++relsync_callback_count;
1875+
}
1876+
17661877
/*
17671878
* CallSyscacheCallbacks
17681879
*
@@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
17881899
}
17891900
}
17901901

1902+
/*
1903+
* CallSyscacheCallbacks
1904+
*/
1905+
void
1906+
CallRelSyncCallbacks(Oidrelid)
1907+
{
1908+
for (inti=0;i<relsync_callback_count;i++)
1909+
{
1910+
structRELSYNCCALLBACK*ccitem=relsync_callback_list+i;
1911+
1912+
ccitem->function(ccitem->arg,relid);
1913+
}
1914+
}
1915+
17911916
/*
17921917
* LogLogicalInvalidations
17931918
*

‎src/include/commands/publicationcmds.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
3838
charpubgencols_type,
3939
bool*invalid_column_list,
4040
bool*invalid_gen_col);
41+
externvoidInvalidatePubRelSyncCache(Oidpubid,boolpuballtables);
4142

4243
#endif/* PUBLICATIONCMDS_H */

‎src/include/pg_config_manual.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,10 @@
282282

283283
/*
284284
* For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
285-
* use of the debug_discard_caches GUC to aggressively flush syscache/relcache
286-
* entries whenever it's possible to deliver invalidations. See
287-
* AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
288-
* details.
285+
* use of the debug_discard_caches GUC to aggressively flush
286+
*syscache/relcache/relsynccacheentries whenever it's possible to deliver
287+
*invalidations. SeeAcceptInvalidationMessages() in
288+
*src/backend/utils/cache/inval.c fordetails.
289289
*
290290
* USE_ASSERT_CHECKING builds default to enabling this. It's possible to use
291291
* DISCARD_CACHES_ENABLED without a cassert build and the implied

‎src/include/storage/sinval.h

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
** invalidate an smgr cache entry for a specific physical relation
2828
** invalidate the mapped-relation mapping for a given database
2929
** invalidate any saved snapshot that might be used to scan a given relation
30+
** invalidate a RelationSyncCache entry for a specific relation
3031
* More types could be added if needed. The message type is identified by
3132
* the first "int8" field of the message struct. Zero or positive means a
3233
* specific-catcache inval message (and also serves as the catcache ID field).
@@ -46,12 +47,12 @@
4647
* catcache inval messages must be generated for each of its caches, since
4748
* the hash keys will generally be different.
4849
*
49-
* Catcache, relcache, and snapshot invalidations are transactional, and so
50-
* are sent to other backends upon commit. Internally to the generating
51-
* backend, they are also processed at CommandCounterIncrement so that later
52-
* commands in the same transaction see the new state. The generating backend
53-
* also has to process them at abort, to flush out any cache state it's loaded
54-
* from no-longer-valid entries.
50+
* Catcache, relcache,relsynccache,and snapshot invalidations are
51+
*transactional, and soare sent to other backends upon commit. Internally
52+
*to the generatingbackend, they are also processed at
53+
*CommandCounterIncrement so that latercommands in the same transaction see
54+
*the new state. The generating backendalso has to process them at abort,
55+
*to flush out any cache state it's loadedfrom no-longer-valid entries.
5556
*
5657
* smgr and relation mapping invalidations are non-transactional: they are
5758
* sent immediately when the underlying file change is made.
@@ -110,6 +111,16 @@ typedef struct
110111
OidrelId;/* relation ID */
111112
}SharedInvalSnapshotMsg;
112113

114+
#defineSHAREDINVALRELSYNC_ID(-6)
115+
116+
typedefstruct
117+
{
118+
int8id;/* type field --- must be first */
119+
OiddbId;/* database ID */
120+
Oidrelid;/* relation ID, or 0 if whole
121+
* RelationSyncCache */
122+
}SharedInvalRelSyncMsg;
123+
113124
typedefunion
114125
{
115126
int8id;/* type field --- must be first */
@@ -119,6 +130,7 @@ typedef union
119130
SharedInvalSmgrMsgsm;
120131
SharedInvalRelmapMsgrm;
121132
SharedInvalSnapshotMsgsn;
133+
SharedInvalRelSyncMsgrs;
122134
}SharedInvalidationMessage;
123135

124136

‎src/include/utils/inval.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
2222

2323
typedefvoid (*SyscacheCallbackFunction) (Datumarg,intcacheid,uint32hashvalue);
2424
typedefvoid (*RelcacheCallbackFunction) (Datumarg,Oidrelid);
25+
typedefvoid (*RelSyncCallbackFunction) (Datumarg,Oidrelid);
2526

2627

2728
externvoidAcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
5556

5657
externvoidCacheInvalidateRelcacheByRelid(Oidrelid);
5758

59+
externvoidCacheInvalidateRelSync(Oidrelid);
60+
61+
externvoidCacheInvalidateRelSyncAll(void);
62+
5863
externvoidCacheInvalidateSmgr(RelFileLocatorBackendrlocator);
5964

6065
externvoidCacheInvalidateRelmap(OiddatabaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
6671
externvoidCacheRegisterRelcacheCallback(RelcacheCallbackFunctionfunc,
6772
Datumarg);
6873

74+
externvoidCacheRegisterRelSyncCallback(RelSyncCallbackFunctionfunc,
75+
Datumarg);
76+
6977
externvoidCallSyscacheCallbacks(intcacheid,uint32hashvalue);
7078

79+
externvoidCallRelSyncCallbacks(Oidrelid);
80+
7181
externvoidInvalidateSystemCaches(void);
7282
externvoidInvalidateSystemCachesExtended(booldebug_discard);
7383

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp