@@ -631,7 +631,8 @@ typedef struct
631631pg_time_usec_t txn_begin ;/* used for measuring schedule lag times */
632632pg_time_usec_t stmt_begin ;/* used for measuring statement latencies */
633633
634- bool prepared [MAX_SCRIPTS ];/* whether client prepared the script */
634+ /* whether client prepared each command of each script */
635+ bool * * prepared ;
635636
636637/*
637638 * For processing failures and repeating transactions with serialization
@@ -736,7 +737,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
736737 * argvCommand arguments, the first of which is the command or SQL
737738 *string itself. For SQL commands, after post-processing
738739 *argv[0] is the same as 'lines' with variables substituted.
739- * varprefix SQL commands terminated with \gset or \aset have this set
740+ * prepnameThe name that this command is prepared under, in prepare mode
741+ * varprefixSQL commands terminated with \gset or \aset have this set
740742 *to a non NULL value. If nonempty, it's used to prefix the
741743 *variable name that receives the value.
742744 * asetdo gset on all possible queries of a combined query (\;).
@@ -754,6 +756,7 @@ typedef struct Command
754756MetaCommand meta ;
755757int argc ;
756758char * argv [MAX_ARGS ];
759+ char * prepname ;
757760char * varprefix ;
758761PgBenchExpr * expr ;
759762SimpleStats stats ;
@@ -3027,13 +3030,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
30273030return true;
30283031}
30293032
3030- #define MAX_PREPARE_NAME 32
3031- static void
3032- preparedStatementName (char * buffer ,int file ,int state )
3033- {
3034- sprintf (buffer ,"P%d_%d" ,file ,state );
3035- }
3036-
30373033/*
30383034 * Report the abortion of the client when processing SQL commands.
30393035 */
@@ -3074,6 +3070,87 @@ chooseScript(TState *thread)
30743070return i - 1 ;
30753071}
30763072
3073+ /*
3074+ * Prepare the SQL command from st->use_file at command_num.
3075+ */
3076+ static void
3077+ prepareCommand (CState * st ,int command_num )
3078+ {
3079+ Command * command = sql_script [st -> use_file ].commands [command_num ];
3080+
3081+ /* No prepare for non-SQL commands */
3082+ if (command -> type != SQL_COMMAND )
3083+ return ;
3084+
3085+ /*
3086+ * If not already done, allocate space for 'prepared' flags: one boolean
3087+ * for each command of each script.
3088+ */
3089+ if (!st -> prepared )
3090+ {
3091+ st -> prepared = pg_malloc (sizeof (bool * )* num_scripts );
3092+ for (int i = 0 ;i < num_scripts ;i ++ )
3093+ {
3094+ ParsedScript * script = & sql_script [i ];
3095+ int numcmds ;
3096+
3097+ for (numcmds = 0 ;script -> commands [numcmds ]!= NULL ;numcmds ++ )
3098+ ;
3099+ st -> prepared [i ]= pg_malloc0 (sizeof (bool )* numcmds );
3100+ }
3101+ }
3102+
3103+ if (!st -> prepared [st -> use_file ][command_num ])
3104+ {
3105+ PGresult * res ;
3106+
3107+ pg_log_debug ("client %d preparing %s" ,st -> id ,command -> prepname );
3108+ res = PQprepare (st -> con ,command -> prepname ,
3109+ command -> argv [0 ],command -> argc - 1 ,NULL );
3110+ if (PQresultStatus (res )!= PGRES_COMMAND_OK )
3111+ pg_log_error ("%s" ,PQerrorMessage (st -> con ));
3112+ PQclear (res );
3113+ st -> prepared [st -> use_file ][command_num ]= true;
3114+ }
3115+ }
3116+
3117+ /*
3118+ * Prepare all the commands in the script that come after the \startpipeline
3119+ * that's at position st->command, and the first \endpipeline we find.
3120+ *
3121+ * This sets the ->prepared flag for each relevant command as well as the
3122+ * \startpipeline itself, but doesn't move the st->command counter.
3123+ */
3124+ static void
3125+ prepareCommandsInPipeline (CState * st )
3126+ {
3127+ int j ;
3128+ Command * * commands = sql_script [st -> use_file ].commands ;
3129+
3130+ Assert (commands [st -> command ]-> type == META_COMMAND &&
3131+ commands [st -> command ]-> meta == META_STARTPIPELINE );
3132+
3133+ /*
3134+ * We set the 'prepared' flag on the \startpipeline itself to flag that we
3135+ * don't need to do this next time without calling prepareCommand(), even
3136+ * though we don't actually prepare this command.
3137+ */
3138+ if (st -> prepared &&
3139+ st -> prepared [st -> use_file ][st -> command ])
3140+ return ;
3141+
3142+ for (j = st -> command + 1 ;commands [j ]!= NULL ;j ++ )
3143+ {
3144+ if (commands [j ]-> type == META_COMMAND &&
3145+ commands [j ]-> meta == META_ENDPIPELINE )
3146+ break ;
3147+
3148+ prepareCommand (st ,j );
3149+ }
3150+
3151+ st -> prepared [st -> use_file ][st -> command ]= true;
3152+ }
3153+
30773154/* Send a SQL command, using the chosen querymode */
30783155static bool
30793156sendCommand (CState * st ,Command * command )
@@ -3104,50 +3181,13 @@ sendCommand(CState *st, Command *command)
31043181}
31053182else if (querymode == QUERY_PREPARED )
31063183{
3107- char name [MAX_PREPARE_NAME ];
31083184const char * params [MAX_ARGS ];
31093185
3110- if (!st -> prepared [st -> use_file ])
3111- {
3112- int j ;
3113- Command * * commands = sql_script [st -> use_file ].commands ;
3114-
3115- for (j = 0 ;commands [j ]!= NULL ;j ++ )
3116- {
3117- PGresult * res ;
3118- char name [MAX_PREPARE_NAME ];
3119-
3120- if (commands [j ]-> type != SQL_COMMAND )
3121- continue ;
3122- preparedStatementName (name ,st -> use_file ,j );
3123- if (PQpipelineStatus (st -> con )== PQ_PIPELINE_OFF )
3124- {
3125- res = PQprepare (st -> con ,name ,
3126- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL );
3127- if (PQresultStatus (res )!= PGRES_COMMAND_OK )
3128- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
3129- PQclear (res );
3130- }
3131- else
3132- {
3133- /*
3134- * In pipeline mode, we use asynchronous functions. If a
3135- * server-side error occurs, it will be processed later
3136- * among the other results.
3137- */
3138- if (!PQsendPrepare (st -> con ,name ,
3139- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL ))
3140- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
3141- }
3142- }
3143- st -> prepared [st -> use_file ]= true;
3144- }
3145-
3186+ prepareCommand (st ,st -> command );
31463187getQueryParams (& st -> variables ,command ,params );
3147- preparedStatementName (name ,st -> use_file ,st -> command );
31483188
3149- pg_log_debug ("client %d sending %s" ,st -> id ,name );
3150- r = PQsendQueryPrepared (st -> con ,name ,command -> argc - 1 ,
3189+ pg_log_debug ("client %d sending %s" ,st -> id ,command -> prepname );
3190+ r = PQsendQueryPrepared (st -> con ,command -> prepname ,command -> argc - 1 ,
31513191params ,NULL ,NULL ,0 );
31523192}
31533193else /* unknown sql mode */
@@ -3620,7 +3660,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
36203660thread -> conn_duration += now - start ;
36213661
36223662/* Reset session-local state */
3623- memset (st -> prepared ,0 ,sizeof (st -> prepared ));
3663+ pg_free (st -> prepared );
3664+ st -> prepared = NULL ;
36243665}
36253666
36263667/*
@@ -4387,6 +4428,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
43874428return CSTATE_ABORTED ;
43884429}
43894430
4431+ /*
4432+ * If we're in prepared-query mode, we need to prepare all the
4433+ * commands that are inside the pipeline before we actually start the
4434+ * pipeline itself. This solves the problem that running BEGIN
4435+ * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
4436+ * snapshot having been acquired by the prepare within the pipeline.
4437+ */
4438+ if (querymode == QUERY_PREPARED )
4439+ prepareCommandsInPipeline (st );
4440+
43904441if (PQpipelineStatus (st -> con )!= PQ_PIPELINE_OFF )
43914442{
43924443commandFailed (st ,"startpipeline" ,"already in pipeline mode" );
@@ -5466,6 +5517,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
54665517my_command -> varprefix = NULL ;/* allocated later, if needed */
54675518my_command -> expr = NULL ;
54685519initSimpleStats (& my_command -> stats );
5520+ my_command -> prepname = NULL ;/* set later, if needed */
54695521
54705522return my_command ;
54715523}
@@ -5497,6 +5549,7 @@ static void
54975549postprocess_sql_command (Command * my_command )
54985550{
54995551char buffer [128 ];
5552+ static int prepnum = 0 ;
55005553
55015554Assert (my_command -> type == SQL_COMMAND );
55025555
@@ -5505,15 +5558,17 @@ postprocess_sql_command(Command *my_command)
55055558buffer [strcspn (buffer ,"\n\r" )]= '\0' ;
55065559my_command -> first_line = pg_strdup (buffer );
55075560
5508- /*parse query if necessary */
5561+ /*Parse query and generate prepared statement name, if necessary */
55095562switch (querymode )
55105563{
55115564case QUERY_SIMPLE :
55125565my_command -> argv [0 ]= my_command -> lines .data ;
55135566my_command -> argc ++ ;
55145567break ;
5515- case QUERY_EXTENDED :
55165568case QUERY_PREPARED :
5569+ my_command -> prepname = psprintf ("P_%d" ,prepnum ++ );
5570+ /* fall through */
5571+ case QUERY_EXTENDED :
55175572if (!parseQuery (my_command ))
55185573exit (1 );
55195574break ;