Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8f2e2bb

Browse files
author
Amit Kapila
committed
Raise a WARNING for missing publications.
When we create or alter a subscription to add publications raise a warningfor non-existent publications. We don't want to give an error here becauseit is possible that users can later create the missing publications.Author: Vignesh CReviewed-by: Bharath Rupireddy, Japin Li, Dilip Kumar, Euler Taveira, Ashutosh Sharma, Amit KapilaDiscussion:https://postgr.es/m/CALDaNm0f4YujGW+q-Di0CbZpnQKFFrXntikaQQKuEmGG0=Zw=Q@mail.gmail.com
1 parent8ac4c25 commit8f2e2bb

File tree

4 files changed

+160
-21
lines changed

4 files changed

+160
-21
lines changed

‎doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
114114
replaces the entire list of publications with a new list,
115115
<literal>ADD</literal> adds additional publications to the list of
116116
publications, and <literal>DROP</literal> removes the publications from
117-
the list of publications. See <xref linkend="sql-createsubscription"/>
117+
the list of publications. We allow non-existent publications to be
118+
specified in <literal>ADD</literal> and <literal>SET</literal> variants
119+
so that users can add those later. See <xref linkend="sql-createsubscription"/>
118120
for more information. By default, this command will also act like
119121
<literal>REFRESH PUBLICATION</literal>.
120122
</para>

‎doc/src/sgml/ref/create_subscription.sgml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
356356
copied data that would be incompatible with subsequent filtering.
357357
</para>
358358

359+
<para>
360+
We allow non-existent publications to be specified so that users can add
361+
those later. This means
362+
<link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
363+
can have non-existent publications.
364+
</para>
365+
359366
</refsect1>
360367

361368
<refsect1>

‎src/backend/commands/subscriptioncmds.c

Lines changed: 113 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
375375
}
376376
}
377377

378+
/*
379+
* Add publication names from the list to a string.
380+
*/
381+
staticvoid
382+
get_publications_str(List*publications,StringInfodest,boolquote_literal)
383+
{
384+
ListCell*lc;
385+
boolfirst= true;
386+
387+
Assert(list_length(publications)>0);
388+
389+
foreach(lc,publications)
390+
{
391+
char*pubname=strVal(lfirst(lc));
392+
393+
if (first)
394+
first= false;
395+
else
396+
appendStringInfoString(dest,", ");
397+
398+
if (quote_literal)
399+
appendStringInfoString(dest,quote_literal_cstr(pubname));
400+
else
401+
{
402+
appendStringInfoChar(dest,'"');
403+
appendStringInfoString(dest,pubname);
404+
appendStringInfoChar(dest,'"');
405+
}
406+
}
407+
}
408+
409+
/*
410+
* Check the specified publication(s) is(are) present in the publisher.
411+
*/
412+
staticvoid
413+
check_publications(WalReceiverConn*wrconn,List*publications)
414+
{
415+
WalRcvExecResult*res;
416+
StringInfocmd;
417+
TupleTableSlot*slot;
418+
List*publicationsCopy=NIL;
419+
OidtableRow[1]= {TEXTOID};
420+
421+
cmd=makeStringInfo();
422+
appendStringInfoString(cmd,"SELECT t.pubname FROM\n"
423+
" pg_catalog.pg_publication t WHERE\n"
424+
" t.pubname IN (");
425+
get_publications_str(publications,cmd, true);
426+
appendStringInfoChar(cmd,')');
427+
428+
res=walrcv_exec(wrconn,cmd->data,1,tableRow);
429+
pfree(cmd->data);
430+
pfree(cmd);
431+
432+
if (res->status!=WALRCV_OK_TUPLES)
433+
ereport(ERROR,
434+
errmsg_plural("could not receive publication from the publisher: %s",
435+
"could not receive list of publications from the publisher: %s",
436+
list_length(publications),
437+
res->err));
438+
439+
publicationsCopy=list_copy(publications);
440+
441+
/* Process publication(s). */
442+
slot=MakeSingleTupleTableSlot(res->tupledesc,&TTSOpsMinimalTuple);
443+
while (tuplestore_gettupleslot(res->tuplestore, true, false,slot))
444+
{
445+
char*pubname;
446+
boolisnull;
447+
448+
pubname=TextDatumGetCString(slot_getattr(slot,1,&isnull));
449+
Assert(!isnull);
450+
451+
/* Delete the publication present in publisher from the list. */
452+
publicationsCopy=list_delete(publicationsCopy,makeString(pubname));
453+
ExecClearTuple(slot);
454+
}
455+
456+
ExecDropSingleTupleTableSlot(slot);
457+
458+
walrcv_clear_result(res);
459+
460+
if (list_length(publicationsCopy))
461+
{
462+
/* Prepare the list of non-existent publication(s) for error message. */
463+
StringInfopubnames=makeStringInfo();
464+
465+
get_publications_str(publicationsCopy,pubnames, false);
466+
ereport(WARNING,
467+
errcode(ERRCODE_UNDEFINED_OBJECT),
468+
errmsg_plural("publication %s does not exist in the publisher",
469+
"publications %s do not exist in the publisher",
470+
list_length(publicationsCopy),
471+
pubnames->data));
472+
}
473+
}
474+
378475
/*
379476
* Auxiliary function to build a text array out of a list of String nodes.
380477
*/
@@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
555652

556653
PG_TRY();
557654
{
655+
check_publications(wrconn,publications);
656+
558657
/*
559658
* Set sync state based on if we were asked to do data copy or
560659
* not.
@@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
650749
}
651750

652751
staticvoid
653-
AlterSubscription_refresh(Subscription*sub,boolcopy_data)
752+
AlterSubscription_refresh(Subscription*sub,boolcopy_data,
753+
List*validate_publications)
654754
{
655755
char*err;
656756
List*pubrel_names;
@@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
681781

682782
PG_TRY();
683783
{
784+
if (validate_publications)
785+
check_publications(wrconn,validate_publications);
786+
684787
/* Get the list of relations from publisher. */
685788
pubrel_names=fetch_table_list(wrconn,sub->publications);
686789
pubrel_names=list_concat(pubrel_names,
@@ -1048,7 +1151,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10481151
/* Make sure refresh sees the new list of publications. */
10491152
sub->publications=stmt->publication;
10501153

1051-
AlterSubscription_refresh(sub,opts.copy_data);
1154+
AlterSubscription_refresh(sub,opts.copy_data,
1155+
stmt->publication);
10521156
}
10531157

10541158
break;
@@ -1074,6 +1178,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10741178
/* Refresh if user asked us to. */
10751179
if (opts.refresh)
10761180
{
1181+
/* We only need to validate user specified publications. */
1182+
List*validate_publications= (isadd) ?stmt->publication :NULL;
1183+
10771184
if (!sub->enabled)
10781185
ereport(ERROR,
10791186
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1096,7 +1203,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10961203
/* Refresh the new list of publications. */
10971204
sub->publications=publist;
10981205

1099-
AlterSubscription_refresh(sub,opts.copy_data);
1206+
AlterSubscription_refresh(sub,opts.copy_data,
1207+
validate_publications);
11001208
}
11011209

11021210
break;
@@ -1138,7 +1246,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
11381246

11391247
PreventInTransactionBlock(isTopLevel,"ALTER SUBSCRIPTION ... REFRESH");
11401248

1141-
AlterSubscription_refresh(sub,opts.copy_data);
1249+
AlterSubscription_refresh(sub,opts.copy_data,NULL);
11421250

11431251
break;
11441252
}
@@ -1659,28 +1767,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
16591767
StringInfoDatacmd;
16601768
TupleTableSlot*slot;
16611769
OidtableRow[2]= {TEXTOID,TEXTOID};
1662-
ListCell*lc;
1663-
boolfirst;
16641770
List*tablelist=NIL;
16651771

1666-
Assert(list_length(publications)>0);
1667-
16681772
initStringInfo(&cmd);
16691773
appendStringInfoString(&cmd,"SELECT DISTINCT t.schemaname, t.tablename\n"
16701774
" FROM pg_catalog.pg_publication_tables t\n"
16711775
" WHERE t.pubname IN (");
1672-
first= true;
1673-
foreach(lc,publications)
1674-
{
1675-
char*pubname=strVal(lfirst(lc));
1676-
1677-
if (first)
1678-
first= false;
1679-
else
1680-
appendStringInfoString(&cmd,", ");
1681-
1682-
appendStringInfoString(&cmd,quote_literal_cstr(pubname));
1683-
}
1776+
get_publications_str(publications,&cmd, true);
16841777
appendStringInfoChar(&cmd,')');
16851778

16861779
res=walrcv_exec(wrconn,cmd.data,2,tableRow);

‎src/test/subscription/t/007_ddl.pl

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,43 @@
4141

4242
pass"subscription disable and drop in same transaction did not hang";
4343

44+
# One of the specified publications exists.
45+
my ($ret,$stdout,$stderr) =$node_subscriber->psql('postgres',
46+
"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub"
47+
);
48+
ok($stderr =~
49+
m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
50+
"Create subscription throws warning for non-existent publication");
51+
52+
$node_publisher->wait_for_catchup('mysub1');
53+
54+
# Also wait for initial table sync to finish.
55+
my$synced_query =
56+
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
57+
$node_subscriber->poll_query_until('postgres',$synced_query)
58+
ordie"Timed out while waiting for subscriber to synchronize data";
59+
60+
# Also wait for initial table sync to finish.
61+
$node_subscriber->poll_query_until('postgres',$synced_query)
62+
ordie"Timed out while waiting for subscriber to synchronize data";
63+
64+
# Specifying non-existent publication along with add publication.
65+
($ret,$stdout,$stderr) =$node_subscriber->psql(
66+
'postgres',
67+
"ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub1, non_existent_pub2"
68+
);
69+
ok($stderr =~
70+
m/WARNING: publications "non_existent_pub1", "non_existent_pub2" do not exist in the publisher/,
71+
"Alter subscription add publication throws warning for non-existent publications");
72+
73+
# Specifying non-existent publication along with set publication.
74+
($ret,$stdout,$stderr) =$node_subscriber->psql('postgres',
75+
"ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub"
76+
);
77+
ok($stderr =~
78+
m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
79+
"Alter subscription set publication throws warning for non-existent publication");
80+
4481
$node_subscriber->stop;
4582
$node_publisher->stop;
4683

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp