Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd3c5f37

Browse files
committed
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation andallowing DROP DATABASE (of a database not involved in the query). Apartfrom explicit dblink_cancel_query() calls, dblink still doesn't cancelthe remote side. The replacement for the blocking calls consists ofnew, general-purpose query execution wrappers in the libpqsrv facility.Out-of-tree extensions should adopt these. Use them in postgres_fdw,replacing a local implementation from which the libpqsrv implementationderives. This is a bug fix for dblink. Code inspection identified thebug at least thirteen years ago, but user complaints have not appeared.Hence, no back-patch for now.Discussion:https://postgr.es/m/20231122012945.74@rfd.leadboat.com
1 parent0efc831 commitd3c5f37

File tree

8 files changed

+180
-95
lines changed

8 files changed

+180
-95
lines changed

‎contrib/dblink/dblink.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include"utils/memutils.h"
6262
#include"utils/rel.h"
6363
#include"utils/varlena.h"
64+
#include"utils/wait_event.h"
6465

6566
PG_MODULE_MAGIC;
6667

@@ -133,6 +134,7 @@ static HTAB *remoteConnHash = NULL;
133134
/* custom wait event values, retrieved from shared memory */
134135
staticuint32dblink_we_connect=0;
135136
staticuint32dblink_we_get_conn=0;
137+
staticuint32dblink_we_get_result=0;
136138

137139
/*
138140
*Following is list that holds multiple remote connections.
@@ -252,6 +254,9 @@ dblink_init(void)
252254
{
253255
if (!pconn)
254256
{
257+
if (dblink_we_get_result==0)
258+
dblink_we_get_result=WaitEventExtensionNew("DblinkGetResult");
259+
255260
pconn= (remoteConn*)MemoryContextAlloc(TopMemoryContext,sizeof(remoteConn));
256261
pconn->conn=NULL;
257262
pconn->openCursorCount=0;
@@ -442,7 +447,7 @@ dblink_open(PG_FUNCTION_ARGS)
442447
/* If we are not in a transaction, start one */
443448
if (PQtransactionStatus(conn)==PQTRANS_IDLE)
444449
{
445-
res=PQexec(conn,"BEGIN");
450+
res=libpqsrv_exec(conn,"BEGIN",dblink_we_get_result);
446451
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
447452
dblink_res_internalerror(conn,res,"begin error");
448453
PQclear(res);
@@ -461,7 +466,7 @@ dblink_open(PG_FUNCTION_ARGS)
461466
(rconn->openCursorCount)++;
462467

463468
appendStringInfo(&buf,"DECLARE %s CURSOR FOR %s",curname,sql);
464-
res=PQexec(conn,buf.data);
469+
res=libpqsrv_exec(conn,buf.data,dblink_we_get_result);
465470
if (!res||PQresultStatus(res)!=PGRES_COMMAND_OK)
466471
{
467472
dblink_res_error(conn,conname,res,fail,
@@ -530,7 +535,7 @@ dblink_close(PG_FUNCTION_ARGS)
530535
appendStringInfo(&buf,"CLOSE %s",curname);
531536

532537
/* close the cursor */
533-
res=PQexec(conn,buf.data);
538+
res=libpqsrv_exec(conn,buf.data,dblink_we_get_result);
534539
if (!res||PQresultStatus(res)!=PGRES_COMMAND_OK)
535540
{
536541
dblink_res_error(conn,conname,res,fail,
@@ -550,7 +555,7 @@ dblink_close(PG_FUNCTION_ARGS)
550555
{
551556
rconn->newXactForCursor= false;
552557

553-
res=PQexec(conn,"COMMIT");
558+
res=libpqsrv_exec(conn,"COMMIT",dblink_we_get_result);
554559
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
555560
dblink_res_internalerror(conn,res,"commit error");
556561
PQclear(res);
@@ -632,7 +637,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
632637
* PGresult will be long-lived even though we are still in a short-lived
633638
* memory context.
634639
*/
635-
res=PQexec(conn,buf.data);
640+
res=libpqsrv_exec(conn,buf.data,dblink_we_get_result);
636641
if (!res||
637642
(PQresultStatus(res)!=PGRES_COMMAND_OK&&
638643
PQresultStatus(res)!=PGRES_TUPLES_OK))
@@ -780,7 +785,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
780785
else
781786
{
782787
/* async result retrieval, do it the old way */
783-
PGresult*res=PQgetResult(conn);
788+
PGresult*res=libpqsrv_get_result(conn,dblink_we_get_result);
784789

785790
/* NULL means we're all done with the async results */
786791
if (res)
@@ -1088,7 +1093,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
10881093
PQclear(sinfo.last_res);
10891094
PQclear(sinfo.cur_res);
10901095
/* and clear out any pending data in libpq */
1091-
while ((res=PQgetResult(conn))!=NULL)
1096+
while ((res=libpqsrv_get_result(conn,dblink_we_get_result))!=
1097+
NULL)
10921098
PQclear(res);
10931099
PG_RE_THROW();
10941100
}
@@ -1115,7 +1121,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
11151121
{
11161122
CHECK_FOR_INTERRUPTS();
11171123

1118-
sinfo->cur_res=PQgetResult(conn);
1124+
sinfo->cur_res=libpqsrv_get_result(conn,dblink_we_get_result);
11191125
if (!sinfo->cur_res)
11201126
break;
11211127

@@ -1443,7 +1449,7 @@ dblink_exec(PG_FUNCTION_ARGS)
14431449
if (!conn)
14441450
dblink_conn_not_avail(conname);
14451451

1446-
res=PQexec(conn,sql);
1452+
res=libpqsrv_exec(conn,sql,dblink_we_get_result);
14471453
if (!res||
14481454
(PQresultStatus(res)!=PGRES_COMMAND_OK&&
14491455
PQresultStatus(res)!=PGRES_TUPLES_OK))
@@ -2739,8 +2745,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
27392745

27402746
/*
27412747
* If we don't get a message from the PGresult, try the PGconn. This is
2742-
* needed because for connection-level failures,PQexec may just return
2743-
* NULL, not a PGresult at all.
2748+
* needed because for connection-level failures,PQgetResult may just
2749+
*returnNULL, not a PGresult at all.
27442750
*/
27452751
if (message_primary==NULL)
27462752
message_primary=pchomp(PQerrorMessage(conn));

‎contrib/postgres_fdw/connection.c

Lines changed: 17 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
187187
{
188188
HASHCTLctl;
189189

190+
if (pgfdw_we_get_result==0)
191+
pgfdw_we_get_result=
192+
WaitEventExtensionNew("PostgresFdwGetResult");
193+
190194
ctl.keysize=sizeof(ConnCacheKey);
191195
ctl.entrysize=sizeof(ConnCacheEntry);
192196
ConnectionHash=hash_create("postgres_fdw connections",8,
@@ -716,7 +720,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
716720
*/
717721
if (consume_input&& !PQconsumeInput(conn))
718722
pgfdw_report_error(ERROR,NULL,conn, false,sql);
719-
res=pgfdw_get_result(conn,sql);
723+
res=pgfdw_get_result(conn);
720724
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
721725
pgfdw_report_error(ERROR,res,conn, true,sql);
722726
PQclear(res);
@@ -819,7 +823,9 @@ GetPrepStmtNumber(PGconn *conn)
819823
/*
820824
* Submit a query and wait for the result.
821825
*
822-
* This function is interruptible by signals.
826+
* Since we don't use non-blocking mode, this can't process interrupts while
827+
* pushing the query text to the server. That risk is relatively small, so we
828+
* ignore that for now.
823829
*
824830
* Caller is responsible for the error handling on the result.
825831
*/
@@ -830,81 +836,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
830836
if (state&&state->pendingAreq)
831837
process_pending_request(state->pendingAreq);
832838

833-
/*
834-
* Submit a query. Since we don't use non-blocking mode, this also can
835-
* block. But its risk is relatively small, so we ignore that for now.
836-
*/
837839
if (!PQsendQuery(conn,query))
838-
pgfdw_report_error(ERROR,NULL,conn, false,query);
839-
840-
/* Wait for the result. */
841-
returnpgfdw_get_result(conn,query);
840+
returnNULL;
841+
returnpgfdw_get_result(conn);
842842
}
843843

844844
/*
845-
* Wait for the result from a prior asynchronous execution function call.
846-
*
847-
* This function offers quick responsiveness by checking for any interruptions.
848-
*
849-
* This function emulates PQexec()'s behavior of returning the last result
850-
* when there are many.
845+
* Wrap libpqsrv_get_result_last(), adding wait event.
851846
*
852847
* Caller is responsible for the error handling on the result.
853848
*/
854849
PGresult*
855-
pgfdw_get_result(PGconn*conn,constchar*query)
850+
pgfdw_get_result(PGconn*conn)
856851
{
857-
PGresult*volatilelast_res=NULL;
858-
859-
/* In what follows, do not leak any PGresults on an error. */
860-
PG_TRY();
861-
{
862-
for (;;)
863-
{
864-
PGresult*res;
865-
866-
while (PQisBusy(conn))
867-
{
868-
intwc;
869-
870-
/* first time, allocate or get the custom wait event */
871-
if (pgfdw_we_get_result==0)
872-
pgfdw_we_get_result=WaitEventExtensionNew("PostgresFdwGetResult");
873-
874-
/* Sleep until there's something to do */
875-
wc=WaitLatchOrSocket(MyLatch,
876-
WL_LATCH_SET |WL_SOCKET_READABLE |
877-
WL_EXIT_ON_PM_DEATH,
878-
PQsocket(conn),
879-
-1L,pgfdw_we_get_result);
880-
ResetLatch(MyLatch);
881-
882-
CHECK_FOR_INTERRUPTS();
883-
884-
/* Data available in socket? */
885-
if (wc&WL_SOCKET_READABLE)
886-
{
887-
if (!PQconsumeInput(conn))
888-
pgfdw_report_error(ERROR,NULL,conn, false,query);
889-
}
890-
}
891-
892-
res=PQgetResult(conn);
893-
if (res==NULL)
894-
break;/* query is complete */
895-
896-
PQclear(last_res);
897-
last_res=res;
898-
}
899-
}
900-
PG_CATCH();
901-
{
902-
PQclear(last_res);
903-
PG_RE_THROW();
904-
}
905-
PG_END_TRY();
906-
907-
returnlast_res;
852+
returnlibpqsrv_get_result_last(conn,pgfdw_we_get_result);
908853
}
909854

910855
/*
@@ -945,8 +890,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
945890

946891
/*
947892
* If we don't get a message from the PGresult, try the PGconn. This
948-
* is needed because for connection-level failures,PQexec may just
949-
* return NULL, not a PGresult at all.
893+
* is needed because for connection-level failures,PQgetResult may
894+
*justreturn NULL, not a PGresult at all.
950895
*/
951896
if (message_primary==NULL)
952897
message_primary=pchomp(PQerrorMessage(conn));
@@ -1046,7 +991,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
1046991
*/
1047992
if (entry->have_prep_stmt&&entry->have_error)
1048993
{
1049-
res=PQexec(entry->conn,"DEALLOCATE ALL");
994+
res=pgfdw_exec_query(entry->conn,"DEALLOCATE ALL",
995+
NULL);
1050996
PQclear(res);
1051997
}
1052998
entry->have_prep_stmt= false;

‎contrib/postgres_fdw/deparse.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3815,7 +3815,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool nulls_first,
38153815
* Print the representation of a parameter to be sent to the remote side.
38163816
*
38173817
* Note: we always label the Param's type explicitly rather than relying on
3818-
* transmitting a numeric type OID inPQexecParams(). This allows us to
3818+
* transmitting a numeric type OID inPQsendQueryParams(). This allows us to
38193819
* avoid assuming that types have the same OIDs on the remote side as they
38203820
* do locally --- they need only have the same names.
38213821
*/

‎contrib/postgres_fdw/postgres_fdw.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3760,7 +3760,7 @@ create_cursor(ForeignScanState *node)
37603760
* We don't use a PG_TRY block here, so be careful not to throw error
37613761
* without releasing the PGresult.
37623762
*/
3763-
res=pgfdw_get_result(conn,buf.data);
3763+
res=pgfdw_get_result(conn);
37643764
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
37653765
pgfdw_report_error(ERROR,res,conn, true,fsstate->query);
37663766
PQclear(res);
@@ -3810,7 +3810,7 @@ fetch_more_data(ForeignScanState *node)
38103810
* The query was already sent by an earlier call to
38113811
* fetch_more_data_begin. So now we just fetch the result.
38123812
*/
3813-
res=pgfdw_get_result(conn,fsstate->query);
3813+
res=pgfdw_get_result(conn);
38143814
/* On error, report the original query, not the FETCH. */
38153815
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
38163816
pgfdw_report_error(ERROR,res,conn, false,fsstate->query);
@@ -4159,7 +4159,7 @@ execute_foreign_modify(EState *estate,
41594159
* We don't use a PG_TRY block here, so be careful not to throw error
41604160
* without releasing the PGresult.
41614161
*/
4162-
res=pgfdw_get_result(fmstate->conn,fmstate->query);
4162+
res=pgfdw_get_result(fmstate->conn);
41634163
if (PQresultStatus(res)!=
41644164
(fmstate->has_returning ?PGRES_TUPLES_OK :PGRES_COMMAND_OK))
41654165
pgfdw_report_error(ERROR,res,fmstate->conn, true,fmstate->query);
@@ -4229,7 +4229,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
42294229
* We don't use a PG_TRY block here, so be careful not to throw error
42304230
* without releasing the PGresult.
42314231
*/
4232-
res=pgfdw_get_result(fmstate->conn,fmstate->query);
4232+
res=pgfdw_get_result(fmstate->conn);
42334233
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
42344234
pgfdw_report_error(ERROR,res,fmstate->conn, true,fmstate->query);
42354235
PQclear(res);
@@ -4571,7 +4571,7 @@ execute_dml_stmt(ForeignScanState *node)
45714571
* We don't use a PG_TRY block here, so be careful not to throw error
45724572
* without releasing the PGresult.
45734573
*/
4574-
dmstate->result=pgfdw_get_result(dmstate->conn,dmstate->query);
4574+
dmstate->result=pgfdw_get_result(dmstate->conn);
45754575
if (PQresultStatus(dmstate->result)!=
45764576
(dmstate->has_returning ?PGRES_TUPLES_OK :PGRES_COMMAND_OK))
45774577
pgfdw_report_error(ERROR,dmstate->result,dmstate->conn, true,

‎contrib/postgres_fdw/postgres_fdw.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ extern void ReleaseConnection(PGconn *conn);
162162
externunsignedintGetCursorNumber(PGconn*conn);
163163
externunsignedintGetPrepStmtNumber(PGconn*conn);
164164
externvoiddo_sql_command(PGconn*conn,constchar*sql);
165-
externPGresult*pgfdw_get_result(PGconn*conn,constchar*query);
165+
externPGresult*pgfdw_get_result(PGconn*conn);
166166
externPGresult*pgfdw_exec_query(PGconn*conn,constchar*query,
167167
PgFdwConnState*state);
168168
externvoidpgfdw_report_error(intelevel,PGresult*res,PGconn*conn,

‎doc/src/sgml/dblink.sgml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@
3737
</para>
3838
</listitem>
3939
</varlistentry>
40+
41+
<varlistentry>
42+
<term><literal>DblinkGetResult</literal></term>
43+
<listitem>
44+
<para>
45+
Waiting to receive the results of a query from a remote server.
46+
</para>
47+
</listitem>
48+
</varlistentry>
4049
</variablelist>
4150

4251
<para>

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -653,12 +653,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
653653
* Send a query and wait for the results by using the asynchronous libpq
654654
* functions and socket readiness events.
655655
*
656-
* We must not use the regular blocking libpq functions like PQexec()
657-
* since they are uninterruptible by signals on some platforms, such as
658-
* Windows.
659-
*
660-
* The function is modeled on PQexec() in libpq, but only implements
661-
* those parts that are in use in the walreceiver api.
656+
* The function is modeled on libpqsrv_exec(), with the behavior difference
657+
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
658+
* skips try/catch, since all errors terminate the process.
662659
*
663660
* May return NULL, rather than an error result, on failure.
664661
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp