Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit41b9c84

Browse files
committed
Replace libpq's "row processor" API with a "single row" mode.
After taking awhile to digest the row-processor feature that was added tolibpq in commit92785da, we've concludedit is over-complicated and too hard to use. Leave the core infrastructurechanges in place (that is, there's still a row processor function insidelibpq), but remove the exposed API pieces, and instead provide a "singlerow" mode switch that causes PQgetResult to return one row at a time inseparate PGresult objects.This approach incurs more overhead than proper use of a row processorcallback would, since construction of a PGresult per row adds extra cycles.However, it is far easier to use and harder to break. The single-row modestill affords applications the primary benefit that the row processor APIwas meant to provide, namely not having to accumulate large result sets inmemory before processing them. Preliminary testing suggests that we canprobably buy back most of the extra cycles by micro-optimizing constructionof the extra results, but that task will be left for another day.Marko Kreen
1 parent7c0fecd commit41b9c84

File tree

10 files changed

+404
-655
lines changed

10 files changed

+404
-655
lines changed

‎contrib/dblink/dblink.c

Lines changed: 99 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ typedef struct storeInfo
7070
AttInMetadata*attinmeta;
7171
MemoryContexttmpcontext;
7272
char**cstrs;
73+
/* temp storage for results to avoid leaks on exception */
74+
PGresult*last_res;
75+
PGresult*cur_res;
7376
}storeInfo;
7477

7578
/*
@@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
8386
constchar*conname,
8487
constchar*sql,
8588
boolfail);
86-
staticintstoreHandler(PGresult*res,constPGdataValue*columns,
87-
constchar**errmsgp,void*param);
89+
staticPGresult*storeQueryResult(storeInfo*sinfo,PGconn*conn,constchar*sql);
90+
staticvoidstoreRow(storeInfo*sinfo,PGresult*res,boolfirst);
8891
staticremoteConn*getConnectionByName(constchar*name);
8992
staticHTAB*createConnHash(void);
9093
staticvoidcreateNewConnection(constchar*name,remoteConn*rconn);
@@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS)
630633
/* async query send */
631634
retval=PQsendQuery(conn,sql);
632635
if (retval!=1)
633-
elog(NOTICE,"%s",PQerrorMessage(conn));
636+
elog(NOTICE,"could not send query:%s",PQerrorMessage(conn));
634637

635638
PG_RETURN_INT32(retval);
636639
}
@@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
927930
/*
928931
* Execute the given SQL command and store its results into a tuplestore
929932
* to be returned as the result of the current function.
933+
*
930934
* This is equivalent to PQexec followed by materializeResult, but we make
931-
* use of libpq's "row processor" API to reduce per-row overhead.
935+
* use of libpq's single-row mode to avoid accumulating the whole result
936+
* inside libpq before it gets transferred to the tuplestore.
932937
*/
933938
staticvoid
934939
materializeQueryResult(FunctionCallInfofcinfo,
@@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo,
944949
/* prepTuplestoreResult must have been called previously */
945950
Assert(rsinfo->returnMode==SFRM_Materialize);
946951

952+
/* initialize storeInfo to empty */
953+
memset(&sinfo,0,sizeof(sinfo));
954+
sinfo.fcinfo=fcinfo;
955+
947956
PG_TRY();
948957
{
949-
/* initialize storeInfo to empty */
950-
memset(&sinfo,0,sizeof(sinfo));
951-
sinfo.fcinfo=fcinfo;
952-
953-
/* We'll collect tuples using storeHandler */
954-
PQsetRowProcessor(conn,storeHandler,&sinfo);
955-
956-
res=PQexec(conn,sql);
957-
958-
/* We don't keep the custom row processor installed permanently */
959-
PQsetRowProcessor(conn,NULL,NULL);
958+
/* execute query, collecting any tuples into the tuplestore */
959+
res=storeQueryResult(&sinfo,conn,sql);
960960

961961
if (!res||
962962
(PQresultStatus(res)!=PGRES_COMMAND_OK&&
@@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
975975
elseif (PQresultStatus(res)==PGRES_COMMAND_OK)
976976
{
977977
/*
978-
*storeHandler didn't get called, so we need to convert the
979-
*commandstatus string to a tuple manually
978+
*storeRow didn't get called, so we need to convert the command
979+
* status string to a tuple manually
980980
*/
981981
TupleDesctupdesc;
982982
AttInMetadata*attinmeta;
@@ -1008,49 +1008,103 @@ materializeQueryResult(FunctionCallInfo fcinfo,
10081008
tuplestore_puttuple(tupstore,tuple);
10091009

10101010
PQclear(res);
1011+
res=NULL;
10111012
}
10121013
else
10131014
{
10141015
Assert(PQresultStatus(res)==PGRES_TUPLES_OK);
1015-
/*storeHandler should have created a tuplestore */
1016+
/*storeRow should have created a tuplestore */
10161017
Assert(rsinfo->setResult!=NULL);
10171018

10181019
PQclear(res);
1020+
res=NULL;
10191021
}
1022+
PQclear(sinfo.last_res);
1023+
sinfo.last_res=NULL;
1024+
PQclear(sinfo.cur_res);
1025+
sinfo.cur_res=NULL;
10201026
}
10211027
PG_CATCH();
10221028
{
1023-
/* be sure to unset the custom row processor */
1024-
PQsetRowProcessor(conn,NULL,NULL);
10251029
/* be sure to release any libpq result we collected */
1026-
if (res)
1027-
PQclear(res);
1030+
PQclear(res);
1031+
PQclear(sinfo.last_res);
1032+
PQclear(sinfo.cur_res);
10281033
/* and clear out any pending data in libpq */
1029-
while ((res=PQskipResult(conn))!=NULL)
1034+
while ((res=PQgetResult(conn))!=NULL)
10301035
PQclear(res);
10311036
PG_RE_THROW();
10321037
}
10331038
PG_END_TRY();
10341039
}
10351040

10361041
/*
1037-
* Custom row processor for materializeQueryResult.
1038-
* Prototype of this function must match PQrowProcessor.
1042+
* Execute query, and send any result rows to sinfo->tuplestore.
10391043
*/
1040-
staticint
1041-
storeHandler(PGresult*res,constPGdataValue*columns,
1042-
constchar**errmsgp,void*param)
1044+
staticPGresult*
1045+
storeQueryResult(storeInfo*sinfo,PGconn*conn,constchar*sql)
1046+
{
1047+
boolfirst= true;
1048+
PGresult*res;
1049+
1050+
if (!PQsendQuery(conn,sql))
1051+
elog(ERROR,"could not send query: %s",PQerrorMessage(conn));
1052+
1053+
if (!PQsetSingleRowMode(conn))/* shouldn't fail */
1054+
elog(ERROR,"failed to set single-row mode for dblink query");
1055+
1056+
for (;;)
1057+
{
1058+
CHECK_FOR_INTERRUPTS();
1059+
1060+
sinfo->cur_res=PQgetResult(conn);
1061+
if (!sinfo->cur_res)
1062+
break;
1063+
1064+
if (PQresultStatus(sinfo->cur_res)==PGRES_SINGLE_TUPLE)
1065+
{
1066+
/* got one row from possibly-bigger resultset */
1067+
storeRow(sinfo,sinfo->cur_res,first);
1068+
1069+
PQclear(sinfo->cur_res);
1070+
sinfo->cur_res=NULL;
1071+
first= false;
1072+
}
1073+
else
1074+
{
1075+
/* if empty resultset, fill tuplestore header */
1076+
if (first&&PQresultStatus(sinfo->cur_res)==PGRES_TUPLES_OK)
1077+
storeRow(sinfo,sinfo->cur_res,first);
1078+
1079+
/* store completed result at last_res */
1080+
PQclear(sinfo->last_res);
1081+
sinfo->last_res=sinfo->cur_res;
1082+
sinfo->cur_res=NULL;
1083+
first= true;
1084+
}
1085+
}
1086+
1087+
/* return last_res */
1088+
res=sinfo->last_res;
1089+
sinfo->last_res=NULL;
1090+
returnres;
1091+
}
1092+
1093+
/*
1094+
* Send single row to sinfo->tuplestore.
1095+
*
1096+
* If "first" is true, create the tuplestore using PGresult's metadata
1097+
* (in this case the PGresult might contain either zero or one row).
1098+
*/
1099+
staticvoid
1100+
storeRow(storeInfo*sinfo,PGresult*res,boolfirst)
10431101
{
1044-
storeInfo*sinfo= (storeInfo*)param;
10451102
intnfields=PQnfields(res);
1046-
char**cstrs=sinfo->cstrs;
10471103
HeapTupletuple;
1048-
char*pbuf;
1049-
intpbuflen;
10501104
inti;
10511105
MemoryContextoldcontext;
10521106

1053-
if (columns==NULL)
1107+
if (first)
10541108
{
10551109
/* Prepare for new result set */
10561110
ReturnSetInfo*rsinfo= (ReturnSetInfo*)sinfo->fcinfo->resultinfo;
@@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns,
10981152
sinfo->attinmeta=TupleDescGetAttInMetadata(tupdesc);
10991153

11001154
/* Create a new, empty tuplestore */
1101-
oldcontext=MemoryContextSwitchTo(
1102-
rsinfo->econtext->ecxt_per_query_memory);
1155+
oldcontext=MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
11031156
sinfo->tuplestore=tuplestore_begin_heap(true, false,work_mem);
11041157
rsinfo->setResult=sinfo->tuplestore;
11051158
rsinfo->setDesc=tupdesc;
11061159
MemoryContextSwitchTo(oldcontext);
11071160

1161+
/* Done if empty resultset */
1162+
if (PQntuples(res)==0)
1163+
return;
1164+
11081165
/*
11091166
* Set up sufficiently-wide string pointers array; this won't change
11101167
* in size so it's easy to preallocate.
@@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns,
11211178
ALLOCSET_DEFAULT_MINSIZE,
11221179
ALLOCSET_DEFAULT_INITSIZE,
11231180
ALLOCSET_DEFAULT_MAXSIZE);
1124-
1125-
return1;
11261181
}
11271182

1128-
CHECK_FOR_INTERRUPTS();
1183+
/* Should have a single-row result if we get here */
1184+
Assert(PQntuples(res)==1);
11291185

11301186
/*
11311187
* Do the following work in a temp context that we reset after each tuple.
@@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns,
11351191
oldcontext=MemoryContextSwitchTo(sinfo->tmpcontext);
11361192

11371193
/*
1138-
* The strings passed to us are not null-terminated, but the datatype
1139-
* input functions we're about to call require null termination. Copy the
1140-
* strings and add null termination. As a micro-optimization, allocate
1141-
* all the strings with one palloc.
1194+
* Fill cstrs with null-terminated strings of column values.
11421195
*/
1143-
pbuflen=nfields;/* count the null terminators themselves */
11441196
for (i=0;i<nfields;i++)
11451197
{
1146-
intlen=columns[i].len;
1147-
1148-
if (len>0)
1149-
pbuflen+=len;
1150-
}
1151-
pbuf= (char*)palloc(pbuflen);
1152-
1153-
for (i=0;i<nfields;i++)
1154-
{
1155-
intlen=columns[i].len;
1156-
1157-
if (len<0)
1158-
cstrs[i]=NULL;
1198+
if (PQgetisnull(res,0,i))
1199+
sinfo->cstrs[i]=NULL;
11591200
else
1160-
{
1161-
cstrs[i]=pbuf;
1162-
memcpy(pbuf,columns[i].value,len);
1163-
pbuf+=len;
1164-
*pbuf++='\0';
1165-
}
1201+
sinfo->cstrs[i]=PQgetvalue(res,0,i);
11661202
}
11671203

11681204
/* Convert row to a tuple, and add it to the tuplestore */
1169-
tuple=BuildTupleFromCStrings(sinfo->attinmeta,cstrs);
1205+
tuple=BuildTupleFromCStrings(sinfo->attinmeta,sinfo->cstrs);
11701206

11711207
tuplestore_puttuple(sinfo->tuplestore,tuple);
11721208

11731209
/* Clean up */
11741210
MemoryContextSwitchTo(oldcontext);
11751211
MemoryContextReset(sinfo->tmpcontext);
1176-
1177-
return1;
11781212
}
11791213

11801214
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp