Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit03a571a

Browse files
committed
Add wrapper function libpqrcv_PQexec() in the walreceiver that uses async
libpq to send queries, making the waiting for responses interruptible onplatforms where PQexec() can't normally be interrupted by signals, suchas win32.Fujii Masao and Magnus Hagander
1 parent5b89ef3 commit03a571a

File tree

2 files changed

+88
-6
lines changed

2 files changed

+88
-6
lines changed

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
*
1111
*
1212
* IDENTIFICATION
13-
* $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.8 2010/03/21 00:17:58 petere Exp $
13+
* $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
1414
*
1515
*-------------------------------------------------------------------------
1616
*/
@@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void);
5454

5555
/* Prototypes for private functions */
5656
staticboollibpq_select(inttimeout_ms);
57+
staticPGresult*libpqrcv_PQexec(constchar*query);
5758

5859
/*
5960
* Module load callback
@@ -97,7 +98,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
9798
* Get the system identifier and timeline ID as a DataRow message from the
9899
* primary server.
99100
*/
100-
res=PQexec(streamConn,"IDENTIFY_SYSTEM");
101+
res=libpqrcv_PQexec("IDENTIFY_SYSTEM");
101102
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
102103
{
103104
PQclear(res);
@@ -149,11 +150,14 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
149150
/* Start streaming from the point requested by startup process */
150151
snprintf(cmd,sizeof(cmd),"START_REPLICATION %X/%X",
151152
startpoint.xlogid,startpoint.xrecoff);
152-
res=PQexec(streamConn,cmd);
153+
res=libpqrcv_PQexec(cmd);
153154
if (PQresultStatus(res)!=PGRES_COPY_OUT)
155+
{
156+
PQclear(res);
154157
ereport(ERROR,
155158
(errmsg("could not start WAL streaming: %s",
156159
PQerrorMessage(streamConn))));
160+
}
157161
PQclear(res);
158162

159163
justconnected= true;
@@ -224,6 +228,84 @@ libpq_select(int timeout_ms)
224228
return true;
225229
}
226230

231+
/*
232+
* Send a query and wait for the results by using the asynchronous libpq
233+
* functions and the backend version of select().
234+
*
235+
* We must not use the regular blocking libpq functions like PQexec()
236+
* since they are uninterruptible by signals on some platforms, such as
237+
* Windows.
238+
*
239+
* We must also not use vanilla select() here since it cannot handle the
240+
* signal emulation layer on Windows.
241+
*
242+
* The function is modeled on PQexec() in libpq, but only implements
243+
* those parts that are in use in the walreceiver.
244+
*
245+
* Queries are always executed on the connection in streamConn.
246+
*/
247+
staticPGresult*
248+
libpqrcv_PQexec(constchar*query)
249+
{
250+
PGresult*result=NULL;
251+
PGresult*lastResult=NULL;
252+
253+
/*
254+
* PQexec() silently discards any prior query results on the
255+
* connection. This is not required for walreceiver since it's
256+
* expected that walsender won't generate any such junk results.
257+
*/
258+
259+
/*
260+
* Submit a query. Since we don't use non-blocking mode, this also
261+
* can block. But its risk is relatively small, so we ignore that
262+
* for now.
263+
*/
264+
if (!PQsendQuery(streamConn,query))
265+
returnNULL;
266+
267+
for (;;)
268+
{
269+
/*
270+
* Receive data until PQgetResult is ready to get the result
271+
* without blocking.
272+
*/
273+
while (PQisBusy(streamConn))
274+
{
275+
/*
276+
* We don't need to break down the sleep into smaller increments,
277+
* and check for interrupts after each nap, since we can just
278+
* elog(FATAL) within SIGTERM signal handler if the signal
279+
* arrives in the middle of establishment of replication connection.
280+
*/
281+
if (!libpq_select(-1))
282+
continue;/* interrupted */
283+
if (PQconsumeInput(streamConn)==0)
284+
returnNULL;/* trouble */
285+
}
286+
287+
/*
288+
* Emulate the PQexec()'s behavior of returning the last result
289+
* when there are many.
290+
* Since walsender will never generate multiple results, we skip
291+
* the concatenation of error messages.
292+
*/
293+
result=PQgetResult(streamConn);
294+
if (result==NULL)
295+
break;/* query is complete */
296+
297+
PQclear(lastResult);
298+
lastResult=result;
299+
300+
if (PQresultStatus(lastResult)==PGRES_COPY_IN||
301+
PQresultStatus(lastResult)==PGRES_COPY_OUT||
302+
PQstatus(streamConn)==CONNECTION_BAD)
303+
break;
304+
}
305+
306+
returnlastResult;
307+
}
308+
227309
/*
228310
* Disconnect connection to primary, if any.
229311
*/

‎src/backend/replication/walreceiver.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
*
3030
*
3131
* IDENTIFICATION
32-
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.8 2010/04/13 08:16:09 mha Exp $
32+
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
3333
*
3434
*-------------------------------------------------------------------------
3535
*/
@@ -86,8 +86,8 @@ static void DisableWalRcvImmediateExit(void);
8686
* We can't just exit(1) within SIGTERM signal handler, because the signal
8787
* might arrive in the middle of some critical operation, like while we're
8888
* holding a spinlock. We also can't just set a flag in signal handler and
89-
* check it in the main loop, because we perform some blockinglibpq
90-
*operationslikePQexec(), which can take a long time to finish.
89+
* check it in the main loop, because we perform some blockingoperations
90+
* likelibpqrcv_PQexec(), which can take a long time to finish.
9191
*
9292
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
9393
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp