Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitab09679

Browse files
committed
libpq: Fix sending queries in pipeline aborted state
When sending queries in pipeline mode, we were careless about leavingthe connection in the right state so that PQgetResult would behavecorrectly; trying to read further results after sending a query afterhaving read a result with an error would sometimes hang. Fix byensuring internal libpq state is changed properly. All the statechanges were being done by the callers of pqAppendCmdQueueEntry(); itwould have become too repetitious to have this logic in each of them, soinstead put it all in that function and relieve callers of theresponsibility.Add a test to verify this case. Without the code fix, this new testhangs sometimes.Also, document that PQisBusy() would return false when no queries arepending result. This is not intuitively obvious, and NULL would beobtained by calling PQgetResult() at that point, which is confusing.Wording by Boris Kolpackov.In passing, fix bogus use of "false" to mean "0", per Ranier Vilela.Backpatch to 14.Author: Álvaro Herrera <alvherre@alvh.no-ip.org>Reported-by: Boris Kolpackov <boris@codesynthesis.com>Discussion:https://postgr.es/m/boris.20210624103805@codesynthesis.com
1 parent8e7811e commitab09679

File tree

3 files changed

+274
-14
lines changed

3 files changed

+274
-14
lines changed

‎doc/src/sgml/libpq.sgml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5171,7 +5171,10 @@ int PQflush(PGconn *conn);
51715171

51725172
<para>
51735173
<function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
5174-
operate as normal when processing pipeline results.
5174+
operate as normal when processing pipeline results. In particular,
5175+
a call to <function>PQisBusy</function> in the middle of a pipeline
5176+
returns 0 if the results for all the queries issued so far have been
5177+
consumed.
51755178
</para>
51765179

51775180
<para>

‎src/interfaces/libpq/fe-exec.c

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn)
12231223

12241224
/*
12251225
* pqAppendCmdQueueEntry
1226-
*Append a caller-allocated command queue entry to the queue.
1226+
*Append a caller-allocated entry to the command queue, and update
1227+
*conn->asyncStatus to account for it.
12271228
*
12281229
* The query itself must already have been put in the output buffer by the
12291230
* caller.
@@ -1239,6 +1240,38 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
12391240
conn->cmd_queue_tail->next=entry;
12401241

12411242
conn->cmd_queue_tail=entry;
1243+
1244+
switch (conn->pipelineStatus)
1245+
{
1246+
casePQ_PIPELINE_OFF:
1247+
casePQ_PIPELINE_ON:
1248+
1249+
/*
1250+
* When not in pipeline aborted state, if there's a result ready
1251+
* to be consumed, let it be so (that is, don't change away from
1252+
* READY or READY_MORE); otherwise set us busy to wait for
1253+
* something to arrive from the server.
1254+
*/
1255+
if (conn->asyncStatus==PGASYNC_IDLE)
1256+
conn->asyncStatus=PGASYNC_BUSY;
1257+
break;
1258+
1259+
casePQ_PIPELINE_ABORTED:
1260+
1261+
/*
1262+
* In aborted pipeline state, we don't expect anything from the
1263+
* server (since we don't send any queries that are queued).
1264+
* Therefore, if IDLE then do what PQgetResult would do to let
1265+
* itself consume commands from the queue; if we're in any other
1266+
* state, we don't have to do anything.
1267+
*/
1268+
if (conn->asyncStatus==PGASYNC_IDLE)
1269+
{
1270+
resetPQExpBuffer(&conn->errorMessage);
1271+
pqPipelineProcessQueue(conn);
1272+
}
1273+
break;
1274+
}
12421275
}
12431276

12441277
/*
@@ -1375,7 +1408,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
13751408

13761409
/* OK, it's launched! */
13771410
pqAppendCmdQueueEntry(conn,entry);
1378-
conn->asyncStatus=PGASYNC_BUSY;
13791411
return1;
13801412

