Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit186c586

Browse files
committed
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation andallowing DROP DATABASE (of a database not involved in the query). Apartfrom explicit dblink_cancel_query() calls, dblink still doesn't cancelthe remote side. The replacement for the blocking calls consists ofnew, general-purpose query execution wrappers in the libpqsrv facility.Out-of-tree extensions should adopt these.The original commitd3c5f37 did notback-patch. Back-patch now to v16-v13, bringing coverage to all supportedversions. This back-patch omits the orignal's refactoring in postgres_fdw.Discussion:https://postgr.es/m/20231122012945.74@rfd.leadboat.com
1 parent5a3d5c0 commit186c586

File tree

3 files changed

+144
-17
lines changed

3 files changed

+144
-17
lines changed

‎contrib/dblink/dblink.c

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,11 @@
4949
#include"funcapi.h"
5050
#include"lib/stringinfo.h"
5151
#include"libpq-fe.h"
52+
#include"libpq/libpq-be-fe-helpers.h"
5253
#include"mb/pg_wchar.h"
5354
#include"miscadmin.h"
5455
#include"parser/scansup.h"
56+
#include"pgstat.h"
5557
#include"utils/acl.h"
5658
#include"utils/builtins.h"
5759
#include"utils/fmgroids.h"
@@ -479,7 +481,7 @@ dblink_open(PG_FUNCTION_ARGS)
479481
/* If we are not in a transaction, start one */
480482
if (PQtransactionStatus(conn)==PQTRANS_IDLE)
481483
{
482-
res=PQexec(conn,"BEGIN");
484+
res=libpqsrv_exec(conn,"BEGIN",PG_WAIT_EXTENSION);
483485
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
484486
dblink_res_internalerror(conn,res,"begin error");
485487
PQclear(res);
@@ -498,7 +500,7 @@ dblink_open(PG_FUNCTION_ARGS)
498500
(rconn->openCursorCount)++;
499501

500502
appendStringInfo(&buf,"DECLARE %s CURSOR FOR %s",curname,sql);
501-
res=PQexec(conn,buf.data);
503+
res=libpqsrv_exec(conn,buf.data,PG_WAIT_EXTENSION);
502504
if (!res||PQresultStatus(res)!=PGRES_COMMAND_OK)
503505
{
504506
dblink_res_error(conn,conname,res,fail,
@@ -567,7 +569,7 @@ dblink_close(PG_FUNCTION_ARGS)
567569
appendStringInfo(&buf,"CLOSE %s",curname);
568570

569571
/* close the cursor */
570-
res=PQexec(conn,buf.data);
572+
res=libpqsrv_exec(conn,buf.data,PG_WAIT_EXTENSION);
571573
if (!res||PQresultStatus(res)!=PGRES_COMMAND_OK)
572574
{
573575
dblink_res_error(conn,conname,res,fail,
@@ -587,7 +589,7 @@ dblink_close(PG_FUNCTION_ARGS)
587589
{
588590
rconn->newXactForCursor= false;
589591

590-
res=PQexec(conn,"COMMIT");
592+
res=libpqsrv_exec(conn,"COMMIT",PG_WAIT_EXTENSION);
591593
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
592594
dblink_res_internalerror(conn,res,"commit error");
593595
PQclear(res);
@@ -669,7 +671,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
669671
* PGresult will be long-lived even though we are still in a short-lived
670672
* memory context.
671673
*/
672-
res=PQexec(conn,buf.data);
674+
res=libpqsrv_exec(conn,buf.data,PG_WAIT_EXTENSION);
673675
if (!res||
674676
(PQresultStatus(res)!=PGRES_COMMAND_OK&&
675677
PQresultStatus(res)!=PGRES_TUPLES_OK))
@@ -817,7 +819,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
817819
else
818820
{
819821
/* async result retrieval, do it the old way */
820-
PGresult*res=PQgetResult(conn);
822+
PGresult*res=libpqsrv_get_result(conn,PG_WAIT_EXTENSION);
821823

822824
/* NULL means we're all done with the async results */
823825
if (res)
@@ -1131,7 +1133,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
11311133
PQclear(sinfo.last_res);
11321134
PQclear(sinfo.cur_res);
11331135
/* and clear out any pending data in libpq */
1134-
while ((res=PQgetResult(conn))!=NULL)
1136+
while ((res=libpqsrv_get_result(conn,PG_WAIT_EXTENSION))!=
1137+
NULL)
11351138
PQclear(res);
11361139
PG_RE_THROW();
11371140
}
@@ -1158,7 +1161,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
11581161
{
11591162
CHECK_FOR_INTERRUPTS();
11601163

1161-
sinfo->cur_res=PQgetResult(conn);
1164+
sinfo->cur_res=libpqsrv_get_result(conn,PG_WAIT_EXTENSION);
11621165
if (!sinfo->cur_res)
11631166
break;
11641167

@@ -1486,7 +1489,7 @@ dblink_exec(PG_FUNCTION_ARGS)
14861489
if (!conn)
14871490
dblink_conn_not_avail(conname);
14881491

1489-
res=PQexec(conn,sql);
1492+
res=libpqsrv_exec(conn,sql,PG_WAIT_EXTENSION);
14901493
if (!res||
14911494
(PQresultStatus(res)!=PGRES_COMMAND_OK&&
14921495
PQresultStatus(res)!=PGRES_TUPLES_OK))
@@ -2771,8 +2774,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
27712774

27722775
/*
27732776
* If we don't get a message from the PGresult, try the PGconn. This is
2774-
* needed because for connection-level failures,PQexec may just return
2775-
* NULL, not a PGresult at all.
2777+
* needed because for connection-level failures,PQgetResult may just
2778+
*returnNULL, not a PGresult at all.
27762779
*/
27772780
if (message_primary==NULL)
27782781
message_primary=pchomp(PQerrorMessage(conn));

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -603,12 +603,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
603603
* Send a query and wait for the results by using the asynchronous libpq
604604
* functions and socket readiness events.
605605
*
606-
* We must not use the regular blocking libpq functions like PQexec()
607-
* since they are uninterruptible by signals on some platforms, such as
608-
* Windows.
609-
*
610-
* The function is modeled on PQexec() in libpq, but only implements
611-
* those parts that are in use in the walreceiver api.
606+
* The function is modeled on libpqsrv_exec(), with the behavior difference
607+
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
608+
* skips try/catch, since all errors terminate the process.
612609
*
613610
* May return NULL, rather than an error result, on failure.
614611
*/

‎src/include/libpq/libpq-be-fe-helpers.h

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949

5050
staticinlinevoidlibpqsrv_connect_prepare(void);
5151
staticinlinevoidlibpqsrv_connect_internal(PGconn*conn,uint32wait_event_info);
52+
staticinlinePGresult*libpqsrv_get_result_last(PGconn*conn,uint32wait_event_info);
53+
staticinlinePGresult*libpqsrv_get_result(PGconn*conn,uint32wait_event_info);
5254

5355

5456
/*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
239241
PG_END_TRY();
240242
}
241243

244+
/*
245+
* PQexec() wrapper that processes interrupts.
246+
*
247+
* Unless PQsetnonblocking(conn, 1) is in effect, this can't process
248+
* interrupts while pushing the query text to the server. Consider that
249+
* setting if query strings can be long relative to TCP buffer size.
250+
*
251+
* This has the preconditions of PQsendQuery(), not those of PQexec(). Most
252+
* notably, PQexec() would silently discard any prior query results.
253+
*/
254+
staticinlinePGresult*
255+
libpqsrv_exec(PGconn*conn,constchar*query,uint32wait_event_info)
256+
{
257+
if (!PQsendQuery(conn,query))
258+
returnNULL;
259+
returnlibpqsrv_get_result_last(conn,wait_event_info);
260+
}
261+
262+
/*
263+
* PQexecParams() wrapper that processes interrupts.
264+
*
265+
* See notes at libpqsrv_exec().
266+
*/
267+
staticinlinePGresult*
268+
libpqsrv_exec_params(PGconn*conn,
269+
constchar*command,
270+
intnParams,
271+
constOid*paramTypes,
272+
constchar*const*paramValues,
273+
constint*paramLengths,
274+
constint*paramFormats,
275+
intresultFormat,
276+
uint32wait_event_info)
277+
{
278+
if (!PQsendQueryParams(conn,command,nParams,paramTypes,paramValues,
279+
paramLengths,paramFormats,resultFormat))
280+
returnNULL;
281+
returnlibpqsrv_get_result_last(conn,wait_event_info);
282+
}
283+
284+
/*
285+
* Like PQexec(), loop over PQgetResult() until it returns NULL or another
286+
* terminal state. Return the last non-NULL result or the terminal state.
287+
*/
288+
staticinlinePGresult*
289+
libpqsrv_get_result_last(PGconn*conn,uint32wait_event_info)
290+
{
291+
PGresult*volatilelastResult=NULL;
292+
293+
/* In what follows, do not leak any PGresults on an error. */
294+
PG_TRY();
295+
{
296+
for (;;)
297+
{
298+
/* Wait for, and collect, the next PGresult. */
299+
PGresult*result;
300+
301+
result=libpqsrv_get_result(conn,wait_event_info);
302+
if (result==NULL)
303+
break;/* query is complete, or failure */
304+
305+
/*
306+
* Emulate PQexec()'s behavior of returning the last result when
307+
* there are many.
308+
*/
309+
PQclear(lastResult);
310+
lastResult=result;
311+
312+
if (PQresultStatus(lastResult)==PGRES_COPY_IN||
313+
PQresultStatus(lastResult)==PGRES_COPY_OUT||
314+
PQresultStatus(lastResult)==PGRES_COPY_BOTH||
315+
PQstatus(conn)==CONNECTION_BAD)
316+
break;
317+
}
318+
}
319+
PG_CATCH();
320+
{
321+
PQclear(lastResult);
322+
PG_RE_THROW();
323+
}
324+
PG_END_TRY();
325+
326+
returnlastResult;
327+
}
328+
329+
/*
330+
* Perform the equivalent of PQgetResult(), but watch for interrupts.
331+
*/
332+
staticinlinePGresult*
333+
libpqsrv_get_result(PGconn*conn,uint32wait_event_info)
334+
{
335+
/*
336+
* Collect data until PQgetResult is ready to get the result without
337+
* blocking.
338+
*/
339+
while (PQisBusy(conn))
340+
{
341+
intrc;
342+
343+
rc=WaitLatchOrSocket(MyLatch,
344+
WL_EXIT_ON_PM_DEATH |WL_LATCH_SET |
345+
WL_SOCKET_READABLE,
346+
PQsocket(conn),
347+
0,
348+
wait_event_info);
349+
350+
/* Interrupted? */
351+
if (rc&WL_LATCH_SET)
352+
{
353+
ResetLatch(MyLatch);
354+
CHECK_FOR_INTERRUPTS();
355+
}
356+
357+
/* Consume whatever data is available from the socket */
358+
if (PQconsumeInput(conn)==0)
359+
{
360+
/* trouble; expect PQgetResult() to return NULL */
361+
break;
362+
}
363+
}
364+
365+
/* Now we can collect and return the next PGresult */
366+
returnPQgetResult(conn);
367+
}
368+
242369
#endif/* LIBPQ_BE_FE_HELPERS_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp