Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit062a844

Browse files
author
Amit Kapila
committed
Avoid syncing data twice for the 'publish_via_partition_root' option.
When there are multiple publications for a subscription and one of thosepublishes via the parent table by using publish_via_partition_root and theother one directly publishes the child table, we end up copying the samedata twice during initial synchronization. The reason for this was that weget both the parent and child tables from the publisher and try to copythe data for both of them.This patch extends the function pg_get_publication_tables() to take apublication list as its input parameter. This allows us to exclude apartition table whose ancestor is published by the same publication list.This problem does exist in back-branches but we decide to fix it there ina separate commit if required. The fix for back-branches requires quitecomplicated changes to fetch the required table information from thepublisher as we can't update the function pg_get_publication_tables() inback-branches. We are not sure whether we want to deviate and complicatethe code in back-branches for this problem as there are no field reportsyet.Author: Wang weiReviewed-by: Peter Smith, Jacob Champion, Kuroda Hayato, Vignesh C, Osumi Takamichi, Amit KapilaDiscussion:https://postgr.es/m/OS0PR01MB57167F45D481F78CDC5986F794B99@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parentde5a47a commit062a844

File tree

10 files changed

+274
-99
lines changed

10 files changed

+274
-99
lines changed

‎doc/src/sgml/ref/create_publication.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,16 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
201201
consisting of a different set of partitions.
202202
</para>
203203

204+
<para>
205+
There can be a case where a subscription combines multiple
206+
publications. If a partitioned table is published by any
207+
subscribed publications which set
208+
<literal>publish_via_partition_root</literal> = true, changes on this
209+
partitioned table (or on its partitions) will be published using
210+
the identity and schema of this partitioned table rather than
211+
that of the individual partitions.
212+
</para>
213+
204214
<para>
205215
This parameter also affects how row filters and column lists are
206216
chosen for partitions; see below for details.

‎src/backend/catalog/pg_publication.c

Lines changed: 140 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@
4545
#include"utils/rel.h"
4646
#include"utils/syscache.h"
4747

48+
/* Records association between publication and published table */
49+
typedefstruct
50+
{
51+
Oidrelid;/* OID of published table */
52+
Oidpubid;/* OID of publication that publishes this
53+
* table. */
54+
}published_rel;
55+
4856
staticvoidpublication_translate_columns(Relationtargetrel,List*columns,
4957
int*natts,AttrNumber**attrs);
5058

@@ -172,42 +180,57 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
172180
}
173181

174182
/*
175-
*Filter outthepartitions whose parent tables were also specified in
176-
*the publication.
183+
*Returns true iftheancestor is in the list of published relations.
184+
*Otherwise, returns false.
177185
*/
178-
staticList*
179-
filter_partitions(List*relids)
186+
staticbool
187+
is_ancestor_member_tableinfos(Oidancestor,List*table_infos)
188+
{
189+
ListCell*lc;
190+
191+
foreach(lc,table_infos)
192+
{
193+
Oidrelid= ((published_rel*)lfirst(lc))->relid;
194+
195+
if (relid==ancestor)
196+
return true;
197+
}
198+
199+
return false;
200+
}
201+
202+
/*
203+
* Filter out the partitions whose parent tables are also present in the list.
204+
*/
205+
staticvoid
206+
filter_partitions(List*table_infos)
180207
{
181-
List*result=NIL;
182208
ListCell*lc;
183-
ListCell*lc2;
184209

185-
foreach(lc,relids)
210+
foreach(lc,table_infos)
186211
{
187212
boolskip= false;
188213
List*ancestors=NIL;
189-
Oidrelid=lfirst_oid(lc);
214+
ListCell*lc2;
215+
published_rel*table_info= (published_rel*)lfirst(lc);
190216

191-
if (get_rel_relispartition(relid))
192-
ancestors=get_partition_ancestors(relid);
217+
if (get_rel_relispartition(table_info->relid))
218+
ancestors=get_partition_ancestors(table_info->relid);
193219

194220
foreach(lc2,ancestors)
195221
{
196222
Oidancestor=lfirst_oid(lc2);
197223

198-
/* Check if the parent table exists in the published table list. */
199-
if (list_member_oid(relids,ancestor))
224+
if (is_ancestor_member_tableinfos(ancestor,table_infos))
200225
{
201226
skip= true;
202227
break;
203228
}
204229
}
205230

206-
if (!skip)
207-
result=lappend_oid(result,relid);
231+
if (skip)
232+
table_infos=foreach_delete_current(table_infos,lc);
208233
}
209-
210-
returnresult;
211234
}
212235

213236
/*
@@ -1026,91 +1049,136 @@ GetPublicationByName(const char *pubname, bool missing_ok)
10261049
}
10271050

10281051
/*
1029-
* Returns information of tables in a publication.
1052+
* Get information of the tables in the given publication array.
1053+
*
1054+
* Returns pubid, relid, column list, row filter for each table.
10301055
*/
10311056
Datum
10321057
pg_get_publication_tables(PG_FUNCTION_ARGS)
10331058
{
1034-
#defineNUM_PUBLICATION_TABLES_ELEM3
1059+
#defineNUM_PUBLICATION_TABLES_ELEM4
10351060
FuncCallContext*funcctx;
1036-
char*pubname=text_to_cstring(PG_GETARG_TEXT_PP(0));
1037-
Publication*publication;
1038-
List*tables;
1061+
List*table_infos=NIL;
10391062

10401063
/* stuff done only on the first call of the function */
10411064
if (SRF_IS_FIRSTCALL())
10421065
{
10431066
TupleDesctupdesc;
10441067
MemoryContextoldcontext;
1068+
ArrayType*arr;
1069+
Datum*elems;
1070+
intnelems,
1071+
i;
1072+
boolviaroot= false;
10451073

10461074
/* create a function context for cross-call persistence */
10471075
funcctx=SRF_FIRSTCALL_INIT();
10481076

10491077
/* switch to memory context appropriate for multiple function calls */
10501078
oldcontext=MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
10511079

1052-
publication=GetPublicationByName(pubname, false);
1053-
10541080
/*
1055-
* Publications support partitioned tables, although all changes are
1056-
* replicated using leaf partition identity and schema, so we only
1057-
* need those.
1081+
* Deconstruct the parameter into elements where each element is a
1082+
* publication name.
10581083
*/
1059-
if (publication->alltables)
1060-
{
1061-
tables=GetAllTablesPublicationRelations(publication->pubviaroot);
1062-
}
1063-
else
1084+
arr=PG_GETARG_ARRAYTYPE_P(0);
1085+
deconstruct_array(arr,TEXTOID,-1, false,TYPALIGN_INT,
1086+
&elems,NULL,&nelems);
1087+
1088+
/* Get Oids of tables from each publication. */
1089+
for (i=0;i<nelems;i++)
10641090
{
1065-
List*relids,
1066-
*schemarelids;
1067-
1068-
relids=GetPublicationRelations(publication->oid,
1069-
publication->pubviaroot ?
1070-
PUBLICATION_PART_ROOT :
1071-
PUBLICATION_PART_LEAF);
1072-
schemarelids=GetAllSchemaPublicationRelations(publication->oid,
1073-
publication->pubviaroot ?
1074-
PUBLICATION_PART_ROOT :
1075-
PUBLICATION_PART_LEAF);
1076-
tables=list_concat_unique_oid(relids,schemarelids);
1091+
Publication*pub_elem;
1092+
List*pub_elem_tables=NIL;
1093+
ListCell*lc;
1094+
1095+
pub_elem=GetPublicationByName(TextDatumGetCString(elems[i]), false);
10771096

10781097
/*
1079-
* If the publication publishes partition changes via their
1080-
* respective root partitioned tables, we must exclude partitions
1081-
* in favor of including the root partitioned tables. Otherwise,
1082-
* the function could return both the child and parent tables
1083-
* which could cause data of the child table to be
1084-
* double-published on the subscriber side.
1098+
* Publications support partitioned tables. If
1099+
* publish_via_partition_root is false, all changes are replicated
1100+
* using leaf partition identity and schema, so we only need
1101+
* those. Otherwise, get the partitioned table itself.
10851102
*/
1086-
if (publication->pubviaroot)
1087-
tables=filter_partitions(tables);
1103+
if (pub_elem->alltables)
1104+
pub_elem_tables=GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1105+
else
1106+
{
1107+
List*relids,
1108+
*schemarelids;
1109+
1110+
relids=GetPublicationRelations(pub_elem->oid,
1111+
pub_elem->pubviaroot ?
1112+
PUBLICATION_PART_ROOT :
1113+
PUBLICATION_PART_LEAF);
1114+
schemarelids=GetAllSchemaPublicationRelations(pub_elem->oid,
1115+
pub_elem->pubviaroot ?
1116+
PUBLICATION_PART_ROOT :
1117+
PUBLICATION_PART_LEAF);
1118+
pub_elem_tables=list_concat_unique_oid(relids,schemarelids);
1119+
}
1120+
1121+
/*
1122+
* Record the published table and the corresponding publication so
1123+
* that we can get row filters and column lists later.
1124+
*
1125+
* When a table is published by multiple publications, to obtain
1126+
* all row filters and column lists, the structure related to this
1127+
* table will be recorded multiple times.
1128+
*/
1129+
foreach(lc,pub_elem_tables)
1130+
{
1131+
published_rel*table_info= (published_rel*)palloc(sizeof(published_rel));
1132+
1133+
table_info->relid=lfirst_oid(lc);
1134+
table_info->pubid=pub_elem->oid;
1135+
table_infos=lappend(table_infos,table_info);
1136+
}
1137+
1138+
/* At least one publication is using publish_via_partition_root. */
1139+
if (pub_elem->pubviaroot)
1140+
viaroot= true;
10881141
}
10891142

1143+
/*
1144+
* If the publication publishes partition changes via their respective
1145+
* root partitioned tables, we must exclude partitions in favor of
1146+
* including the root partitioned tables. Otherwise, the function
1147+
* could return both the child and parent tables which could cause
1148+
* data of the child table to be double-published on the subscriber
1149+
* side.
1150+
*/
1151+
if (viaroot)
1152+
filter_partitions(table_infos);
1153+
10901154
/* Construct a tuple descriptor for the result rows. */
10911155
tupdesc=CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1092-
TupleDescInitEntry(tupdesc, (AttrNumber)1,"relid",
1156+
TupleDescInitEntry(tupdesc, (AttrNumber)1,"pubid",
10931157
OIDOID,-1,0);
1094-
TupleDescInitEntry(tupdesc, (AttrNumber)2,"attrs",
1158+
TupleDescInitEntry(tupdesc, (AttrNumber)2,"relid",
1159+
OIDOID,-1,0);
1160+
TupleDescInitEntry(tupdesc, (AttrNumber)3,"attrs",
10951161
INT2VECTOROID,-1,0);
1096-
TupleDescInitEntry(tupdesc, (AttrNumber)3,"qual",
1162+
TupleDescInitEntry(tupdesc, (AttrNumber)4,"qual",
10971163
PG_NODE_TREEOID,-1,0);
10981164

10991165
funcctx->tuple_desc=BlessTupleDesc(tupdesc);
1100-
funcctx->user_fctx= (void*)tables;
1166+
funcctx->user_fctx= (void*)table_infos;
11011167

11021168
MemoryContextSwitchTo(oldcontext);
11031169
}
11041170

11051171
/* stuff done on every call of the function */
11061172
funcctx=SRF_PERCALL_SETUP();
1107-
tables= (List*)funcctx->user_fctx;
1173+
table_infos= (List*)funcctx->user_fctx;
11081174

1109-
if (funcctx->call_cntr<list_length(tables))
1175+
if (funcctx->call_cntr<list_length(table_infos))
11101176
{
11111177
HeapTuplepubtuple=NULL;
11121178
HeapTuplerettuple;
1113-
Oidrelid=list_nth_oid(tables,funcctx->call_cntr);
1179+
Publication*pub;
1180+
published_rel*table_info= (published_rel*)list_nth(table_infos,funcctx->call_cntr);
1181+
Oidrelid=table_info->relid;
11141182
Oidschemaid=get_rel_namespace(relid);
11151183
Datumvalues[NUM_PUBLICATION_TABLES_ELEM]= {0};
11161184
boolnulls[NUM_PUBLICATION_TABLES_ELEM]= {0};
@@ -1119,42 +1187,43 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
11191187
* Form tuple with appropriate data.
11201188
*/
11211189

1122-
publication=GetPublicationByName(pubname, false);
1190+
pub=GetPublication(table_info->pubid);
11231191

1124-
values[0]=ObjectIdGetDatum(relid);
1192+
values[0]=ObjectIdGetDatum(pub->oid);
1193+
values[1]=ObjectIdGetDatum(relid);
11251194

11261195
/*
11271196
* We don't consider row filters or column lists for FOR ALL TABLES or
11281197
* FOR TABLES IN SCHEMA publications.
11291198
*/
1130-
if (!publication->alltables&&
1199+
if (!pub->alltables&&
11311200
!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
11321201
ObjectIdGetDatum(schemaid),
1133-
ObjectIdGetDatum(publication->oid)))
1202+
ObjectIdGetDatum(pub->oid)))
11341203
pubtuple=SearchSysCacheCopy2(PUBLICATIONRELMAP,
11351204
ObjectIdGetDatum(relid),
1136-
ObjectIdGetDatum(publication->oid));
1205+
ObjectIdGetDatum(pub->oid));
11371206

11381207
if (HeapTupleIsValid(pubtuple))
11391208
{
11401209
/* Lookup the column list attribute. */
1141-
values[1]=SysCacheGetAttr(PUBLICATIONRELMAP,pubtuple,
1210+
values[2]=SysCacheGetAttr(PUBLICATIONRELMAP,pubtuple,
11421211
Anum_pg_publication_rel_prattrs,
1143-
&(nulls[1]));
1212+
&(nulls[2]));
11441213

11451214
/* Null indicates no filter. */
1146-
values[2]=SysCacheGetAttr(PUBLICATIONRELMAP,pubtuple,
1215+
values[3]=SysCacheGetAttr(PUBLICATIONRELMAP,pubtuple,
11471216
Anum_pg_publication_rel_prqual,
1148-
&(nulls[2]));
1217+
&(nulls[3]));
11491218
}
11501219
else
11511220
{
1152-
nulls[1]= true;
11531221
nulls[2]= true;
1222+
nulls[3]= true;
11541223
}
11551224

11561225
/* Show all columns when the column list is not specified. */
1157-
if (nulls[1]== true)
1226+
if (nulls[2])
11581227
{
11591228
Relationrel=table_open(relid,AccessShareLock);
11601229
intnattnums=0;
@@ -1176,8 +1245,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
11761245

11771246
if (nattnums>0)
11781247
{
1179-
values[1]=PointerGetDatum(buildint2vector(attnums,nattnums));
1180-
nulls[1]= false;
1248+
values[2]=PointerGetDatum(buildint2vector(attnums,nattnums));
1249+
nulls[2]= false;
11811250
}
11821251

11831252
table_close(rel,AccessShareLock);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp