@@ -395,10 +395,11 @@ typedef enum
395395 *
396396 * CSTATE_START_COMMAND starts the execution of a command. On a SQL
397397 * command, the command is sent to the server, and we move to
398- * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
399- * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
400- * meta-commands are executed immediately. If the command about to start
401- * is actually beyond the end of the script, advance to CSTATE_END_TX.
398+ * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
399+ * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
400+ * wait for it to expire. Other meta-commands are executed immediately. If
401+ * the command about to start is actually beyond the end of the script,
402+ * advance to CSTATE_END_TX.
402403 *
403404 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
404405 * for the current command.
@@ -530,7 +531,9 @@ typedef enum MetaCommand
530531META_IF ,/* \if */
531532META_ELIF ,/* \elif */
532533META_ELSE ,/* \else */
533- META_ENDIF /* \endif */
534+ META_ENDIF ,/* \endif */
535+ META_STARTPIPELINE ,/* \startpipeline */
536+ META_ENDPIPELINE /* \endpipeline */
534537}MetaCommand ;
535538
536539typedef enum QueryMode
@@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd)
25682571mc = META_GSET ;
25692572else if (pg_strcasecmp (cmd ,"aset" )== 0 )
25702573mc = META_ASET ;
2574+ else if (pg_strcasecmp (cmd ,"startpipeline" )== 0 )
2575+ mc = META_STARTPIPELINE ;
2576+ else if (pg_strcasecmp (cmd ,"endpipeline" )== 0 )
2577+ mc = META_ENDPIPELINE ;
25712578else
25722579mc = META_NONE ;
25732580return mc ;
@@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command)
27572764if (commands [j ]-> type != SQL_COMMAND )
27582765continue ;
27592766preparedStatementName (name ,st -> use_file ,j );
2760- res = PQprepare (st -> con ,name ,
2761- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL );
2762- if (PQresultStatus (res )!= PGRES_COMMAND_OK )
2763- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2764- PQclear (res );
2767+ if (PQpipelineStatus (st -> con )== PQ_PIPELINE_OFF )
2768+ {
2769+ res = PQprepare (st -> con ,name ,
2770+ commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL );
2771+ if (PQresultStatus (res )!= PGRES_COMMAND_OK )
2772+ pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2773+ PQclear (res );
2774+ }
2775+ else
2776+ {
2777+ /*
2778+ * In pipeline mode, we use asynchronous functions. If a
2779+ * server-side error occurs, it will be processed later
2780+ * among the other results.
2781+ */
2782+ if (!PQsendPrepare (st -> con ,name ,
2783+ commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL ))
2784+ pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2785+ }
27652786}
27662787st -> prepared [st -> use_file ]= true;
27672788}
@@ -2802,10 +2823,11 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
28022823int qrynum = 0 ;
28032824
28042825/*
2805- * varprefix should be set only with \gset or \aset, andSQL commands do
2806- * not need it.
2826+ * varprefix should be set only with \gset or \aset, and\endpipeline and
2827+ *SQL commands do not need it.
28072828 */
28082829Assert ((meta == META_NONE && varprefix == NULL )||
2830+ ((meta == META_ENDPIPELINE )&& varprefix == NULL )||
28092831 ((meta == META_GSET || meta == META_ASET )&& varprefix != NULL ));
28102832
28112833res = PQgetResult (st -> con );
@@ -2874,6 +2896,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
28742896/* otherwise the result is simply thrown away by PQclear below */
28752897break ;
28762898
2899+ case PGRES_PIPELINE_SYNC :
2900+ pg_log_debug ("client %d pipeline ending" ,st -> id );
2901+ if (PQexitPipelineMode (st -> con )!= 1 )
2902+ pg_log_error ("client %d failed to exit pipeline mode: %s" ,st -> id ,
2903+ PQerrorMessage (st -> con ));
2904+ break ;
2905+
28772906default :
28782907/* anything else is unexpected */
28792908pg_log_error ("client %d script %d aborted in command %d query %d: %s" ,
@@ -3127,13 +3156,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
31273156/* Execute the command */
31283157if (command -> type == SQL_COMMAND )
31293158{
3159+ /* disallow \aset and \gset in pipeline mode */
3160+ if (PQpipelineStatus (st -> con )!= PQ_PIPELINE_OFF )
3161+ {
3162+ if (command -> meta == META_GSET )
3163+ {
3164+ commandFailed (st ,"gset" ,"\\gset is not allowed in pipeline mode" );
3165+ st -> state = CSTATE_ABORTED ;
3166+ break ;
3167+ }
3168+ else if (command -> meta == META_ASET )
3169+ {
3170+ commandFailed (st ,"aset" ,"\\aset is not allowed in pipeline mode" );
3171+ st -> state = CSTATE_ABORTED ;
3172+ break ;
3173+ }
3174+ }
3175+
31303176if (!sendCommand (st ,command ))
31313177{
31323178commandFailed (st ,"SQL" ,"SQL command send failed" );
31333179st -> state = CSTATE_ABORTED ;
31343180}
31353181else
3136- st -> state = CSTATE_WAIT_RESULT ;
3182+ {
3183+ /* Wait for results, unless in pipeline mode */
3184+ if (PQpipelineStatus (st -> con )== PQ_PIPELINE_OFF )
3185+ st -> state = CSTATE_WAIT_RESULT ;
3186+ else
3187+ st -> state = CSTATE_END_COMMAND ;
3188+ }
31373189}
31383190else if (command -> type == META_COMMAND )
31393191{
@@ -3273,7 +3325,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
32733325if (readCommandResponse (st ,
32743326sql_script [st -> use_file ].commands [st -> command ]-> meta ,
32753327sql_script [st -> use_file ].commands [st -> command ]-> varprefix ))
3276- st -> state = CSTATE_END_COMMAND ;
3328+ {
3329+ /*
3330+ * outside of pipeline mode: stop reading results.
3331+ * pipeline mode: continue reading results until an
3332+ * end-of-pipeline response.
3333+ */
3334+ if (PQpipelineStatus (st -> con )!= PQ_PIPELINE_ON )
3335+ st -> state = CSTATE_END_COMMAND ;
3336+ }
32773337else
32783338st -> state = CSTATE_ABORTED ;
32793339break ;
@@ -3516,6 +3576,45 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
35163576return CSTATE_ABORTED ;
35173577}
35183578}
3579+ else if (command -> meta == META_STARTPIPELINE )
3580+ {
3581+ /*
3582+ * In pipeline mode, we use a workflow based on libpq pipeline
3583+ * functions.
3584+ */
3585+ if (querymode == QUERY_SIMPLE )
3586+ {
3587+ commandFailed (st ,"startpipeline" ,"cannot use pipeline mode with the simple query protocol" );
3588+ return CSTATE_ABORTED ;
3589+ }
3590+
3591+ if (PQpipelineStatus (st -> con )!= PQ_PIPELINE_OFF )
3592+ {
3593+ commandFailed (st ,"startpipeline" ,"already in pipeline mode" );
3594+ return CSTATE_ABORTED ;
3595+ }
3596+ if (PQenterPipelineMode (st -> con )== 0 )
3597+ {
3598+ commandFailed (st ,"startpipeline" ,"failed to enter pipeline mode" );
3599+ return CSTATE_ABORTED ;
3600+ }
3601+ }
3602+ else if (command -> meta == META_ENDPIPELINE )
3603+ {
3604+ if (PQpipelineStatus (st -> con )!= PQ_PIPELINE_ON )
3605+ {
3606+ commandFailed (st ,"endpipeline" ,"not in pipeline mode" );
3607+ return CSTATE_ABORTED ;
3608+ }
3609+ if (!PQpipelineSync (st -> con ))
3610+ {
3611+ commandFailed (st ,"endpipeline" ,"failed to send a pipeline sync" );
3612+ return CSTATE_ABORTED ;
3613+ }
3614+ /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
3615+ /* collect pending results before getting out of pipeline mode */
3616+ return CSTATE_WAIT_RESULT ;
3617+ }
35193618
35203619/*
35213620 * executing the expression or shell command might have taken a
@@ -4725,7 +4824,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
47254824syntax_error (source ,lineno ,my_command -> first_line ,my_command -> argv [0 ],
47264825"missing command" ,NULL ,-1 );
47274826}
4728- else if (my_command -> meta == META_ELSE || my_command -> meta == META_ENDIF )
4827+ else if (my_command -> meta == META_ELSE || my_command -> meta == META_ENDIF ||
4828+ my_command -> meta == META_STARTPIPELINE ||
4829+ my_command -> meta == META_ENDPIPELINE )
47294830{
47304831if (my_command -> argc != 1 )
47314832syntax_error (source ,lineno ,my_command -> first_line ,my_command -> argv [0 ],