Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit41625ab

Browse files
committed
psql: Add support for pipelines
With \bind, \parse, \bind_named and \close, it is possible to issuequeries from psql using the extended protocol. However, it was notpossible to send these queries using libpq's pipeline mode. Thisfeature has two advantages:- Testing. Pipeline tests were only possible with pgbench, using TAPtests. It now becomes possible to have more SQL tests that are able tostress the backend with pipelines and extended queries. More tests willbe added in a follow-up commit that were discussed on some otherthreads. Some external projects in the community had to implement theirown facility to work around this limitation.- Emulation of custom workloads, with more control over the actionstaken by a client with libpq APIs. It is possible to emulate moreworkload patterns to bottleneck the backend with the extended queryprotocol.This patch adds six new meta-commands to be able to control pipelines:* \startpipeline starts a new pipeline. All extended queries are queueduntil the end of the pipeline are reached or a sync request is sent andprocessed.* \endpipeline ends an existing pipeline. All queued commands are sentto the server and all responses are processed by psql.* \syncpipeline queues a synchronisation request, without flushing thecommands to the server, equivalent of PQsendPipelineSync().* \flush, equivalent of PQflush().* \flushrequest, equivalent of PQsendFlushRequest()* \getresults reads the server's results for the queries in a pipeline.Unsent data is automatically pushed when \getresults is called. It ispossible to control the number of results read in a single meta-commandexecution with an optional parameter, 0 means that all the resultsshould be read.Author: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>Reviewed-by: Jelte Fennema-Nio <postgres@jeltef.nl>Reviewed-by: Kirill Reshke <reshkekirill@gmail.com>Discussion:https://postgr.es/m/CAO6_XqroE7JuMEm1sWz55rp9fAYX2JwmcP_3m_v51vnOFdsLiQ@mail.gmail.com
1 parent40af897 commit41625ab

File tree

11 files changed

+1497
-12
lines changed

11 files changed

+1497
-12
lines changed

‎doc/src/sgml/ref/psql-ref.sgml

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3674,6 +3674,73 @@ testdb=&gt; <userinput>\setenv LESS -imx4F</userinput>
36743674
</listitem>
36753675
</varlistentry>
36763676

3677+
<varlistentry id="app-psql-meta-command-pipeline">
3678+
<term><literal>\startpipeline</literal></term>
3679+
<term><literal>\syncpipeline</literal></term>
3680+
<term><literal>\endpipeline</literal></term>
3681+
<term><literal>\flushrequest</literal></term>
3682+
<term><literal>\flush</literal></term>
3683+
<term><literal>\getresults [ <replaceable class="parameter">number_results</replaceable> ]</literal></term>
3684+
3685+
<listitem>
3686+
<para>
3687+
This group of commands implements pipelining of SQL statements.
3688+
A pipeline must begin with a <command>\startpipeline</command>
3689+
and end with an <command>\endpipeline</command>. In between there
3690+
may be any number of <command>\syncpipeline</command> commands,
3691+
which sends a <link linkend="protocol-flow-ext-query">sync message</link>
3692+
without ending the ongoing pipeline and flushing the send buffer.
3693+
In pipeline mode, statements are sent to the server without waiting
3694+
for the results of previous statements.
3695+
See <xref linkend="libpq-pipeline-mode"/> for more details.
3696+
</para>
3697+
3698+
<para>
3699+
Pipeline mode requires the use of the extended query protocol. All
3700+
queries need to be sent using the meta-commands
3701+
<literal>\bind</literal>, <literal>\bind_named</literal>,
3702+
<literal>\close</literal> or <literal>\parse</literal>. While a
3703+
pipeline is ongoing, <literal>\g</literal> will append the current
3704+
query buffer to the pipeline. Other meta-commands like
3705+
<literal>\gx</literal> or <literal>\gdesc</literal> are not allowed
3706+
in pipeline mode.
3707+
</para>
3708+
3709+
<para>
3710+
<command>\flushrequest</command> appends a flush command to the
3711+
pipeline, allowing to read results with
3712+
<command>\getresults</command> without issuing a sync or ending the
3713+
pipeline. <command>\getresults</command> will automatically push
3714+
unsent data to the server. <command>\flush</command> can be used to
3715+
manually push unsent data.
3716+
</para>
3717+
3718+
<para>
3719+
<command>\getresults</command> accepts an optional
3720+
<replaceable class="parameter">number_results</replaceable> parameter.
3721+
If provided, only the first
3722+
<replaceable class="parameter">number_results</replaceable> pending
3723+
results will be read. If not provided or <literal>0</literal>, all
3724+
pending results are read. The commands <literal>\bind</literal>,
3725+
<literal>\bind_named</literal>, <literal>\close</literal>,
3726+
<literal>\parse</literal> and <literal>\syncpipeline</literal>
3727+
generate one result to get.
3728+
</para>
3729+
3730+
<para>
3731+
Example:
3732+
<programlisting>
3733+
\startpipeline
3734+
SELECT 1 \bind \g
3735+
\flushrequest
3736+
\getresults
3737+
\endpipeline
3738+
</programlisting>
3739+
</para>
3740+
3741+
</listitem>
3742+
</varlistentry>
3743+
36773744

36783745
<varlistentry id="app-psql-meta-command-t-lc">
36793746
<term><literal>\t</literal></term>

‎src/bin/psql/command.c

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,12 @@ static backslashResult exec_command_else(PsqlScanState scan_state, ConditionalSt
9090
PQExpBufferquery_buf);
9191
staticbackslashResultexec_command_endif(PsqlScanStatescan_state,ConditionalStackcstack,
9292
PQExpBufferquery_buf);
93+
staticbackslashResultexec_command_endpipeline(PsqlScanStatescan_state,boolactive_branch);
9394
staticbackslashResultexec_command_encoding(PsqlScanStatescan_state,boolactive_branch);
9495
staticbackslashResultexec_command_errverbose(PsqlScanStatescan_state,boolactive_branch);
9596
staticbackslashResultexec_command_f(PsqlScanStatescan_state,boolactive_branch);
97+
staticbackslashResultexec_command_flush(PsqlScanStatescan_state,boolactive_branch);
98+
staticbackslashResultexec_command_flushrequest(PsqlScanStatescan_state,boolactive_branch);
9699
staticbackslashResultexec_command_g(PsqlScanStatescan_state,boolactive_branch,
97100
constchar*cmd);
98101
staticbackslashResultprocess_command_g_options(char*first_option,
@@ -103,6 +106,7 @@ static backslashResult exec_command_gdesc(PsqlScanState scan_state, bool active_
103106
staticbackslashResultexec_command_getenv(PsqlScanStatescan_state,boolactive_branch,
104107
constchar*cmd);
105108
staticbackslashResultexec_command_gexec(PsqlScanStatescan_state,boolactive_branch);
109+
staticbackslashResultexec_command_getresults(PsqlScanStatescan_state,boolactive_branch);
106110
staticbackslashResultexec_command_gset(PsqlScanStatescan_state,boolactive_branch);
107111
staticbackslashResultexec_command_help(PsqlScanStatescan_state,boolactive_branch);
108112
staticbackslashResultexec_command_html(PsqlScanStatescan_state,boolactive_branch);
@@ -132,6 +136,8 @@ static backslashResult exec_command_setenv(PsqlScanState scan_state, bool active
132136
constchar*cmd);
133137
staticbackslashResultexec_command_sf_sv(PsqlScanStatescan_state,boolactive_branch,
134138
constchar*cmd,boolis_func);
139+
staticbackslashResultexec_command_startpipeline(PsqlScanStatescan_state,boolactive_branch);
140+
staticbackslashResultexec_command_syncpipeline(PsqlScanStatescan_state,boolactive_branch);
135141
staticbackslashResultexec_command_t(PsqlScanStatescan_state,boolactive_branch);
136142
staticbackslashResultexec_command_T(PsqlScanStatescan_state,boolactive_branch);
137143
staticbackslashResultexec_command_timing(PsqlScanStatescan_state,boolactive_branch);
@@ -351,18 +357,26 @@ exec_command(const char *cmd,
351357
status=exec_command_else(scan_state,cstack,query_buf);
352358
elseif (strcmp(cmd,"endif")==0)
353359
status=exec_command_endif(scan_state,cstack,query_buf);
360+
elseif (strcmp(cmd,"endpipeline")==0)
361+
status=exec_command_endpipeline(scan_state,active_branch);
354362
elseif (strcmp(cmd,"encoding")==0)
355363
status=exec_command_encoding(scan_state,active_branch);
356364
elseif (strcmp(cmd,"errverbose")==0)
357365
status=exec_command_errverbose(scan_state,active_branch);
358366
elseif (strcmp(cmd,"f")==0)
359367
status=exec_command_f(scan_state,active_branch);
368+
elseif (strcmp(cmd,"flush")==0)
369+
status=exec_command_flush(scan_state,active_branch);
370+
elseif (strcmp(cmd,"flushrequest")==0)
371+
status=exec_command_flushrequest(scan_state,active_branch);
360372
elseif (strcmp(cmd,"g")==0||strcmp(cmd,"gx")==0)
361373
status=exec_command_g(scan_state,active_branch,cmd);
362374
elseif (strcmp(cmd,"gdesc")==0)
363375
status=exec_command_gdesc(scan_state,active_branch);
364376
elseif (strcmp(cmd,"getenv")==0)
365377
status=exec_command_getenv(scan_state,active_branch,cmd);
378+
elseif (strcmp(cmd,"getresults")==0)
379+
status=exec_command_getresults(scan_state,active_branch);
366380
elseif (strcmp(cmd,"gexec")==0)
367381
status=exec_command_gexec(scan_state,active_branch);
368382
elseif (strcmp(cmd,"gset")==0)
@@ -411,6 +425,10 @@ exec_command(const char *cmd,
411425
status=exec_command_sf_sv(scan_state,active_branch,cmd, true);
412426
elseif (strcmp(cmd,"sv")==0||strcmp(cmd,"sv+")==0)
413427
status=exec_command_sf_sv(scan_state,active_branch,cmd, false);
428+
elseif (strcmp(cmd,"startpipeline")==0)
429+
status=exec_command_startpipeline(scan_state,active_branch);
430+
elseif (strcmp(cmd,"syncpipeline")==0)
431+
status=exec_command_syncpipeline(scan_state,active_branch);
414432
elseif (strcmp(cmd,"t")==0)
415433
status=exec_command_t(scan_state,active_branch);
416434
elseif (strcmp(cmd,"T")==0)
@@ -1515,6 +1533,44 @@ exec_command_f(PsqlScanState scan_state, bool active_branch)
15151533
returnsuccess ?PSQL_CMD_SKIP_LINE :PSQL_CMD_ERROR;
15161534
}
15171535

1536+
/*
1537+
* \flush -- call PQflush() on the connection
1538+
*/
1539+
staticbackslashResult
1540+
exec_command_flush(PsqlScanStatescan_state,boolactive_branch)
1541+
{
1542+
backslashResultstatus=PSQL_CMD_SKIP_LINE;
1543+
1544+
if (active_branch)
1545+
{
1546+
pset.send_mode=PSQL_SEND_FLUSH;
1547+
status=PSQL_CMD_SEND;
1548+
}
1549+
else
1550+
ignore_slash_options(scan_state);
1551+
1552+
returnstatus;
1553+
}
1554+
1555+
/*
1556+
* \flushrequest -- call PQsendFlushRequest() on the connection
1557+
*/
1558+
staticbackslashResult
1559+
exec_command_flushrequest(PsqlScanStatescan_state,boolactive_branch)
1560+
{
1561+
backslashResultstatus=PSQL_CMD_SKIP_LINE;
1562+
1563+
if (active_branch)
1564+
{
1565+
pset.send_mode=PSQL_SEND_FLUSH_REQUEST;
1566+
status=PSQL_CMD_SEND;
1567+
}
1568+
else
1569+
ignore_slash_options(scan_state);
1570+
1571+
returnstatus;
1572+
}
1573+
15181574
/*
15191575
* \g [(pset-option[=pset-value] ...)] [filename/shell-command]
15201576
* \gx [(pset-option[=pset-value] ...)] [filename/shell-command]
@@ -1550,6 +1606,14 @@ exec_command_g(PsqlScanState scan_state, bool active_branch, const char *cmd)
15501606

15511607
if (status==PSQL_CMD_SKIP_LINE&&active_branch)
15521608
{
1609+
if (strcmp(cmd,"gx")==0&&
1610+
PQpipelineStatus(pset.db)!=PQ_PIPELINE_OFF)
1611+
{
1612+
pg_log_error("\\gx not allowed in pipeline mode");
1613+
clean_extended_state();
1614+
returnPSQL_CMD_ERROR;
1615+
}
1616+
15531617
if (!fname)
15541618
pset.gfname=NULL;
15551619
else
@@ -1703,6 +1767,42 @@ exec_command_getenv(PsqlScanState scan_state, bool active_branch,
17031767
returnsuccess ?PSQL_CMD_SKIP_LINE :PSQL_CMD_ERROR;
17041768
}
17051769

1770+
/*
1771+
* \getresults -- read results
1772+
*/
1773+
staticbackslashResult
1774+
exec_command_getresults(PsqlScanStatescan_state,boolactive_branch)
1775+
{
1776+
backslashResultstatus=PSQL_CMD_SKIP_LINE;
1777+
1778+
if (active_branch)
1779+
{
1780+
char*opt;
1781+
intnum_results;
1782+
1783+
pset.send_mode=PSQL_SEND_GET_RESULTS;
1784+
status=PSQL_CMD_SEND;
1785+
opt=psql_scan_slash_option(scan_state,OT_NORMAL,NULL, false);
1786+
1787+
pset.requested_results=0;
1788+
if (opt!=NULL)
1789+
{
1790+
num_results=atoi(opt);
1791+
if (num_results<0)
1792+
{
1793+
pg_log_error("\\getresults: invalid number of requested results");
1794+
returnPSQL_CMD_SKIP_LINE;
1795+
}
1796+
pset.requested_results=num_results;
1797+
}
1798+
}
1799+
else
1800+
ignore_slash_options(scan_state);
1801+
1802+
returnstatus;
1803+
}
1804+
1805+
17061806
/*
17071807
* \gexec -- send query and execute each field of result
17081808
*/
@@ -1713,6 +1813,12 @@ exec_command_gexec(PsqlScanState scan_state, bool active_branch)
17131813

17141814
if (active_branch)
17151815
{
1816+
if (PQpipelineStatus(pset.db)!=PQ_PIPELINE_OFF)
1817+
{
1818+
pg_log_error("\\gexec not allowed in pipeline mode");
1819+
clean_extended_state();
1820+
returnPSQL_CMD_ERROR;
1821+
}
17161822
pset.gexec_flag= true;
17171823
status=PSQL_CMD_SEND;
17181824
}
@@ -1733,6 +1839,13 @@ exec_command_gset(PsqlScanState scan_state, bool active_branch)
17331839
char*prefix=psql_scan_slash_option(scan_state,
17341840
OT_NORMAL,NULL, false);
17351841

1842+
if (PQpipelineStatus(pset.db)!=PQ_PIPELINE_OFF)
1843+
{
1844+
pg_log_error("\\gset not allowed in pipeline mode");
1845+
clean_extended_state();
1846+
returnPSQL_CMD_ERROR;
1847+
}
1848+
17361849
if (prefix)
17371850
pset.gset_prefix=prefix;
17381851
else
@@ -2718,6 +2831,63 @@ exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
27182831
returnstatus;
27192832
}
27202833

2834+
/*
2835+
* \startpipeline -- enter pipeline mode
2836+
*/
2837+
staticbackslashResult
2838+
exec_command_startpipeline(PsqlScanStatescan_state,boolactive_branch)
2839+
{
2840+
backslashResultstatus=PSQL_CMD_SKIP_LINE;
2841+
2842+
if (active_branch)
2843+
{
2844+
pset.send_mode=PSQL_SEND_START_PIPELINE_MODE;
2845+
status=PSQL_CMD_SEND;
2846+
}
2847+
else
2848+
ignore_slash_options(scan_state);
2849+
2850+
returnstatus;
2851+
}
2852+
2853+
/*
2854+
* \syncpipeline -- send a sync message to an active pipeline
2855+
*/
2856+
staticbackslashResult
2857+
exec_command_syncpipeline(PsqlScanStatescan_state,boolactive_branch)
2858+
{
2859+
backslashResultstatus=PSQL_CMD_SKIP_LINE;
2860+
2861+
if (active_branch)
2862+
{
2863+
pset.send_mode=PSQL_SEND_PIPELINE_SYNC;
2864+
status=PSQL_CMD_SEND;
2865+
}
2866+
else
2867+
ignore_slash_options(scan_state);
2868+
2869+
returnstatus;
2870+
}
2871+
2872+
/*
2873+
* \endpipeline -- end pipeline mode
2874+
*/
2875+
staticbackslashResult
2876+
exec_command_endpipeline(PsqlScanStatescan_state,boolactive_branch)
2877+
{
2878+
backslashResultstatus=PSQL_CMD_SKIP_LINE;
2879+
2880+
if (active_branch)
2881+
{
2882+
pset.send_mode=PSQL_SEND_END_PIPELINE_MODE;
2883+
status=PSQL_CMD_SEND;
2884+
}
2885+
else
2886+
ignore_slash_options(scan_state);
2887+
2888+
returnstatus;
2889+
}
2890+
27212891
/*
27222892
* \t -- turn off table headers and row count
27232893
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp