Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7c1f426

Browse files
alvherrehoriguti
andcommitted
libpq: Improve idle state handling in pipeline mode
We were going into IDLE state too soon when executing queries viaPQsendQuery in pipeline mode, causing several scenarios to misbehave indifferent ways -- most notably, as reported by Daniele Varrazzo, that awarning message is produced by libpq: message type 0x33 arrived from server while idleBut it is also possible, if queries are sent and results consumed not inlockstep, for the expected mediating NULL result values from PQgetResultto be lost (a problem which has not been reported, but which is moreserious).Fix this by introducing two new concepts: one is a command queue elementPGQUERY_CLOSE to tell libpq to wait for the CloseComplete serverresponse to the Close message that is sent by PQsendQuery. Because theapplication is not expecting any PGresult from this, the mechanism toconsume it is a bit hackish.The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLEstate for libpq's state machine to differentiate "really idle" frommerely "the idle state that occurs in between reading results from theserver for elements in the pipeline". This makes libpq not go fullyIDLE when the libpq command queue contains entries; in normal cases, weonly go IDLE once at the end of the pipeline, when the server responseto the final SYNC message is received. (However, there are corner casesit doesn't fix, such as terminating the query sequence byPQsendFlushRequest instead of PQpipelineSync; this sort of scenario iswhat requires PGQUERY_CLOSE bit above.)This last bit helps make the libpq state machine clearer; in particularwe can get rid of an ugly hack in pqParseInput3 to avoid consideringIDLE as such when the command queue contains entries.A new test mode is added to libpq_pipeline.c to tickle some relatedproblematic cases.Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>Discussion:https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
1 parent0b71e43 commit7c1f426

File tree

6 files changed

+425
-38
lines changed

6 files changed

+425
-38
lines changed

‎src/interfaces/libpq/fe-exec.c

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
12791279
* itself consume commands from the queue; if we're in any other
12801280
* state, we don't have to do anything.
12811281
*/
1282-
if (conn->asyncStatus==PGASYNC_IDLE)
1282+
if (conn->asyncStatus==PGASYNC_IDLE||
1283+
conn->asyncStatus==PGASYNC_PIPELINE_IDLE)
12831284
{
12841285
resetPQExpBuffer(&conn->errorMessage);
12851286
pqPipelineProcessQueue(conn);
@@ -1338,6 +1339,7 @@ static int
13381339
PQsendQueryInternal(PGconn*conn,constchar*query,boolnewQuery)
13391340
{
13401341
PGcmdQueueEntry*entry=NULL;
1342+
PGcmdQueueEntry*entry2=NULL;
13411343

13421344
if (!PQsendQueryStart(conn,newQuery))
13431345
return0;
@@ -1353,6 +1355,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
13531355
entry=pqAllocCmdQueueEntry(conn);
13541356
if (entry==NULL)
13551357
return0;/* error msg already set */
1358+
if (conn->pipelineStatus!=PQ_PIPELINE_OFF)
1359+
{
1360+
entry2=pqAllocCmdQueueEntry(conn);
1361+
if (entry2==NULL)
1362+
gotosendFailed;
1363+
}
13561364

13571365
/* Send the query message(s) */
13581366
if (conn->pipelineStatus==PQ_PIPELINE_OFF)
@@ -1422,6 +1430,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
14221430

14231431
/* OK, it's launched! */
14241432
pqAppendCmdQueueEntry(conn,entry);
1433+
1434+
/*
1435+
* When pipeline mode is in use, we need a second entry in the command
1436+
* queue to represent Close Portal message. This allows us later to wait
1437+
* for the CloseComplete message to be received before getting in IDLE
1438+
* state.
1439+
*/
1440+
if (conn->pipelineStatus!=PQ_PIPELINE_OFF)
1441+
{
1442+
entry2->queryclass=PGQUERY_CLOSE;
1443+
entry2->query=NULL;
1444+
pqAppendCmdQueueEntry(conn,entry2);
1445+
}
1446+
14251447
return1;
14261448

14271449
sendFailed:
@@ -1667,11 +1689,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
16671689
switch (conn->asyncStatus)
16681690
{
16691691
casePGASYNC_IDLE:
1692+
casePGASYNC_PIPELINE_IDLE:
16701693
casePGASYNC_READY:
16711694
casePGASYNC_READY_MORE:
16721695
casePGASYNC_BUSY:
16731696
/* ok to queue */
16741697
break;
1698+
16751699
casePGASYNC_COPY_IN:
16761700
casePGASYNC_COPY_OUT:
16771701
casePGASYNC_COPY_BOTH:
@@ -2047,19 +2071,22 @@ PQgetResult(PGconn *conn)
20472071
{
20482072
casePGASYNC_IDLE:
20492073
res=NULL;/* query is complete */
2050-
if (conn->pipelineStatus!=PQ_PIPELINE_OFF)
2051-
{
2052-
/*
2053-
* We're about to return the NULL that terminates the round of
2054-
* results from the current query; prepare to send the results
2055-
* of the next query when we're called next. Also, since this
2056-
* is the start of the results of the next query, clear any
2057-
* prior error message.
2058-
*/
2059-
resetPQExpBuffer(&conn->errorMessage);
2060-
pqPipelineProcessQueue(conn);
2061-
}
20622074
break;
2075+
casePGASYNC_PIPELINE_IDLE:
2076+
Assert(conn->pipelineStatus!=PQ_PIPELINE_OFF);
2077+
2078+
/*
2079+
* We're about to return the NULL that terminates the round of
2080+
* results from the current query; prepare to send the results
2081+
* of the next query, if any, when we're called next. If there's
2082+
* no next element in the command queue, this gets us in IDLE
2083+
* state.
2084+
*/
2085+
resetPQExpBuffer(&conn->errorMessage);
2086+
pqPipelineProcessQueue(conn);
2087+
res=NULL;/* query is complete */
2088+
break;
2089+
20632090
casePGASYNC_READY:
20642091

20652092
/*
@@ -2080,7 +2107,7 @@ PQgetResult(PGconn *conn)
20802107
* We're about to send the results of the current query. Set
20812108
* us idle now, and ...
20822109
*/
2083-
conn->asyncStatus=PGASYNC_IDLE;
2110+
conn->asyncStatus=PGASYNC_PIPELINE_IDLE;
20842111

20852112
/*
20862113
* ... in cases when we're sending a pipeline-sync result,
@@ -2124,6 +2151,22 @@ PQgetResult(PGconn *conn)
21242151
break;
21252152
}
21262153

2154+
/* If the next command we expect is CLOSE, read and consume it */
2155+
if (conn->asyncStatus==PGASYNC_PIPELINE_IDLE&&
2156+
conn->cmd_queue_head&&
2157+
conn->cmd_queue_head->queryclass==PGQUERY_CLOSE)
2158+
{
2159+
if (res&&res->resultStatus!=PGRES_FATAL_ERROR)
2160+
{
2161+
conn->asyncStatus=PGASYNC_BUSY;
2162+
parseInput(conn);
2163+
conn->asyncStatus=PGASYNC_PIPELINE_IDLE;
2164+
}
2165+
else
2166+
/* we won't ever see the Close */
2167+
pqCommandQueueAdvance(conn);
2168+
}
2169+
21272170
if (res)
21282171
{
21292172
inti;
@@ -2932,7 +2975,10 @@ PQexitPipelineMode(PGconn *conn)
29322975
if (!conn)
29332976
return0;
29342977

2935-
if (conn->pipelineStatus==PQ_PIPELINE_OFF)
2978+
if (conn->pipelineStatus==PQ_PIPELINE_OFF&&
2979+
(conn->asyncStatus==PGASYNC_IDLE||
2980+
conn->asyncStatus==PGASYNC_PIPELINE_IDLE)&&
2981+
conn->cmd_queue_head==NULL)
29362982
return1;
29372983

29382984
switch (conn->asyncStatus)
@@ -2949,9 +2995,16 @@ PQexitPipelineMode(PGconn *conn)
29492995
libpq_gettext("cannot exit pipeline mode while busy\n"));
29502996
return0;
29512997

2952-
default:
2998+
casePGASYNC_IDLE:
2999+
casePGASYNC_PIPELINE_IDLE:
29533000
/* OK */
29543001
break;
3002+
3003+
casePGASYNC_COPY_IN:
3004+
casePGASYNC_COPY_OUT:
3005+
casePGASYNC_COPY_BOTH:
3006+
appendPQExpBufferStr(&conn->errorMessage,
3007+
libpq_gettext("cannot exit pipeline mode while in COPY\n"));
29553008
}
29563009

29573010
/* still work to process */
@@ -2988,6 +3041,10 @@ pqCommandQueueAdvance(PGconn *conn)
29883041
prevquery=conn->cmd_queue_head;
29893042
conn->cmd_queue_head=conn->cmd_queue_head->next;
29903043

3044+
/* If the queue is now empty, reset the tail too */
3045+
if (conn->cmd_queue_head==NULL)
3046+
conn->cmd_queue_tail=NULL;
3047+
29913048
/* and make it recyclable */
29923049
prevquery->next=NULL;
29933050
pqRecycleCmdQueueEntry(conn,prevquery);
@@ -3010,15 +3067,35 @@ pqPipelineProcessQueue(PGconn *conn)
30103067
casePGASYNC_BUSY:
30113068
/* client still has to process current query or results */
30123069
return;
3070+
30133071
casePGASYNC_IDLE:
3072+
/*
3073+
* If we're in IDLE mode and there's some command in the queue,
3074+
* get us into PIPELINE_IDLE mode and process normally. Otherwise
3075+
* there's nothing for us to do.
3076+
*/
3077+
if (conn->cmd_queue_head!=NULL)
3078+
{
3079+
conn->asyncStatus=PGASYNC_PIPELINE_IDLE;
3080+
break;
3081+
}
3082+
return;
3083+
3084+
casePGASYNC_PIPELINE_IDLE:
3085+
Assert(conn->pipelineStatus!=PQ_PIPELINE_OFF);
30143086
/* next query please */
30153087
break;
30163088
}
30173089

3018-
/* Nothing to do if not in pipeline mode, or queue is empty */
3019-
if (conn->pipelineStatus==PQ_PIPELINE_OFF||
3020-
conn->cmd_queue_head==NULL)
3090+
/*
3091+
* If there are no further commands to process in the queue, get us in
3092+
* "real idle" mode now.
3093+
*/
3094+
if (conn->cmd_queue_head==NULL)
3095+
{
3096+
conn->asyncStatus=PGASYNC_IDLE;
30213097
return;
3098+
}
30223099

30233100
/* Initialize async result-accumulation state */
30243101
pqClearAsyncResult(conn);
@@ -3105,6 +3182,7 @@ PQpipelineSync(PGconn *conn)
31053182
casePGASYNC_READY_MORE:
31063183
casePGASYNC_BUSY:
31073184
casePGASYNC_IDLE:
3185+
casePGASYNC_PIPELINE_IDLE:
31083186
/* OK to send sync */
31093187
break;
31103188
}

‎src/interfaces/libpq/fe-protocol3.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
158158
if (conn->asyncStatus!=PGASYNC_IDLE)
159159
return;
160160

161-
/*
162-
* We're also notionally not-IDLE when in pipeline mode the state
163-
* says "idle" (so we have completed receiving the results of one
164-
* query from the server and dispatched them to the application)
165-
* but another query is queued; yield back control to caller so
166-
* that they can initiate processing of the next query in the
167-
* queue.
168-
*/
169-
if (conn->pipelineStatus!=PQ_PIPELINE_OFF&&
170-
conn->cmd_queue_head!=NULL)
171-
return;
172-
173161
/*
174162
* Unexpected message in IDLE state; need to recover somehow.
175163
* ERROR messages are handled using the notice processor;
@@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
296284
}
297285
break;
298286
case'2':/* Bind Complete */
287+
/* Nothing to do for this message type */
288+
break;
299289
case'3':/* Close Complete */
300-
/* Nothing to do for these message types */
290+
/*
291+
* If we get CloseComplete when waiting for it, consume
292+
* the queue element and keep going. A result is not
293+
* expected from this message; it is just there so that
294+
* we know to wait for it when PQsendQuery is used in
295+
* pipeline mode, before going in IDLE state. Failing to
296+
* do this makes us receive CloseComplete when IDLE, which
297+
* creates problems.
298+
*/
299+
if (conn->cmd_queue_head&&
300+
conn->cmd_queue_head->queryclass==PGQUERY_CLOSE)
301+
{
302+
pqCommandQueueAdvance(conn);
303+
}
304+
301305
break;
302306
case'S':/* parameter status */
303307
if (getParameterStatus(conn))

‎src/interfaces/libpq/libpq-int.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ typedef enum
224224
* query */
225225
PGASYNC_COPY_IN,/* Copy In data transfer in progress */
226226
PGASYNC_COPY_OUT,/* Copy Out data transfer in progress */
227-
PGASYNC_COPY_BOTH/* Copy In/Out data transfer in progress */
227+
PGASYNC_COPY_BOTH,/* Copy In/Out data transfer in progress */
228+
PGASYNC_PIPELINE_IDLE,/* "Idle" between commands in pipeline mode */
228229
}PGAsyncStatusType;
229230

230231
/* Target server type (decoded value of target_session_attrs) */
@@ -310,7 +311,8 @@ typedef enum
310311
PGQUERY_EXTENDED,/* full Extended protocol (PQexecParams) */
311312
PGQUERY_PREPARE,/* Parse only (PQprepare) */
312313
PGQUERY_DESCRIBE,/* Describe Statement or Portal */
313-
PGQUERY_SYNC/* Sync (at end of a pipeline) */
314+
PGQUERY_SYNC,/* Sync (at end of a pipeline) */
315+
PGQUERY_CLOSE
314316
}PGQueryClass;
315317

316318
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp