@@ -70,6 +70,9 @@ typedef struct storeInfo
7070AttInMetadata * attinmeta ;
7171MemoryContext tmpcontext ;
7272char * * cstrs ;
73+ /* temp storage for results to avoid leaks on exception */
74+ PGresult * last_res ;
75+ PGresult * cur_res ;
7376}storeInfo ;
7477
7578/*
@@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
8386const char * conname ,
8487const char * sql ,
8588bool fail );
86- static int storeHandler ( PGresult * res , const PGdataValue * columns ,
87- const char * * errmsgp , void * param );
89+ static PGresult * storeQueryResult ( storeInfo * sinfo , PGconn * conn , const char * sql );
90+ static void storeRow ( storeInfo * sinfo , PGresult * res , bool first );
8891static remoteConn * getConnectionByName (const char * name );
8992static HTAB * createConnHash (void );
9093static void createNewConnection (const char * name ,remoteConn * rconn );
@@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS)
630633/* async query send */
631634retval = PQsendQuery (conn ,sql );
632635if (retval != 1 )
633- elog (NOTICE ,"%s" ,PQerrorMessage (conn ));
636+ elog (NOTICE ,"could not send query: %s" ,PQerrorMessage (conn ));
634637
635638PG_RETURN_INT32 (retval );
636639}
@@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
927930/*
928931 * Execute the given SQL command and store its results into a tuplestore
929932 * to be returned as the result of the current function.
933+ *
930934 * This is equivalent to PQexec followed by materializeResult, but we make
931- * use of libpq's "row processor" API to reduce per-row overhead.
935+ * use of libpq's single-row mode to avoid accumulating the whole result
936+ * inside libpq before it gets transferred to the tuplestore.
932937 */
933938static void
934939materializeQueryResult (FunctionCallInfo fcinfo ,
@@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo,
944949/* prepTuplestoreResult must have been called previously */
945950Assert (rsinfo -> returnMode == SFRM_Materialize );
946951
952+ /* initialize storeInfo to empty */
953+ memset (& sinfo ,0 ,sizeof (sinfo ));
954+ sinfo .fcinfo = fcinfo ;
955+
947956PG_TRY ();
948957{
949- /* initialize storeInfo to empty */
950- memset (& sinfo ,0 ,sizeof (sinfo ));
951- sinfo .fcinfo = fcinfo ;
952-
953- /* We'll collect tuples using storeHandler */
954- PQsetRowProcessor (conn ,storeHandler ,& sinfo );
955-
956- res = PQexec (conn ,sql );
957-
958- /* We don't keep the custom row processor installed permanently */
959- PQsetRowProcessor (conn ,NULL ,NULL );
958+ /* execute query, collecting any tuples into the tuplestore */
959+ res = storeQueryResult (& sinfo ,conn ,sql );
960960
961961if (!res ||
962962(PQresultStatus (res )!= PGRES_COMMAND_OK &&
@@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
975975else if (PQresultStatus (res )== PGRES_COMMAND_OK )
976976{
977977/*
978- *storeHandler didn't get called, so we need to convert the
979- *command status string to a tuple manually
978+ *storeRow didn't get called, so we need to convert the command
979+ * status string to a tuple manually
980980 */
981981TupleDesc tupdesc ;
982982AttInMetadata * attinmeta ;
@@ -1008,49 +1008,103 @@ materializeQueryResult(FunctionCallInfo fcinfo,
10081008tuplestore_puttuple (tupstore ,tuple );
10091009
10101010PQclear (res );
1011+ res = NULL ;
10111012}
10121013else
10131014{
10141015Assert (PQresultStatus (res )== PGRES_TUPLES_OK );
1015- /*storeHandler should have created a tuplestore */
1016+ /*storeRow should have created a tuplestore */
10161017Assert (rsinfo -> setResult != NULL );
10171018
10181019PQclear (res );
1020+ res = NULL ;
10191021}
1022+ PQclear (sinfo .last_res );
1023+ sinfo .last_res = NULL ;
1024+ PQclear (sinfo .cur_res );
1025+ sinfo .cur_res = NULL ;
10201026}
10211027PG_CATCH ();
10221028{
1023- /* be sure to unset the custom row processor */
1024- PQsetRowProcessor (conn ,NULL ,NULL );
10251029/* be sure to release any libpq result we collected */
1026- if (res )
1027- PQclear (res );
1030+ PQclear (res );
1031+ PQclear (sinfo .last_res );
1032+ PQclear (sinfo .cur_res );
10281033/* and clear out any pending data in libpq */
1029- while ((res = PQskipResult (conn ))!= NULL )
1034+ while ((res = PQgetResult (conn ))!= NULL )
10301035PQclear (res );
10311036PG_RE_THROW ();
10321037}
10331038PG_END_TRY ();
10341039}
10351040
10361041/*
1037- * Custom row processor for materializeQueryResult.
1038- * Prototype of this function must match PQrowProcessor.
1042+ * Execute query, and send any result rows to sinfo->tuplestore.
10391043 */
1040- static int
1041- storeHandler (PGresult * res ,const PGdataValue * columns ,
1042- const char * * errmsgp ,void * param )
1044+ static PGresult *
1045+ storeQueryResult (storeInfo * sinfo ,PGconn * conn ,const char * sql )
1046+ {
1047+ bool first = true;
1048+ PGresult * res ;
1049+
1050+ if (!PQsendQuery (conn ,sql ))
1051+ elog (ERROR ,"could not send query: %s" ,PQerrorMessage (conn ));
1052+
1053+ if (!PQsetSingleRowMode (conn ))/* shouldn't fail */
1054+ elog (ERROR ,"failed to set single-row mode for dblink query" );
1055+
1056+ for (;;)
1057+ {
1058+ CHECK_FOR_INTERRUPTS ();
1059+
1060+ sinfo -> cur_res = PQgetResult (conn );
1061+ if (!sinfo -> cur_res )
1062+ break ;
1063+
1064+ if (PQresultStatus (sinfo -> cur_res )== PGRES_SINGLE_TUPLE )
1065+ {
1066+ /* got one row from possibly-bigger resultset */
1067+ storeRow (sinfo ,sinfo -> cur_res ,first );
1068+
1069+ PQclear (sinfo -> cur_res );
1070+ sinfo -> cur_res = NULL ;
1071+ first = false;
1072+ }
1073+ else
1074+ {
1075+ /* if empty resultset, fill tuplestore header */
1076+ if (first && PQresultStatus (sinfo -> cur_res )== PGRES_TUPLES_OK )
1077+ storeRow (sinfo ,sinfo -> cur_res ,first );
1078+
1079+ /* store completed result at last_res */
1080+ PQclear (sinfo -> last_res );
1081+ sinfo -> last_res = sinfo -> cur_res ;
1082+ sinfo -> cur_res = NULL ;
1083+ first = true;
1084+ }
1085+ }
1086+
1087+ /* return last_res */
1088+ res = sinfo -> last_res ;
1089+ sinfo -> last_res = NULL ;
1090+ return res ;
1091+ }
1092+
1093+ /*
1094+ * Send single row to sinfo->tuplestore.
1095+ *
1096+ * If "first" is true, create the tuplestore using PGresult's metadata
1097+ * (in this case the PGresult might contain either zero or one row).
1098+ */
1099+ static void
1100+ storeRow (storeInfo * sinfo ,PGresult * res ,bool first )
10431101{
1044- storeInfo * sinfo = (storeInfo * )param ;
10451102int nfields = PQnfields (res );
1046- char * * cstrs = sinfo -> cstrs ;
10471103HeapTuple tuple ;
1048- char * pbuf ;
1049- int pbuflen ;
10501104int i ;
10511105MemoryContext oldcontext ;
10521106
1053- if (columns == NULL )
1107+ if (first )
10541108{
10551109/* Prepare for new result set */
10561110ReturnSetInfo * rsinfo = (ReturnSetInfo * )sinfo -> fcinfo -> resultinfo ;
@@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns,
10981152sinfo -> attinmeta = TupleDescGetAttInMetadata (tupdesc );
10991153
11001154/* Create a new, empty tuplestore */
1101- oldcontext = MemoryContextSwitchTo (
1102- rsinfo -> econtext -> ecxt_per_query_memory );
1155+ oldcontext = MemoryContextSwitchTo (rsinfo -> econtext -> ecxt_per_query_memory );
11031156sinfo -> tuplestore = tuplestore_begin_heap (true, false,work_mem );
11041157rsinfo -> setResult = sinfo -> tuplestore ;
11051158rsinfo -> setDesc = tupdesc ;
11061159MemoryContextSwitchTo (oldcontext );
11071160
1161+ /* Done if empty resultset */
1162+ if (PQntuples (res )== 0 )
1163+ return ;
1164+
11081165/*
11091166 * Set up sufficiently-wide string pointers array; this won't change
11101167 * in size so it's easy to preallocate.
@@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns,
11211178ALLOCSET_DEFAULT_MINSIZE ,
11221179ALLOCSET_DEFAULT_INITSIZE ,
11231180ALLOCSET_DEFAULT_MAXSIZE );
1124-
1125- return 1 ;
11261181}
11271182
1128- CHECK_FOR_INTERRUPTS ();
1183+ /* Should have a single-row result if we get here */
1184+ Assert (PQntuples (res )== 1 );
11291185
11301186/*
11311187 * Do the following work in a temp context that we reset after each tuple.
@@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns,
11351191oldcontext = MemoryContextSwitchTo (sinfo -> tmpcontext );
11361192
11371193/*
1138- * The strings passed to us are not null-terminated, but the datatype
1139- * input functions we're about to call require null termination. Copy the
1140- * strings and add null termination. As a micro-optimization, allocate
1141- * all the strings with one palloc.
1194+ * Fill cstrs with null-terminated strings of column values.
11421195 */
1143- pbuflen = nfields ;/* count the null terminators themselves */
11441196for (i = 0 ;i < nfields ;i ++ )
11451197{
1146- int len = columns [i ].len ;
1147-
1148- if (len > 0 )
1149- pbuflen += len ;
1150- }
1151- pbuf = (char * )palloc (pbuflen );
1152-
1153- for (i = 0 ;i < nfields ;i ++ )
1154- {
1155- int len = columns [i ].len ;
1156-
1157- if (len < 0 )
1158- cstrs [i ]= NULL ;
1198+ if (PQgetisnull (res ,0 ,i ))
1199+ sinfo -> cstrs [i ]= NULL ;
11591200else
1160- {
1161- cstrs [i ]= pbuf ;
1162- memcpy (pbuf ,columns [i ].value ,len );
1163- pbuf += len ;
1164- * pbuf ++ = '\0' ;
1165- }
1201+ sinfo -> cstrs [i ]= PQgetvalue (res ,0 ,i );
11661202}
11671203
11681204/* Convert row to a tuple, and add it to the tuplestore */
1169- tuple = BuildTupleFromCStrings (sinfo -> attinmeta ,cstrs );
1205+ tuple = BuildTupleFromCStrings (sinfo -> attinmeta ,sinfo -> cstrs );
11701206
11711207tuplestore_puttuple (sinfo -> tuplestore ,tuple );
11721208
11731209/* Clean up */
11741210MemoryContextSwitchTo (oldcontext );
11751211MemoryContextReset (sinfo -> tmpcontext );
1176-
1177- return 1 ;
11781212}
11791213
11801214/*