Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf039eaa

Browse files
committed
Allow queries submitted by postgres_fdw to be canceled.
This fixes a problem which is not new, but with the advent of directforeign table modification in0bf3ae8,it's somewhat more likely to be annoying than previously. So,arrange for a local query cancelation to propagate to the remote side.Michael Paquier, reviewed by Etsuro Fujita. Original report byThom Brown.
1 parent11e178d commitf039eaa

File tree

3 files changed

+180
-44
lines changed

3 files changed

+180
-44
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include"access/xact.h"
1818
#include"mb/pg_wchar.h"
1919
#include"miscadmin.h"
20+
#include"storage/latch.h"
2021
#include"utils/hsearch.h"
2122
#include"utils/memutils.h"
2223

@@ -447,6 +448,78 @@ GetPrepStmtNumber(PGconn *conn)
447448
return++prep_stmt_number;
448449
}
449450

451+
/*
452+
* Submit a query and wait for the result.
453+
*
454+
* This function is interruptible by signals.
455+
*
456+
* Caller is responsible for the error handling on the result.
457+
*/
458+
PGresult*
459+
pgfdw_exec_query(PGconn*conn,constchar*query)
460+
{
461+
/*
462+
* Submit a query. Since we don't use non-blocking mode, this also can
463+
* block. But its risk is relatively small, so we ignore that for now.
464+
*/
465+
if (!PQsendQuery(conn,query))
466+
pgfdw_report_error(ERROR,NULL,conn, false,query);
467+
468+
/* Wait for the result. */
469+
returnpgfdw_get_result(conn,query);
470+
}
471+
472+
/*
473+
* Wait for the result from a prior asynchronous execution function call.
474+
*
475+
* This function offers quick responsiveness by checking for any interruptions.
476+
*
477+
* This function emulates the PQexec()'s behavior of returning the last result
478+
* when there are many.
479+
*
480+
* Caller is responsible for the error handling on the result.
481+
*/
482+
PGresult*
483+
pgfdw_get_result(PGconn*conn,constchar*query)
484+
{
485+
PGresult*last_res=NULL;
486+
487+
for (;;)
488+
{
489+
PGresult*res;
490+
491+
while (PQisBusy(conn))
492+
{
493+
intwc;
494+
495+
/* Sleep until there's something to do */
496+
wc=WaitLatchOrSocket(MyLatch,
497+
WL_LATCH_SET |WL_SOCKET_READABLE,
498+
PQsocket(conn),
499+
-1L);
500+
ResetLatch(MyLatch);
501+
502+
CHECK_FOR_INTERRUPTS();
503+
504+
/* Data available in socket */
505+
if (wc&WL_SOCKET_READABLE)
506+
{
507+
if (!PQconsumeInput(conn))
508+
pgfdw_report_error(ERROR,NULL,conn, false,query);
509+
}
510+
}
511+
512+
res=PQgetResult(conn);
513+
if (res==NULL)
514+
break;/* query is complete */
515+
516+
PQclear(last_res);
517+
last_res=res;
518+
}
519+
520+
returnlast_res;
521+
}
522+
450523
/*
451524
* Report an error we got from the remote server.
452525
*
@@ -598,6 +671,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
598671
caseXACT_EVENT_ABORT:
599672
/* Assume we might have lost track of prepared statements */
600673
entry->have_error= true;
674+
675+
/*
676+
* If a command has been submitted to the remote server by
677+
* using an asynchronous execution function, the command
678+
* might not have yet completed. Check to see if a command
679+
* is still being processed by the remote server, and if so,
680+
* request cancellation of the command; if not, abort
681+
* gracefully.
682+
*/
683+
if (PQtransactionStatus(entry->conn)==PQTRANS_ACTIVE)
684+
{
685+
PGcancel*cancel;
686+
charerrbuf[256];
687+
688+
if ((cancel=PQgetCancel(entry->conn)))
689+
{
690+
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
691+
ereport(WARNING,
692+
(errcode(ERRCODE_CONNECTION_FAILURE),
693+
errmsg("could not send cancel request: %s",
694+
errbuf)));
695+
PQfreeCancel(cancel);
696+
}
697+
break;
698+
}
699+
601700
/* If we're aborting, abort all remote transactions too */
602701
res=PQexec(entry->conn,"ABORT TRANSACTION");
603702
/* Note: can't throw ERROR, it would be infinite loop */

‎contrib/postgres_fdw/postgres_fdw.c

Lines changed: 79 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
14211421
* We don't use a PG_TRY block here, so be careful not to throw error
14221422
* without releasing the PGresult.
14231423
*/
1424-
res=PQexec(fsstate->conn,sql);
1424+
res=pgfdw_exec_query(fsstate->conn,sql);
14251425
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
14261426
pgfdw_report_error(ERROR,res,fsstate->conn, true,sql);
14271427
PQclear(res);
@@ -1749,18 +1749,24 @@ postgresExecForeignInsert(EState *estate,
17491749
p_values=convert_prep_stmt_params(fmstate,NULL,slot);
17501750

17511751
/*
1752-
* Execute the prepared statement, and check for success.
1752+
* Execute the prepared statement.
1753+
*/
1754+
if (!PQsendQueryPrepared(fmstate->conn,
1755+
fmstate->p_name,
1756+
fmstate->p_nums,
1757+
p_values,
1758+
NULL,
1759+
NULL,
1760+
0))
1761+
pgfdw_report_error(ERROR,NULL,fmstate->conn, false,fmstate->query);
1762+
1763+
/*
1764+
* Get the result, and check for success.
17531765
*
17541766
* We don't use a PG_TRY block here, so be careful not to throw error
17551767
* without releasing the PGresult.
17561768
*/
1757-
res=PQexecPrepared(fmstate->conn,
1758-
fmstate->p_name,
1759-
fmstate->p_nums,
1760-
p_values,
1761-
NULL,
1762-
NULL,
1763-
0);
1769+
res=pgfdw_get_result(fmstate->conn,fmstate->query);
17641770
if (PQresultStatus(res)!=
17651771
(fmstate->has_returning ?PGRES_TUPLES_OK :PGRES_COMMAND_OK))
17661772
pgfdw_report_error(ERROR,res,fmstate->conn, true,fmstate->query);
@@ -1819,18 +1825,24 @@ postgresExecForeignUpdate(EState *estate,
18191825
slot);
18201826

18211827
/*
1822-
* Execute the prepared statement, and check for success.
1828+
* Execute the prepared statement.
1829+
*/
1830+
if (!PQsendQueryPrepared(fmstate->conn,
1831+
fmstate->p_name,
1832+
fmstate->p_nums,
1833+
p_values,
1834+
NULL,
1835+
NULL,
1836+
0))
1837+
pgfdw_report_error(ERROR,NULL,fmstate->conn, false,fmstate->query);
1838+
1839+
/*
1840+
* Get the result, and check for success.
18231841
*
18241842
* We don't use a PG_TRY block here, so be careful not to throw error
18251843
* without releasing the PGresult.
18261844
*/
1827-
res=PQexecPrepared(fmstate->conn,
1828-
fmstate->p_name,
1829-
fmstate->p_nums,
1830-
p_values,
1831-
NULL,
1832-
NULL,
1833-
0);
1845+
res=pgfdw_get_result(fmstate->conn,fmstate->query);
18341846
if (PQresultStatus(res)!=
18351847
(fmstate->has_returning ?PGRES_TUPLES_OK :PGRES_COMMAND_OK))
18361848
pgfdw_report_error(ERROR,res,fmstate->conn, true,fmstate->query);
@@ -1889,18 +1901,24 @@ postgresExecForeignDelete(EState *estate,
18891901
NULL);
18901902

18911903
/*
1892-
* Execute the prepared statement, and check for success.
1904+
* Execute the prepared statement.
1905+
*/
1906+
if (!PQsendQueryPrepared(fmstate->conn,
1907+
fmstate->p_name,
1908+
fmstate->p_nums,
1909+
p_values,
1910+
NULL,
1911+
NULL,
1912+
0))
1913+
pgfdw_report_error(ERROR,NULL,fmstate->conn, false,fmstate->query);
1914+
1915+
/*
1916+
* Get the result, and check for success.
18931917
*
18941918
* We don't use a PG_TRY block here, so be careful not to throw error
18951919
* without releasing the PGresult.
18961920
*/
1897-
res=PQexecPrepared(fmstate->conn,
1898-
fmstate->p_name,
1899-
fmstate->p_nums,
1900-
p_values,
1901-
NULL,
1902-
NULL,
1903-
0);
1921+
res=pgfdw_get_result(fmstate->conn,fmstate->query);
19041922
if (PQresultStatus(res)!=
19051923
(fmstate->has_returning ?PGRES_TUPLES_OK :PGRES_COMMAND_OK))
19061924
pgfdw_report_error(ERROR,res,fmstate->conn, true,fmstate->query);
@@ -1950,7 +1968,7 @@ postgresEndForeignModify(EState *estate,
19501968
* We don't use a PG_TRY block here, so be careful not to throw error
19511969
* without releasing the PGresult.
19521970
*/
1953-
res=PQexec(fmstate->conn,sql);
1971+
res=pgfdw_exec_query(fmstate->conn,sql);
19541972
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
19551973
pgfdw_report_error(ERROR,res,fmstate->conn, true,sql);
19561974
PQclear(res);
@@ -2712,7 +2730,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
27122730
/*
27132731
* Execute EXPLAIN remotely.
27142732
*/
2715-
res=PQexec(conn,sql);
2733+
res=pgfdw_exec_query(conn,sql);
27162734
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
27172735
pgfdw_report_error(ERROR,res,conn, false,sql);
27182736

@@ -2817,12 +2835,18 @@ create_cursor(ForeignScanState *node)
28172835
* parameter (see deparse.c), the "inference" is trivial and will produce
28182836
* the desired result. This allows us to avoid assuming that the remote
28192837
* server has the same OIDs we do for the parameters' types.
2838+
*/
2839+
if (!PQsendQueryParams(conn,buf.data,numParams,
2840+
NULL,values,NULL,NULL,0))
2841+
pgfdw_report_error(ERROR,NULL,conn, false,buf.data);
2842+
2843+
/*
2844+
* Get the result, and check for success.
28202845
*
28212846
* We don't use a PG_TRY block here, so be careful not to throw error
28222847
* without releasing the PGresult.
28232848
*/
2824-
res=PQexecParams(conn,buf.data,numParams,NULL,values,
2825-
NULL,NULL,0);
2849+
res=pgfdw_get_result(conn,buf.data);
28262850
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
28272851
pgfdw_report_error(ERROR,res,conn, true,fsstate->query);
28282852
PQclear(res);
@@ -2868,7 +2892,7 @@ fetch_more_data(ForeignScanState *node)
28682892
snprintf(sql,sizeof(sql),"FETCH %d FROM c%u",
28692893
fsstate->fetch_size,fsstate->cursor_number);
28702894

2871-
res=PQexec(conn,sql);
2895+
res=pgfdw_exec_query(conn,sql);
28722896
/* On error, report the original query, not the FETCH. */
28732897
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
28742898
pgfdw_report_error(ERROR,res,conn, false,fsstate->query);
@@ -2978,7 +3002,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
29783002
* We don't use a PG_TRY block here, so be careful not to throw error
29793003
* without releasing the PGresult.
29803004
*/
2981-
res=PQexec(conn,sql);
3005+
res=pgfdw_exec_query(conn,sql);
29823006
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
29833007
pgfdw_report_error(ERROR,res,conn, true,sql);
29843008
PQclear(res);
@@ -3006,16 +3030,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
30063030
* with the remote server using different type OIDs than we do. All of
30073031
* the prepared statements we use in this module are simple enough that
30083032
* the remote server will make the right choices.
3033+
*/
3034+
if (!PQsendPrepare(fmstate->conn,
3035+
p_name,
3036+
fmstate->query,
3037+
0,
3038+
NULL))
3039+
pgfdw_report_error(ERROR,NULL,fmstate->conn, false,fmstate->query);
3040+
3041+
/*
3042+
* Get the result, and check for success.
30093043
*
30103044
* We don't use a PG_TRY block here, so be careful not to throw error
30113045
* without releasing the PGresult.
30123046
*/
3013-
res=PQprepare(fmstate->conn,
3014-
p_name,
3015-
fmstate->query,
3016-
0,
3017-
NULL);
3018-
3047+
res=pgfdw_get_result(fmstate->conn,fmstate->query);
30193048
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
30203049
pgfdw_report_error(ERROR,res,fmstate->conn, true,fmstate->query);
30213050
PQclear(res);
@@ -3147,12 +3176,18 @@ execute_dml_stmt(ForeignScanState *node)
31473176
* parameter (see deparse.c), the "inference" is trivial and will produce
31483177
* the desired result. This allows us to avoid assuming that the remote
31493178
* server has the same OIDs we do for the parameters' types.
3179+
*/
3180+
if (!PQsendQueryParams(dmstate->conn,dmstate->query,numParams,
3181+
NULL,values,NULL,NULL,0))
3182+
pgfdw_report_error(ERROR,NULL,dmstate->conn, false,dmstate->query);
3183+
3184+
/*
3185+
* Get the result, and check for success.
31503186
*
31513187
* We don't use a PG_TRY block here, so be careful not to throw error
31523188
* without releasing the PGresult.
31533189
*/
3154-
dmstate->result=PQexecParams(dmstate->conn,dmstate->query,
3155-
numParams,NULL,values,NULL,NULL,0);
3190+
dmstate->result=pgfdw_get_result(dmstate->conn,dmstate->query);
31563191
if (PQresultStatus(dmstate->result)!=
31573192
(dmstate->has_returning ?PGRES_TUPLES_OK :PGRES_COMMAND_OK))
31583193
pgfdw_report_error(ERROR,dmstate->result,dmstate->conn, true,
@@ -3355,7 +3390,7 @@ postgresAnalyzeForeignTable(Relation relation,
33553390
/* In what follows, do not risk leaking any PGresults. */
33563391
PG_TRY();
33573392
{
3358-
res=PQexec(conn,sql.data);
3393+
res=pgfdw_exec_query(conn,sql.data);
33593394
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
33603395
pgfdw_report_error(ERROR,res,conn, false,sql.data);
33613396

@@ -3449,7 +3484,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
34493484
/* In what follows, do not risk leaking any PGresults. */
34503485
PG_TRY();
34513486
{
3452-
res=PQexec(conn,sql.data);
3487+
res=pgfdw_exec_query(conn,sql.data);
34533488
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
34543489
pgfdw_report_error(ERROR,res,conn, false,sql.data);
34553490
PQclear(res);
@@ -3500,7 +3535,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
35003535
snprintf(fetch_sql,sizeof(fetch_sql),"FETCH %d FROM c%u",
35013536
fetch_size,cursor_number);
35023537

3503-
res=PQexec(conn,fetch_sql);
3538+
res=pgfdw_exec_query(conn,fetch_sql);
35043539
/* On error, report the original query, not the FETCH. */
35053540
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
35063541
pgfdw_report_error(ERROR,res,conn, false,sql.data);
@@ -3675,7 +3710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
36753710
appendStringInfoString(&buf,"SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
36763711
deparseStringLiteral(&buf,stmt->remote_schema);
36773712

3678-
res=PQexec(conn,buf.data);
3713+
res=pgfdw_exec_query(conn,buf.data);
36793714
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
36803715
pgfdw_report_error(ERROR,res,conn, false,buf.data);
36813716

@@ -3774,7 +3809,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
37743809
appendStringInfoString(&buf," ORDER BY c.relname, a.attnum");
37753810

37763811
/* Fetch the data */
3777-
res=PQexec(conn,buf.data);
3812+
res=pgfdw_exec_query(conn,buf.data);
37783813
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
37793814
pgfdw_report_error(ERROR,res,conn, false,buf.data);
37803815

‎contrib/postgres_fdw/postgres_fdw.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
103103
externvoidReleaseConnection(PGconn*conn);
104104
externunsignedintGetCursorNumber(PGconn*conn);
105105
externunsignedintGetPrepStmtNumber(PGconn*conn);
106+
externPGresult*pgfdw_get_result(PGconn*conn,constchar*query);
107+
externPGresult*pgfdw_exec_query(PGconn*conn,constchar*query);
106108
externvoidpgfdw_report_error(intelevel,PGresult*res,PGconn*conn,
107109
boolclear,constchar*sql);
108110

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp