Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdafbfed

Browse files
author
Amit Kapila
committed
Enhance libpqrcv APIs to support slot synchronization.
This patch provides support for regular (non-replication) connections inlibpqrcv_connect(). This can be used to execute SQL statements on theprimary server without starting a walsender.A new API libpqrcv_get_dbname_from_conninfo() is also added to extract thedatabase name from the given connection-info.Note that this patch doesn't change any existing functionality but laterpatches implementing the slot synchronization will use this functionalityto connect to the primary server to fetch required slot information.Author: Shveta Malik, Hou Zhijie, Ajin CherianReviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda Hayato, Amit KapilaDiscussion:https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
1 parenta17aa50 commitdafbfed

File tree

6 files changed

+114
-40
lines changed

6 files changed

+114
-40
lines changed

‎src/backend/commands/subscriptioncmds.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
759759

760760
/* Try to connect to the publisher. */
761761
must_use_password= !superuser_arg(owner)&&opts.passwordrequired;
762-
wrconn=walrcv_connect(conninfo, true,must_use_password,
762+
wrconn=walrcv_connect(conninfo, true,true,must_use_password,
763763
stmt->subname,&err);
764764
if (!wrconn)
765765
ereport(ERROR,
@@ -910,7 +910,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
910910

911911
/* Try to connect to the publisher. */
912912
must_use_password=sub->passwordrequired&& !sub->ownersuperuser;
913-
wrconn=walrcv_connect(sub->conninfo, true,must_use_password,
913+
wrconn=walrcv_connect(sub->conninfo, true,true,must_use_password,
914914
sub->name,&err);
915915
if (!wrconn)
916916
ereport(ERROR,
@@ -1537,7 +1537,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
15371537

15381538
/* Try to connect to the publisher. */
15391539
must_use_password=sub->passwordrequired&& !sub->ownersuperuser;
1540-
wrconn=walrcv_connect(sub->conninfo, true,must_use_password,
1540+
wrconn=walrcv_connect(sub->conninfo, true,true,must_use_password,
15411541
sub->name,&err);
15421542
if (!wrconn)
15431543
ereport(ERROR,
@@ -1788,7 +1788,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
17881788
*/
17891789
load_file("libpqwalreceiver", false);
17901790

1791-
wrconn=walrcv_connect(conninfo, true,must_use_password,
1791+
wrconn=walrcv_connect(conninfo, true,true,must_use_password,
17921792
subname,&err);
17931793
if (wrconn==NULL)
17941794
{

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 90 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ struct WalReceiverConn
4848

4949
/* Prototypes for interface functions */
5050
staticWalReceiverConn*libpqrcv_connect(constchar*conninfo,
51-
boollogical,boolmust_use_password,
51+
boolreplication,boollogical,
52+
boolmust_use_password,
5253
constchar*appname,char**err);
5354
staticvoidlibpqrcv_check_conninfo(constchar*conninfo,
5455
boolmust_use_password);
@@ -57,6 +58,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
5758
char**sender_host,int*sender_port);
5859
staticchar*libpqrcv_identify_system(WalReceiverConn*conn,
5960
TimeLineID*primary_tli);
61+
staticchar*libpqrcv_get_dbname_from_conninfo(constchar*conninfo);
6062
staticintlibpqrcv_server_version(WalReceiverConn*conn);
6163
staticvoidlibpqrcv_readtimelinehistoryfile(WalReceiverConn*conn,
6264
TimeLineIDtli,char**filename,
@@ -99,6 +101,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
99101
.walrcv_send=libpqrcv_send,
100102
.walrcv_create_slot=libpqrcv_create_slot,
101103
.walrcv_alter_slot=libpqrcv_alter_slot,
104+
.walrcv_get_dbname_from_conninfo=libpqrcv_get_dbname_from_conninfo,
102105
.walrcv_get_backend_pid=libpqrcv_get_backend_pid,
103106
.walrcv_exec=libpqrcv_exec,
104107
.walrcv_disconnect=libpqrcv_disconnect
@@ -121,7 +124,11 @@ _PG_init(void)
121124
}
122125

123126
/*
124-
* Establish the connection to the primary server for XLOG streaming
127+
* Establish the connection to the primary server.
128+
*
129+
* This function can be used for both replication and regular connections.
130+
* If it is a replication connection, it could be either logical or physical
131+
* based on input argument 'logical'.
125132
*
126133
* If an error occurs, this function will normally return NULL and set *err
127134
* to a palloc'ed error message. However, if must_use_password is true and
@@ -132,8 +139,8 @@ _PG_init(void)
132139
* case.
133140
*/
134141
staticWalReceiverConn*
135-
libpqrcv_connect(constchar*conninfo,boollogical,boolmust_use_password,
136-
constchar*appname,char**err)
142+
libpqrcv_connect(constchar*conninfo,boolreplication,boollogical,
143+
boolmust_use_password,constchar*appname,char**err)
137144
{
138145
WalReceiverConn*conn;
139146
PostgresPollingStatusTypestatus;
@@ -156,36 +163,46 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
156163
*/
157164
keys[i]="dbname";
158165
vals[i]=conninfo;
159-
keys[++i]="replication";
160-
vals[i]=logical ?"database" :"true";
161-
if (!logical)
166+
167+
/* We can not have logical without replication */
168+
Assert(replication|| !logical);
169+
170+
if (replication)
162171
{
163-
/*
164-
* The database name is ignored by the server in replication mode, but
165-
* specify "replication" for .pgpass lookup.
166-
*/
167-
keys[++i]="dbname";
168-
vals[i]="replication";
172+
keys[++i]="replication";
173+
vals[i]=logical ?"database" :"true";
174+
175+
if (logical)
176+
{
177+
/* Tell the publisher to translate to our encoding */
178+
keys[++i]="client_encoding";
179+
vals[i]=GetDatabaseEncodingName();
180+
181+
/*
182+
* Force assorted GUC parameters to settings that ensure that the
183+
* publisher will output data values in a form that is unambiguous
184+
* to the subscriber. (We don't want to modify the subscriber's
185+
* GUC settings, since that might surprise user-defined code
186+
* running in the subscriber, such as triggers.) This should
187+
* match what pg_dump does.
188+
*/
189+
keys[++i]="options";
190+
vals[i]="-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
191+
}
192+
else
193+
{
194+
/*
195+
* The database name is ignored by the server in replication mode,
196+
* but specify "replication" for .pgpass lookup.
197+
*/
198+
keys[++i]="dbname";
199+
vals[i]="replication";
200+
}
169201
}
202+
170203
keys[++i]="fallback_application_name";
171204
vals[i]=appname;
172-
if (logical)
173-
{
174-
/* Tell the publisher to translate to our encoding */
175-
keys[++i]="client_encoding";
176-
vals[i]=GetDatabaseEncodingName();
177205

178-
/*
179-
* Force assorted GUC parameters to settings that ensure that the
180-
* publisher will output data values in a form that is unambiguous to
181-
* the subscriber. (We don't want to modify the subscriber's GUC
182-
* settings, since that might surprise user-defined code running in
183-
* the subscriber, such as triggers.) This should match what pg_dump
184-
* does.
185-
*/
186-
keys[++i]="options";
187-
vals[i]="-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
188-
}
189206
keys[++i]=NULL;
190207
vals[i]=NULL;
191208

@@ -471,6 +488,50 @@ libpqrcv_server_version(WalReceiverConn *conn)
471488
returnPQserverVersion(conn->streamConn);
472489
}
473490

491+
/*
492+
* Get database name from the primary server's conninfo.
493+
*
494+
* If dbname is not found in connInfo, return NULL value.
495+
*/
496+
staticchar*
497+
libpqrcv_get_dbname_from_conninfo(constchar*connInfo)
498+
{
499+
PQconninfoOption*opts;
500+
char*dbname=NULL;
501+
char*err=NULL;
502+
503+
opts=PQconninfoParse(connInfo,&err);
504+
if (opts==NULL)
505+
{
506+
/* The error string is malloc'd, so we must free it explicitly */
507+
char*errcopy=err ?pstrdup(err) :"out of memory";
508+
509+
PQfreemem(err);
510+
ereport(ERROR,
511+
(errcode(ERRCODE_SYNTAX_ERROR),
512+
errmsg("invalid connection string syntax: %s",errcopy)));
513+
}
514+
515+
for (PQconninfoOption*opt=opts;opt->keyword!=NULL;++opt)
516+
{
517+
/*
518+
* If multiple dbnames are specified, then the last one will be
519+
* returned
520+
*/
521+
if (strcmp(opt->keyword,"dbname")==0&&opt->val&&
522+
*opt->val)
523+
{
524+
if (dbname)
525+
pfree(dbname);
526+
527+
dbname=pstrdup(opt->val);
528+
}
529+
}
530+
531+
PQconninfoFree(opts);
532+
returndbname;
533+
}
534+
474535
/*
475536
* Start streaming WAL data from given streaming options.
476537
*

‎src/backend/replication/logical/tablesync.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1329,7 +1329,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
13291329
* so that synchronous replication can distinguish them.
13301330
*/
13311331
LogRepWorkerWalRcvConn=
1332-
walrcv_connect(MySubscription->conninfo, true,
1332+
walrcv_connect(MySubscription->conninfo, true, true,
13331333
must_use_password,
13341334
slotname,&err);
13351335
if (LogRepWorkerWalRcvConn==NULL)

‎src/backend/replication/logical/worker.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4519,7 +4519,7 @@ run_apply_worker()
45194519
!MySubscription->ownersuperuser;
45204520

45214521
LogRepWorkerWalRcvConn=walrcv_connect(MySubscription->conninfo, true,
4522-
must_use_password,
4522+
true,must_use_password,
45234523
MySubscription->name,&err);
45244524

45254525
if (LogRepWorkerWalRcvConn==NULL)

‎src/backend/replication/walreceiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ WalReceiverMain(void)
296296
sigprocmask(SIG_SETMASK,&UnBlockSig,NULL);
297297

298298
/* Establish the connection to the primary for XLOG streaming */
299-
wrconn=walrcv_connect(conninfo, false, false,
299+
wrconn=walrcv_connect(conninfo,true,false, false,
300300
cluster_name[0] ?cluster_name :"walreceiver",
301301
&err);
302302
if (!wrconn)

‎src/include/replication/walreceiver.h

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,18 @@ typedef struct WalRcvExecResult
228228
/*
229229
* walrcv_connect_fn
230230
*
231-
* Establish connection to a cluster. 'logical' is true if the
232-
* connection is logical, and false if the connection is physical.
231+
* Establish connection to a cluster. 'replication' is true if the
232+
* connection is a replication connection, and false if it is a
233+
* regular connection. If it is a replication connection, it could
234+
* be either logical or physical based on input argument 'logical'.
233235
* 'appname' is a name associated to the connection, to use for example
234236
* with fallback_application_name or application_name. Returns the
235237
* details about the connection established, as defined by
236238
* WalReceiverConn for each WAL receiver module. On error, NULL is
237239
* returned with 'err' including the error generated.
238240
*/
239241
typedefWalReceiverConn*(*walrcv_connect_fn) (constchar*conninfo,
242+
boolreplication,
240243
boollogical,
241244
boolmust_use_password,
242245
constchar*appname,
@@ -279,6 +282,13 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
279282
typedefchar*(*walrcv_identify_system_fn) (WalReceiverConn*conn,
280283
TimeLineID*primary_tli);
281284

285+
/*
286+
* walrcv_get_dbname_from_conninfo_fn
287+
*
288+
* Returns the database name from the primary_conninfo
289+
*/
290+
typedefchar*(*walrcv_get_dbname_from_conninfo_fn) (constchar*conninfo);
291+
282292
/*
283293
* walrcv_server_version_fn
284294
*
@@ -403,6 +413,7 @@ typedef struct WalReceiverFunctionsType
403413
walrcv_get_conninfo_fnwalrcv_get_conninfo;
404414
walrcv_get_senderinfo_fnwalrcv_get_senderinfo;
405415
walrcv_identify_system_fnwalrcv_identify_system;
416+
walrcv_get_dbname_from_conninfo_fnwalrcv_get_dbname_from_conninfo;
406417
walrcv_server_version_fnwalrcv_server_version;
407418
walrcv_readtimelinehistoryfile_fnwalrcv_readtimelinehistoryfile;
408419
walrcv_startstreaming_fnwalrcv_startstreaming;
@@ -418,8 +429,8 @@ typedef struct WalReceiverFunctionsType
418429

419430
externPGDLLIMPORTWalReceiverFunctionsType*WalReceiverFunctions;
420431

421-
#definewalrcv_connect(conninfo,logical,must_use_password,appname,err) \
422-
WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err)
432+
#definewalrcv_connect(conninfo,replication,logical,must_use_password,appname,err) \
433+
WalReceiverFunctions->walrcv_connect(conninfo,replication,logical, must_use_password, appname, err)
423434
#definewalrcv_check_conninfo(conninfo,must_use_password) \
424435
WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
425436
#definewalrcv_get_conninfo(conn) \
@@ -428,6 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
428439
WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
429440
#definewalrcv_identify_system(conn,primary_tli) \
430441
WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
442+
#definewalrcv_get_dbname_from_conninfo(conninfo) \
443+
WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
431444
#definewalrcv_server_version(conn) \
432445
WalReceiverFunctions->walrcv_server_version(conn)
433446
#definewalrcv_readtimelinehistoryfile(conn,tli,filename,content,size) \

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp