@@ -476,7 +476,8 @@ typedef struct
476476pg_time_usec_t txn_begin ;/* used for measuring schedule lag times */
477477pg_time_usec_t stmt_begin ;/* used for measuring statement latencies */
478478
479- bool prepared [MAX_SCRIPTS ];/* whether client prepared the script */
479+ /* whether client prepared each command of each script */
480+ bool * * prepared ;
480481
481482/* per client collected stats */
482483int64 cnt ;/* client transaction count, for -t */
@@ -568,7 +569,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
568569 * argvCommand arguments, the first of which is the command or SQL
569570 *string itself. For SQL commands, after post-processing
570571 *argv[0] is the same as 'lines' with variables substituted.
571- * varprefix SQL commands terminated with \gset or \aset have this set
572+ * prepnameThe name that this command is prepared under, in prepare mode
573+ * varprefixSQL commands terminated with \gset or \aset have this set
572574 *to a non NULL value. If nonempty, it's used to prefix the
573575 *variable name that receives the value.
574576 * asetdo gset on all possible queries of a combined query (\;).
@@ -583,6 +585,7 @@ typedef struct Command
583585MetaCommand meta ;
584586int argc ;
585587char * argv [MAX_ARGS ];
588+ char * prepname ;
586589char * varprefix ;
587590PgBenchExpr * expr ;
588591SimpleStats stats ;
@@ -2825,13 +2828,9 @@ runShellCommand(CState *st, char *variable, char **argv, int argc)
28252828return true;
28262829}
28272830
2828- #define MAX_PREPARE_NAME 32
2829- static void
2830- preparedStatementName (char * buffer ,int file ,int state )
2831- {
2832- sprintf (buffer ,"P%d_%d" ,file ,state );
2833- }
2834-
2831+ /*
2832+ * Report the abortion of the client when processing SQL commands.
2833+ */
28352834static void
28362835commandFailed (CState * st ,const char * cmd ,const char * message )
28372836{
@@ -2858,6 +2857,87 @@ chooseScript(TState *thread)
28582857return i - 1 ;
28592858}
28602859
2860+ /*
2861+ * Prepare the SQL command from st->use_file at command_num.
2862+ */
2863+ static void
2864+ prepareCommand (CState * st ,int command_num )
2865+ {
2866+ Command * command = sql_script [st -> use_file ].commands [command_num ];
2867+
2868+ /* No prepare for non-SQL commands */
2869+ if (command -> type != SQL_COMMAND )
2870+ return ;
2871+
2872+ /*
2873+ * If not already done, allocate space for 'prepared' flags: one boolean
2874+ * for each command of each script.
2875+ */
2876+ if (!st -> prepared )
2877+ {
2878+ st -> prepared = pg_malloc (sizeof (bool * )* num_scripts );
2879+ for (int i = 0 ;i < num_scripts ;i ++ )
2880+ {
2881+ ParsedScript * script = & sql_script [i ];
2882+ int numcmds ;
2883+
2884+ for (numcmds = 0 ;script -> commands [numcmds ]!= NULL ;numcmds ++ )
2885+ ;
2886+ st -> prepared [i ]= pg_malloc0 (sizeof (bool )* numcmds );
2887+ }
2888+ }
2889+
2890+ if (!st -> prepared [st -> use_file ][command_num ])
2891+ {
2892+ PGresult * res ;
2893+
2894+ pg_log_debug ("client %d preparing %s" ,st -> id ,command -> prepname );
2895+ res = PQprepare (st -> con ,command -> prepname ,
2896+ command -> argv [0 ],command -> argc - 1 ,NULL );
2897+ if (PQresultStatus (res )!= PGRES_COMMAND_OK )
2898+ pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2899+ PQclear (res );
2900+ st -> prepared [st -> use_file ][command_num ]= true;
2901+ }
2902+ }
2903+
2904+ /*
2905+ * Prepare all the commands in the script that come after the \startpipeline
2906+ * that's at position st->command, and the first \endpipeline we find.
2907+ *
2908+ * This sets the ->prepared flag for each relevant command as well as the
2909+ * \startpipeline itself, but doesn't move the st->command counter.
2910+ */
2911+ static void
2912+ prepareCommandsInPipeline (CState * st )
2913+ {
2914+ int j ;
2915+ Command * * commands = sql_script [st -> use_file ].commands ;
2916+
2917+ Assert (commands [st -> command ]-> type == META_COMMAND &&
2918+ commands [st -> command ]-> meta == META_STARTPIPELINE );
2919+
2920+ /*
2921+ * We set the 'prepared' flag on the \startpipeline itself to flag that we
2922+ * don't need to do this next time without calling prepareCommand(), even
2923+ * though we don't actually prepare this command.
2924+ */
2925+ if (st -> prepared &&
2926+ st -> prepared [st -> use_file ][st -> command ])
2927+ return ;
2928+
2929+ for (j = st -> command + 1 ;commands [j ]!= NULL ;j ++ )
2930+ {
2931+ if (commands [j ]-> type == META_COMMAND &&
2932+ commands [j ]-> meta == META_ENDPIPELINE )
2933+ break ;
2934+
2935+ prepareCommand (st ,j );
2936+ }
2937+
2938+ st -> prepared [st -> use_file ][st -> command ]= true;
2939+ }
2940+
28612941/* Send a SQL command, using the chosen querymode */
28622942static bool
28632943sendCommand (CState * st ,Command * command )
@@ -2888,50 +2968,13 @@ sendCommand(CState *st, Command *command)
28882968}
28892969else if (querymode == QUERY_PREPARED )
28902970{
2891- char name [MAX_PREPARE_NAME ];
28922971const char * params [MAX_ARGS ];
28932972
2894- if (!st -> prepared [st -> use_file ])
2895- {
2896- int j ;
2897- Command * * commands = sql_script [st -> use_file ].commands ;
2898-
2899- for (j = 0 ;commands [j ]!= NULL ;j ++ )
2900- {
2901- PGresult * res ;
2902- char name [MAX_PREPARE_NAME ];
2903-
2904- if (commands [j ]-> type != SQL_COMMAND )
2905- continue ;
2906- preparedStatementName (name ,st -> use_file ,j );
2907- if (PQpipelineStatus (st -> con )== PQ_PIPELINE_OFF )
2908- {
2909- res = PQprepare (st -> con ,name ,
2910- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL );
2911- if (PQresultStatus (res )!= PGRES_COMMAND_OK )
2912- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2913- PQclear (res );
2914- }
2915- else
2916- {
2917- /*
2918- * In pipeline mode, we use asynchronous functions. If a
2919- * server-side error occurs, it will be processed later
2920- * among the other results.
2921- */
2922- if (!PQsendPrepare (st -> con ,name ,
2923- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL ))
2924- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2925- }
2926- }
2927- st -> prepared [st -> use_file ]= true;
2928- }
2929-
2973+ prepareCommand (st ,st -> command );
29302974getQueryParams (st ,command ,params );
2931- preparedStatementName (name ,st -> use_file ,st -> command );
29322975
2933- pg_log_debug ("client %d sending %s" ,st -> id ,name );
2934- r = PQsendQueryPrepared (st -> con ,name ,command -> argc - 1 ,
2976+ pg_log_debug ("client %d sending %s" ,st -> id ,command -> prepname );
2977+ r = PQsendQueryPrepared (st -> con ,command -> prepname ,command -> argc - 1 ,
29352978params ,NULL ,NULL ,0 );
29362979}
29372980else /* unknown sql mode */
@@ -3191,7 +3234,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
31913234thread -> conn_duration += now - start ;
31923235
31933236/* Reset session-local state */
3194- memset (st -> prepared ,0 ,sizeof (st -> prepared ));
3237+ pg_free (st -> prepared );
3238+ st -> prepared = NULL ;
31953239}
31963240
31973241/* record transaction start time */
@@ -3766,6 +3810,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
37663810return CSTATE_ABORTED ;
37673811}
37683812
3813+ /*
3814+ * If we're in prepared-query mode, we need to prepare all the
3815+ * commands that are inside the pipeline before we actually start the
3816+ * pipeline itself. This solves the problem that running BEGIN
3817+ * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
3818+ * snapshot having been acquired by the prepare within the pipeline.
3819+ */
3820+ if (querymode == QUERY_PREPARED )
3821+ prepareCommandsInPipeline (st );
3822+
37693823if (PQpipelineStatus (st -> con )!= PQ_PIPELINE_OFF )
37703824{
37713825commandFailed (st ,"startpipeline" ,"already in pipeline mode" );
@@ -4785,6 +4839,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
47854839my_command -> varprefix = NULL ;/* allocated later, if needed */
47864840my_command -> expr = NULL ;
47874841initSimpleStats (& my_command -> stats );
4842+ my_command -> prepname = NULL ;/* set later, if needed */
47884843
47894844return my_command ;
47904845}
@@ -4816,6 +4871,7 @@ static void
48164871postprocess_sql_command (Command * my_command )
48174872{
48184873char buffer [128 ];
4874+ static int prepnum = 0 ;
48194875
48204876Assert (my_command -> type == SQL_COMMAND );
48214877
@@ -4824,15 +4880,17 @@ postprocess_sql_command(Command *my_command)
48244880buffer [strcspn (buffer ,"\n\r" )]= '\0' ;
48254881my_command -> first_line = pg_strdup (buffer );
48264882
4827- /*parse query if necessary */
4883+ /*Parse query and generate prepared statement name, if necessary */
48284884switch (querymode )
48294885{
48304886case QUERY_SIMPLE :
48314887my_command -> argv [0 ]= my_command -> lines .data ;
48324888my_command -> argc ++ ;
48334889break ;
4834- case QUERY_EXTENDED :
48354890case QUERY_PREPARED :
4891+ my_command -> prepname = psprintf ("P_%d" ,prepnum ++ );
4892+ /* fall through */
4893+ case QUERY_EXTENDED :
48364894if (!parseQuery (my_command ))
48374895exit (1 );
48384896break ;