Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2466d66

Browse files
committed
libpq-be-fe-helpers.h: wrap new cancel APIs
Commit61461a3 introduced new functions to libpq for cancellingqueries. This commit introduces a helper function that backend-sidelibraries and extensions can use to invoke those. This function takes atimeout and can itself be interrupted while it is waiting for a cancelrequest to be sent and processed, instead of being blocked.This replaces the usage of the old functions in postgres_fdw and dblink.Finally, it also adds some test coverage for the cancel support inpostgres_fdw.Author: Jelte Fennema-Nio <postgres@jeltef.nl>Discussion:https://postgr.es/m/CAGECzQT_VgOWWENUqvUV9xQmbaCyXjtRRAYO8W07oqashk_N+g@mail.gmail.com
1 parent4270057 commit2466d66

File tree

5 files changed

+140
-39
lines changed

5 files changed

+140
-39
lines changed

‎contrib/dblink/dblink.c

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,25 +1347,16 @@ Datum
13471347
dblink_cancel_query(PG_FUNCTION_ARGS)
13481348
{
13491349
PGconn*conn;
1350-
PGcancelConn*cancelConn;
13511350
char*msg;
1351+
TimestampTzendtime;
13521352

13531353
dblink_init();
13541354
conn=dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1355-
cancelConn=PQcancelCreate(conn);
1356-
1357-
PG_TRY();
1358-
{
1359-
if (!PQcancelBlocking(cancelConn))
1360-
msg=pchomp(PQcancelErrorMessage(cancelConn));
1361-
else
1362-
msg="OK";
1363-
}
1364-
PG_FINALLY();
1365-
{
1366-
PQcancelFinish(cancelConn);
1367-
}
1368-
PG_END_TRY();
1355+
endtime=TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1356+
30000);
1357+
msg=libpqsrv_cancel(conn,endtime);
1358+
if (msg==NULL)
1359+
msg="OK";
13691360

13701361
PG_RETURN_TEXT_P(cstring_to_text(msg));
13711362
}

‎contrib/postgres_fdw/connection.c

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
133133
staticvoidpgfdw_reject_incomplete_xact_state_change(ConnCacheEntry*entry);
134134
staticvoidpgfdw_reset_xact_state(ConnCacheEntry*entry,booltoplevel);
135135
staticboolpgfdw_cancel_query(PGconn*conn);
136-
staticboolpgfdw_cancel_query_begin(PGconn*conn);
136+
staticboolpgfdw_cancel_query_begin(PGconn*conn,TimestampTzendtime);
137137
staticboolpgfdw_cancel_query_end(PGconn*conn,TimestampTzendtime,
138138
boolconsume_input);
139139
staticboolpgfdw_exec_cleanup_query(PGconn*conn,constchar*query,
@@ -1315,36 +1315,31 @@ pgfdw_cancel_query(PGconn *conn)
13151315
endtime=TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
13161316
CONNECTION_CLEANUP_TIMEOUT);
13171317

1318-
if (!pgfdw_cancel_query_begin(conn))
1318+
if (!pgfdw_cancel_query_begin(conn,endtime))
13191319
return false;
13201320
returnpgfdw_cancel_query_end(conn,endtime, false);
13211321
}
13221322

1323+
/*
1324+
* Submit a cancel request to the given connection, waiting only until
1325+
* the given time.
1326+
*
1327+
* We sleep interruptibly until we receive confirmation that the cancel
1328+
* request has been accepted, and if it is, return true; if the timeout
1329+
* lapses without that, or the request fails for whatever reason, return
1330+
* false.
1331+
*/
13231332
staticbool
1324-
pgfdw_cancel_query_begin(PGconn*conn)
1333+
pgfdw_cancel_query_begin(PGconn*conn,TimestampTzendtime)
13251334
{
1326-
PGcancel*cancel;
1327-
charerrbuf[256];
1335+
char*errormsg=libpqsrv_cancel(conn,endtime);
13281336

1329-
/*
1330-
* Issue cancel request. Unfortunately, there's no good way to limit the
1331-
* amount of time that we might block inside PQgetCancel().
1332-
*/
1333-
if ((cancel=PQgetCancel(conn)))
1334-
{
1335-
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
1336-
{
1337-
ereport(WARNING,
1338-
(errcode(ERRCODE_CONNECTION_FAILURE),
1339-
errmsg("could not send cancel request: %s",
1340-
errbuf)));
1341-
PQfreeCancel(cancel);
1342-
return false;
1343-
}
1344-
PQfreeCancel(cancel);
1345-
}
1337+
if (errormsg!=NULL)
1338+
ereport(WARNING,
1339+
errcode(ERRCODE_CONNECTION_FAILURE),
1340+
errmsg("could not send cancel request: %s",errormsg));
13461341

1347-
returntrue;
1342+
returnerrormsg==NULL;
13481343
}
13491344

13501345
staticbool
@@ -1685,7 +1680,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
16851680
*/
16861681
if (PQtransactionStatus(entry->conn)==PQTRANS_ACTIVE)
16871682
{
1688-
if (!pgfdw_cancel_query_begin(entry->conn))
1683+
TimestampTzendtime;
1684+
1685+
endtime=TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1686+
CONNECTION_CLEANUP_TIMEOUT);
1687+
if (!pgfdw_cancel_query_begin(entry->conn,endtime))
16891688
return false;/* Unable to cancel running query */
16901689
*cancel_requested=lappend(*cancel_requested,entry);
16911690
}

‎contrib/postgres_fdw/expected/postgres_fdw.out

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
27392739
(10 rows)
27402740

27412741
ALTER VIEW v4 OWNER TO regress_view_owner;
2742+
-- Make sure this big CROSS JOIN query is pushed down
2743+
EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
2744+
QUERY PLAN
2745+
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
2746+
Foreign Scan
2747+
Output: (count(*))
2748+
Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
2749+
Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
2750+
(4 rows)
2751+
2752+
-- Make sure query cancellation works
2753+
SET statement_timeout = '10ms';
2754+
select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
2755+
ERROR: canceling statement due to statement timeout
2756+
RESET statement_timeout;
27422757
-- ====================================================================
27432758
-- Check that userid to use when querying the remote table is correctly
27442759
-- propagated into foreign rels present in subqueries under an UNION ALL

‎contrib/postgres_fdw/sql/postgres_fdw.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
737737
SELECTt1.c1,t2.c2FROM v4 t1LEFT JOIN ft5 t2ON (t1.c1=t2.c1)ORDER BYt1.c1,t2.c1 OFFSET10LIMIT10;
738738
ALTERVIEW v4 OWNER TO regress_view_owner;
739739

740+
-- Make sure this big CROSS JOIN query is pushed down
741+
EXPLAIN (VERBOSE, COSTS OFF)SELECTcount(*)FROM ft1CROSS JOIN ft2CROSS JOIN ft4CROSS JOIN ft5;
742+
-- Make sure query cancellation works
743+
SET statement_timeout='10ms';
744+
selectcount(*)from ft1CROSS JOIN ft2CROSS JOIN ft4CROSS JOIN ft5;-- this takes very long
745+
RESET statement_timeout;
746+
740747
-- ====================================================================
741748
-- Check that userid to use when querying the remote table is correctly
742749
-- propagated into foreign rels present in subqueries under an UNION ALL

‎src/include/libpq/libpq-be-fe-helpers.h

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
#include"miscadmin.h"
4545
#include"storage/fd.h"
4646
#include"storage/latch.h"
47+
#include"utils/timestamp.h"
48+
#include"utils/wait_event.h"
4749

4850

4951
staticinlinevoidlibpqsrv_connect_prepare(void);
@@ -365,4 +367,91 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
365367
returnPQgetResult(conn);
366368
}
367369

370+
/*
371+
* Submit a cancel request to the given connection, waiting only until
372+
* the given time.
373+
*
374+
* We sleep interruptibly until we receive confirmation that the cancel
375+
* request has been accepted, and if it is, return NULL; if the cancel
376+
* request fails, return an error message string (which is not to be
377+
* freed).
378+
*
379+
* For other problems (to wit: OOM when strdup'ing an error message from
380+
* libpq), this function can ereport(ERROR).
381+
*
382+
* Note: this function leaks a string's worth of memory when reporting
383+
* libpq errors. Make sure to call it in a transient memory context.
384+
*/
385+
staticinlinechar*
386+
libpqsrv_cancel(PGconn*conn,TimestampTzendtime)
387+
{
388+
PGcancelConn*cancel_conn;
389+
char*error=NULL;
390+
391+
cancel_conn=PQcancelCreate(conn);
392+
if (cancel_conn==NULL)
393+
return_("out of memory");
394+
395+
/* In what follows, do not leak any PGcancelConn on any errors. */
396+
397+
PG_TRY();
398+
{
399+
if (!PQcancelStart(cancel_conn))
400+
{
401+
error=pchomp(PQcancelErrorMessage(cancel_conn));
402+
gotoexit;
403+
}
404+
405+
for (;;)
406+
{
407+
PostgresPollingStatusTypepollres;
408+
TimestampTznow;
409+
longcur_timeout;
410+
intwaitEvents=WL_LATCH_SET |WL_TIMEOUT |WL_EXIT_ON_PM_DEATH;
411+
412+
pollres=PQcancelPoll(cancel_conn);
413+
if (pollres==PGRES_POLLING_OK)
414+
break;/* success! */
415+
416+
/* If timeout has expired, give up, else get sleep time. */
417+
now=GetCurrentTimestamp();
418+
cur_timeout=TimestampDifferenceMilliseconds(now,endtime);
419+
if (cur_timeout <=0)
420+
{
421+
error=_("cancel request timed out");
422+
break;
423+
}
424+
425+
switch (pollres)
426+
{
427+
casePGRES_POLLING_READING:
428+
waitEvents |=WL_SOCKET_READABLE;
429+
break;
430+
casePGRES_POLLING_WRITING:
431+
waitEvents |=WL_SOCKET_WRITEABLE;
432+
break;
433+
default:
434+
error=pchomp(PQcancelErrorMessage(cancel_conn));
435+
gotoexit;
436+
}
437+
438+
/* Sleep until there's something to do */
439+
WaitLatchOrSocket(MyLatch,waitEvents,PQcancelSocket(cancel_conn),
440+
cur_timeout,PG_WAIT_CLIENT);
441+
442+
ResetLatch(MyLatch);
443+
444+
CHECK_FOR_INTERRUPTS();
445+
}
446+
exit:;
447+
}
448+
PG_FINALLY();
449+
{
450+
PQcancelFinish(cancel_conn);
451+
}
452+
PG_END_TRY();
453+
454+
returnerror;
455+
}
456+
368457
#endif/* LIBPQ_BE_FE_HELPERS_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp