@@ -476,7 +476,8 @@ typedef struct
476
476
pg_time_usec_t txn_begin ;/* used for measuring schedule lag times */
477
477
pg_time_usec_t stmt_begin ;/* used for measuring statement latencies */
478
478
479
- bool prepared [MAX_SCRIPTS ];/* whether client prepared the script */
479
+ /* whether client prepared each command of each script */
480
+ bool * * prepared ;
480
481
481
482
/* per client collected stats */
482
483
int64 cnt ;/* client transaction count, for -t */
@@ -568,7 +569,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
568
569
* argvCommand arguments, the first of which is the command or SQL
569
570
*string itself. For SQL commands, after post-processing
570
571
*argv[0] is the same as 'lines' with variables substituted.
571
- * varprefix SQL commands terminated with \gset or \aset have this set
572
+ * prepnameThe name that this command is prepared under, in prepare mode
573
+ * varprefixSQL commands terminated with \gset or \aset have this set
572
574
*to a non NULL value. If nonempty, it's used to prefix the
573
575
*variable name that receives the value.
574
576
* asetdo gset on all possible queries of a combined query (\;).
@@ -583,6 +585,7 @@ typedef struct Command
583
585
MetaCommand meta ;
584
586
int argc ;
585
587
char * argv [MAX_ARGS ];
588
+ char * prepname ;
586
589
char * varprefix ;
587
590
PgBenchExpr * expr ;
588
591
SimpleStats stats ;
@@ -2825,13 +2828,9 @@ runShellCommand(CState *st, char *variable, char **argv, int argc)
2825
2828
return true;
2826
2829
}
2827
2830
2828
- #define MAX_PREPARE_NAME 32
2829
- static void
2830
- preparedStatementName (char * buffer ,int file ,int state )
2831
- {
2832
- sprintf (buffer ,"P%d_%d" ,file ,state );
2833
- }
2834
-
2831
+ /*
2832
+ * Report the abortion of the client when processing SQL commands.
2833
+ */
2835
2834
static void
2836
2835
commandFailed (CState * st ,const char * cmd ,const char * message )
2837
2836
{
@@ -2858,6 +2857,87 @@ chooseScript(TState *thread)
2858
2857
return i - 1 ;
2859
2858
}
2860
2859
2860
+ /*
2861
+ * Prepare the SQL command from st->use_file at command_num.
2862
+ */
2863
+ static void
2864
+ prepareCommand (CState * st ,int command_num )
2865
+ {
2866
+ Command * command = sql_script [st -> use_file ].commands [command_num ];
2867
+
2868
+ /* No prepare for non-SQL commands */
2869
+ if (command -> type != SQL_COMMAND )
2870
+ return ;
2871
+
2872
+ /*
2873
+ * If not already done, allocate space for 'prepared' flags: one boolean
2874
+ * for each command of each script.
2875
+ */
2876
+ if (!st -> prepared )
2877
+ {
2878
+ st -> prepared = pg_malloc (sizeof (bool * )* num_scripts );
2879
+ for (int i = 0 ;i < num_scripts ;i ++ )
2880
+ {
2881
+ ParsedScript * script = & sql_script [i ];
2882
+ int numcmds ;
2883
+
2884
+ for (numcmds = 0 ;script -> commands [numcmds ]!= NULL ;numcmds ++ )
2885
+ ;
2886
+ st -> prepared [i ]= pg_malloc0 (sizeof (bool )* numcmds );
2887
+ }
2888
+ }
2889
+
2890
+ if (!st -> prepared [st -> use_file ][command_num ])
2891
+ {
2892
+ PGresult * res ;
2893
+
2894
+ pg_log_debug ("client %d preparing %s" ,st -> id ,command -> prepname );
2895
+ res = PQprepare (st -> con ,command -> prepname ,
2896
+ command -> argv [0 ],command -> argc - 1 ,NULL );
2897
+ if (PQresultStatus (res )!= PGRES_COMMAND_OK )
2898
+ pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2899
+ PQclear (res );
2900
+ st -> prepared [st -> use_file ][command_num ]= true;
2901
+ }
2902
+ }
2903
+
2904
+ /*
2905
+ * Prepare all the commands in the script that come after the \startpipeline
2906
+ * that's at position st->command, and the first \endpipeline we find.
2907
+ *
2908
+ * This sets the ->prepared flag for each relevant command as well as the
2909
+ * \startpipeline itself, but doesn't move the st->command counter.
2910
+ */
2911
+ static void
2912
+ prepareCommandsInPipeline (CState * st )
2913
+ {
2914
+ int j ;
2915
+ Command * * commands = sql_script [st -> use_file ].commands ;
2916
+
2917
+ Assert (commands [st -> command ]-> type == META_COMMAND &&
2918
+ commands [st -> command ]-> meta == META_STARTPIPELINE );
2919
+
2920
+ /*
2921
+ * We set the 'prepared' flag on the \startpipeline itself to flag that we
2922
+ * don't need to do this next time without calling prepareCommand(), even
2923
+ * though we don't actually prepare this command.
2924
+ */
2925
+ if (st -> prepared &&
2926
+ st -> prepared [st -> use_file ][st -> command ])
2927
+ return ;
2928
+
2929
+ for (j = st -> command + 1 ;commands [j ]!= NULL ;j ++ )
2930
+ {
2931
+ if (commands [j ]-> type == META_COMMAND &&
2932
+ commands [j ]-> meta == META_ENDPIPELINE )
2933
+ break ;
2934
+
2935
+ prepareCommand (st ,j );
2936
+ }
2937
+
2938
+ st -> prepared [st -> use_file ][st -> command ]= true;
2939
+ }
2940
+
2861
2941
/* Send a SQL command, using the chosen querymode */
2862
2942
static bool
2863
2943
sendCommand (CState * st ,Command * command )
@@ -2888,50 +2968,13 @@ sendCommand(CState *st, Command *command)
2888
2968
}
2889
2969
else if (querymode == QUERY_PREPARED )
2890
2970
{
2891
- char name [MAX_PREPARE_NAME ];
2892
2971
const char * params [MAX_ARGS ];
2893
2972
2894
- if (!st -> prepared [st -> use_file ])
2895
- {
2896
- int j ;
2897
- Command * * commands = sql_script [st -> use_file ].commands ;
2898
-
2899
- for (j = 0 ;commands [j ]!= NULL ;j ++ )
2900
- {
2901
- PGresult * res ;
2902
- char name [MAX_PREPARE_NAME ];
2903
-
2904
- if (commands [j ]-> type != SQL_COMMAND )
2905
- continue ;
2906
- preparedStatementName (name ,st -> use_file ,j );
2907
- if (PQpipelineStatus (st -> con )== PQ_PIPELINE_OFF )
2908
- {
2909
- res = PQprepare (st -> con ,name ,
2910
- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL );
2911
- if (PQresultStatus (res )!= PGRES_COMMAND_OK )
2912
- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2913
- PQclear (res );
2914
- }
2915
- else
2916
- {
2917
- /*
2918
- * In pipeline mode, we use asynchronous functions. If a
2919
- * server-side error occurs, it will be processed later
2920
- * among the other results.
2921
- */
2922
- if (!PQsendPrepare (st -> con ,name ,
2923
- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL ))
2924
- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
2925
- }
2926
- }
2927
- st -> prepared [st -> use_file ]= true;
2928
- }
2929
-
2973
+ prepareCommand (st ,st -> command );
2930
2974
getQueryParams (st ,command ,params );
2931
- preparedStatementName (name ,st -> use_file ,st -> command );
2932
2975
2933
- pg_log_debug ("client %d sending %s" ,st -> id ,name );
2934
- r = PQsendQueryPrepared (st -> con ,name ,command -> argc - 1 ,
2976
+ pg_log_debug ("client %d sending %s" ,st -> id ,command -> prepname );
2977
+ r = PQsendQueryPrepared (st -> con ,command -> prepname ,command -> argc - 1 ,
2935
2978
params ,NULL ,NULL ,0 );
2936
2979
}
2937
2980
else /* unknown sql mode */
@@ -3191,7 +3234,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
3191
3234
thread -> conn_duration += now - start ;
3192
3235
3193
3236
/* Reset session-local state */
3194
- memset (st -> prepared ,0 ,sizeof (st -> prepared ));
3237
+ pg_free (st -> prepared );
3238
+ st -> prepared = NULL ;
3195
3239
}
3196
3240
3197
3241
/* record transaction start time */
@@ -3766,6 +3810,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
3766
3810
return CSTATE_ABORTED ;
3767
3811
}
3768
3812
3813
+ /*
3814
+ * If we're in prepared-query mode, we need to prepare all the
3815
+ * commands that are inside the pipeline before we actually start the
3816
+ * pipeline itself. This solves the problem that running BEGIN
3817
+ * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
3818
+ * snapshot having been acquired by the prepare within the pipeline.
3819
+ */
3820
+ if (querymode == QUERY_PREPARED )
3821
+ prepareCommandsInPipeline (st );
3822
+
3769
3823
if (PQpipelineStatus (st -> con )!= PQ_PIPELINE_OFF )
3770
3824
{
3771
3825
commandFailed (st ,"startpipeline" ,"already in pipeline mode" );
@@ -4785,6 +4839,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
4785
4839
my_command -> varprefix = NULL ;/* allocated later, if needed */
4786
4840
my_command -> expr = NULL ;
4787
4841
initSimpleStats (& my_command -> stats );
4842
+ my_command -> prepname = NULL ;/* set later, if needed */
4788
4843
4789
4844
return my_command ;
4790
4845
}
@@ -4816,6 +4871,7 @@ static void
4816
4871
postprocess_sql_command (Command * my_command )
4817
4872
{
4818
4873
char buffer [128 ];
4874
+ static int prepnum = 0 ;
4819
4875
4820
4876
Assert (my_command -> type == SQL_COMMAND );
4821
4877
@@ -4824,15 +4880,17 @@ postprocess_sql_command(Command *my_command)
4824
4880
buffer [strcspn (buffer ,"\n\r" )]= '\0' ;
4825
4881
my_command -> first_line = pg_strdup (buffer );
4826
4882
4827
- /*parse query if necessary */
4883
+ /*Parse query and generate prepared statement name, if necessary */
4828
4884
switch (querymode )
4829
4885
{
4830
4886
case QUERY_SIMPLE :
4831
4887
my_command -> argv [0 ]= my_command -> lines .data ;
4832
4888
my_command -> argc ++ ;
4833
4889
break ;
4834
- case QUERY_EXTENDED :
4835
4890
case QUERY_PREPARED :
4891
+ my_command -> prepname = psprintf ("P_%d" ,prepnum ++ );
4892
+ /* fall through */
4893
+ case QUERY_EXTENDED :
4836
4894
if (!parseQuery (my_command ))
4837
4895
exit (1 );
4838
4896
break ;