13811413
sendFailed:
@@ -1510,10 +1542,6 @@ PQsendPrepare(PGconn *conn,
15101542
/* if insufficient memory, query just winds up NULL */
15111543
entry->query=strdup(query);
15121544

1513-
pqAppendCmdQueueEntry(conn,entry);
1514-
1515-
conn->asyncStatus=PGASYNC_BUSY;
1516-
15171545
/*
15181546
* Give the data a push (in pipeline mode, only if we're past the size
15191547
* threshold). In nonblock mode, don't complain if we're unable to send
@@ -1522,6 +1550,9 @@ PQsendPrepare(PGconn *conn,
15221550
if (pqPipelineFlush(conn)<0)
15231551
gotosendFailed;
15241552

1553+
/* OK, it's launched! */
1554+
pqAppendCmdQueueEntry(conn,entry);
1555+
15251556
return1;
15261557

15271558
sendFailed:
@@ -1815,7 +1846,7 @@ PQsendQueryGuts(PGconn *conn,
18151846

18161847
/* OK, it's launched! */
18171848
pqAppendCmdQueueEntry(conn,entry);
1818-
conn->asyncStatus=PGASYNC_BUSY;
1849+
18191850
return1;
18201851

18211852
sendFailed:
@@ -2445,7 +2476,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
24452476

24462477
/* OK, it's launched! */
24472478
pqAppendCmdQueueEntry(conn,entry);
2448-
conn->asyncStatus=PGASYNC_BUSY;
2479+
24492480
return1;
24502481

24512482
sendFailed:
@@ -2948,7 +2979,7 @@ pqCommandQueueAdvance(PGconn *conn)
29482979
* pqPipelineProcessQueue: subroutine for PQgetResult
29492980
*In pipeline mode, start processing the results of the next query in the queue.
29502981
*/
2951-
void
2982+
staticvoid
29522983
pqPipelineProcessQueue(PGconn*conn)
29532984
{
29542985
switch (conn->asyncStatus)
@@ -3072,15 +3103,15 @@ PQpipelineSync(PGconn *conn)
30723103
pqPutMsgEnd(conn)<0)
30733104
gotosendFailed;
30743105

3075-
pqAppendCmdQueueEntry(conn,entry);
3076-
30773106
/*
30783107
* Give the data a push. In nonblock mode, don't complain if we're unable
30793108
* to send it all; PQgetResult() will do any additional flushing needed.
30803109
*/
30813110
if (PQflush(conn)<0)
30823111
gotosendFailed;
3083-
conn->asyncStatus=PGASYNC_BUSY;
3112+
3113+
/* OK, it's launched! */
3114+
pqAppendCmdQueueEntry(conn,entry);
30843115

30853116
return1;
30863117

@@ -3115,7 +3146,7 @@ PQsendFlushRequest(PGconn *conn)
31153146
{
31163147
appendPQExpBufferStr(&conn->errorMessage,
31173148
libpq_gettext("another command is already in progress\n"));
3118-
returnfalse;
3149+
return0;
31193150
}
31203151

31213152
if (pqPutMsgStart('H',conn)<0||

‎src/test/modules/libpq_pipeline/libpq_pipeline.c

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929

3030
staticvoidexit_nicely(PGconn*conn);
31+
staticboolprocess_result(PGconn*conn,PGresult*res,intresults,
32+
intnumsent);
3133

3234
constchar*constprogname="libpq_pipeline";
3335

@@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn)
13071309
fprintf(stderr,"ok\n");
13081310
}
13091311

1312+
/*
1313+
* In this test mode we send a stream of queries, with one in the middle
1314+
* causing an error. Verify that we can still send some more after the
1315+
* error and have libpq work properly.
1316+
*/
1317+
staticvoid
1318+
test_uniqviol(PGconn*conn)
1319+
{
1320+
intsock=PQsocket(conn);
1321+
PGresult*res;
1322+
OidparamTypes[2]= {INT8OID,INT8OID};
1323+
constchar*paramValues[2];
1324+
charparamValue0[MAXINT8LEN];
1325+
charparamValue1[MAXINT8LEN];
1326+
intctr=0;
1327+
intnumsent=0;
1328+
intresults=0;
1329+
boolread_done= false;
1330+
boolwrite_done= false;
1331+
boolerror_sent= false;
1332+
boolgot_error= false;
1333+
intswitched=0;
1334+
intsocketful=0;
1335+
fd_setin_fds;
1336+
fd_setout_fds;
1337+
1338+
fprintf(stderr,"uniqviol ...");
1339+
1340+
PQsetnonblocking(conn,1);
1341+
1342+
paramValues[0]=paramValue0;
1343+
paramValues[1]=paramValue1;
1344+
sprintf(paramValue1,"42");
1345+
1346+
res=PQexec(conn,"drop table if exists ppln_uniqviol;"
1347+
"create table ppln_uniqviol(id bigint primary key, idata bigint)");
1348+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
1349+
pg_fatal("failed to create table: %s",PQerrorMessage(conn));
1350+
1351+
res=PQexec(conn,"begin");
1352+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
1353+
pg_fatal("failed to begin transaction: %s",PQerrorMessage(conn));
1354+
1355+
res=PQprepare(conn,"insertion",
1356+
"insert into ppln_uniqviol values ($1, $2) returning id",
1357+
2,paramTypes);
1358+
if (res==NULL||PQresultStatus(res)!=PGRES_COMMAND_OK)
1359+
pg_fatal("failed to prepare query: %s",PQerrorMessage(conn));
1360+
1361+
if (PQenterPipelineMode(conn)!=1)
1362+
pg_fatal("failed to enter pipeline mode");
1363+
1364+
while (!read_done)
1365+
{
1366+
/*
1367+
* Avoid deadlocks by reading everything the server has sent before
1368+
* sending anything. (Special precaution is needed here to process
1369+
* PQisBusy before testing the socket for read-readiness, because the
1370+
* socket does not turn read-ready after "sending" queries in aborted
1371+
* pipeline mode.)
1372+
*/
1373+
while (PQisBusy(conn)==0)
1374+
{
1375+
boolnew_error;
1376+
1377+
if (results >=numsent)
1378+
{
1379+
if (write_done)
1380+
read_done= true;
1381+
break;
1382+
}
1383+
1384+
res=PQgetResult(conn);
1385+
new_error=process_result(conn,res,results,numsent);
1386+
if (new_error&&got_error)
1387+
pg_fatal("got two errors");
1388+
got_error |=new_error;
1389+
if (results++ >=numsent-1)
1390+
{
1391+
if (write_done)
1392+
read_done= true;
1393+
break;
1394+
}
1395+
}
1396+
1397+
if (read_done)
1398+
break;
1399+
1400+
FD_ZERO(&out_fds);
1401+
FD_SET(sock,&out_fds);
1402+
1403+
FD_ZERO(&in_fds);
1404+
FD_SET(sock,&in_fds);
1405+
1406+
if (select(sock+1,&in_fds,write_done ?NULL :&out_fds,NULL,NULL)==-1)
1407+
{
1408+
if (errno==EINTR)
1409+
continue;
1410+
pg_fatal("select() failed: %m");
1411+
}
1412+
1413+
if (FD_ISSET(sock,&in_fds)&&PQconsumeInput(conn)==0)
1414+
pg_fatal("PQconsumeInput failed: %s",PQerrorMessage(conn));
1415+
1416+
/*
1417+
* If the socket is writable and we haven't finished sending queries,
1418+
* send some.
1419+
*/
1420+
if (!write_done&&FD_ISSET(sock,&out_fds))
1421+
{
1422+
for (;;)
1423+
{
1424+
intflush;
1425+
1426+
/*
1427+
* provoke uniqueness violation exactly once after having
1428+
* switched to read mode.
1429+
*/
1430+
if (switched >=1&& !error_sent&&ctr %socketful >=socketful /2)
1431+
{
1432+
sprintf(paramValue0,"%d",numsent /2);
1433+
fprintf(stderr,"E");
1434+
error_sent= true;
1435+
}
1436+
else
1437+
{
1438+
fprintf(stderr,".");
1439+
sprintf(paramValue0,"%d",ctr++);
1440+
}
1441+
1442+
if (PQsendQueryPrepared(conn,"insertion",2,paramValues,NULL,NULL,0)!=1)
1443+
pg_fatal("failed to execute prepared query: %s",PQerrorMessage(conn));
1444+
numsent++;
1445+
1446+
/* Are we done writing? */
1447+
if (socketful!=0&&numsent %socketful==42&&error_sent)
1448+
{
1449+
if (PQsendFlushRequest(conn)!=1)
1450+
pg_fatal("failed to send flush request");
1451+
write_done= true;
1452+
fprintf(stderr,"\ndone writing\n");
1453+
PQflush(conn);
1454+
break;
1455+
}
1456+
1457+
/* is the outgoing socket full? */
1458+
flush=PQflush(conn);
1459+
if (flush==-1)
1460+
pg_fatal("failed to flush: %s",PQerrorMessage(conn));
1461+
if (flush==1)
1462+
{
1463+
if (socketful==0)
1464+
socketful=numsent;
1465+
fprintf(stderr,"\nswitch to reading\n");
1466+
switched++;
1467+
break;
1468+
}
1469+
}
1470+
}
1471+
}
1472+
1473+
if (!got_error)
1474+
pg_fatal("did not get expected error");
1475+
1476+
fprintf(stderr,"ok\n");
1477+
}
1478+
1479+
/*
1480+
* Subroutine for test_uniqviol; given a PGresult, print it out and consume
1481+
* the expected NULL that should follow it.
1482+
*
1483+
* Returns true if we read a fatal error message, otherwise false.
1484+
*/
1485+
staticbool
1486+
process_result(PGconn*conn,PGresult*res,intresults,intnumsent)
1487+
{
1488+
PGresult*res2;
1489+
boolgot_error= false;
1490+
1491+
if (res==NULL)
1492+
pg_fatal("got unexpected NULL");
1493+
1494+
switch (PQresultStatus(res))
1495+
{
1496+
casePGRES_FATAL_ERROR:
1497+
got_error= true;
1498+
fprintf(stderr,"result %d/%d (error): %s\n",results,numsent,PQerrorMessage(conn));
1499+
PQclear(res);
1500+
1501+
res2=PQgetResult(conn);
1502+
if (res2!=NULL)
1503+
pg_fatal("expected NULL, got %s",
1504+
PQresStatus(PQresultStatus(res2)));
1505+
break;
1506+
1507+
casePGRES_TUPLES_OK:
1508+
fprintf(stderr,"result %d/%d: %s\n",results,numsent,PQgetvalue(res,0,0));
1509+
PQclear(res);
1510+
1511+
res2=PQgetResult(conn);
1512+
if (res2!=NULL)
1513+
pg_fatal("expected NULL, got %s",
1514+
PQresStatus(PQresultStatus(res2)));
1515+
break;
1516+
1517+
casePGRES_PIPELINE_ABORTED:
1518+
fprintf(stderr,"result %d/%d: pipeline aborted\n",results,numsent);
1519+
res2=PQgetResult(conn);
1520+
if (res2!=NULL)
1521+
pg_fatal("expected NULL, got %s",
1522+
PQresStatus(PQresultStatus(res2)));
1523+
break;
1524+
1525+
default:
1526+
pg_fatal("got unexpected %s",PQresStatus(PQresultStatus(res)));
1527+
}
1528+
1529+
returngot_error;
1530+
}
1531+
1532+
13101533
staticvoid
13111534
usage(constchar*progname)
13121535
{
@@ -1331,6 +1554,7 @@ print_test_list(void)
13311554
printf("simple_pipeline\n");
13321555
printf("singlerow\n");
13331556
printf("transaction\n");
1557+
printf("uniqviol\n");
13341558
}
13351559

13361560
int
@@ -1436,6 +1660,8 @@ main(int argc, char **argv)
14361660
test_singlerowmode(conn);
14371661
elseif (strcmp(testname,"transaction")==0)
14381662
test_transaction(conn);
1663+
elseif (strcmp(testname,"uniqviol")==0)
1664+
test_uniqviol(conn);
14391665
else
14401666
{
14411667
fprintf(stderr,"\"%s\" is not a recognized test name\n",testname);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp