@@ -631,7 +631,8 @@ typedef struct
631
631
pg_time_usec_t txn_begin ;/* used for measuring schedule lag times */
632
632
pg_time_usec_t stmt_begin ;/* used for measuring statement latencies */
633
633
634
- bool prepared [MAX_SCRIPTS ];/* whether client prepared the script */
634
+ /* whether client prepared each command of each script */
635
+ bool * * prepared ;
635
636
636
637
/*
637
638
* For processing failures and repeating transactions with serialization
@@ -736,7 +737,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
736
737
* argvCommand arguments, the first of which is the command or SQL
737
738
*string itself. For SQL commands, after post-processing
738
739
*argv[0] is the same as 'lines' with variables substituted.
739
- * varprefix SQL commands terminated with \gset or \aset have this set
740
+ * prepnameThe name that this command is prepared under, in prepare mode
741
+ * varprefixSQL commands terminated with \gset or \aset have this set
740
742
*to a non NULL value. If nonempty, it's used to prefix the
741
743
*variable name that receives the value.
742
744
* asetdo gset on all possible queries of a combined query (\;).
@@ -754,6 +756,7 @@ typedef struct Command
754
756
MetaCommand meta ;
755
757
int argc ;
756
758
char * argv [MAX_ARGS ];
759
+ char * prepname ;
757
760
char * varprefix ;
758
761
PgBenchExpr * expr ;
759
762
SimpleStats stats ;
@@ -3027,13 +3030,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
3027
3030
return true;
3028
3031
}
3029
3032
3030
- #define MAX_PREPARE_NAME 32
3031
- static void
3032
- preparedStatementName (char * buffer ,int file ,int state )
3033
- {
3034
- sprintf (buffer ,"P%d_%d" ,file ,state );
3035
- }
3036
-
3037
3033
/*
3038
3034
* Report the abortion of the client when processing SQL commands.
3039
3035
*/
@@ -3074,6 +3070,87 @@ chooseScript(TState *thread)
3074
3070
return i - 1 ;
3075
3071
}
3076
3072
3073
+ /*
3074
+ * Prepare the SQL command from st->use_file at command_num.
3075
+ */
3076
+ static void
3077
+ prepareCommand (CState * st ,int command_num )
3078
+ {
3079
+ Command * command = sql_script [st -> use_file ].commands [command_num ];
3080
+
3081
+ /* No prepare for non-SQL commands */
3082
+ if (command -> type != SQL_COMMAND )
3083
+ return ;
3084
+
3085
+ /*
3086
+ * If not already done, allocate space for 'prepared' flags: one boolean
3087
+ * for each command of each script.
3088
+ */
3089
+ if (!st -> prepared )
3090
+ {
3091
+ st -> prepared = pg_malloc (sizeof (bool * )* num_scripts );
3092
+ for (int i = 0 ;i < num_scripts ;i ++ )
3093
+ {
3094
+ ParsedScript * script = & sql_script [i ];
3095
+ int numcmds ;
3096
+
3097
+ for (numcmds = 0 ;script -> commands [numcmds ]!= NULL ;numcmds ++ )
3098
+ ;
3099
+ st -> prepared [i ]= pg_malloc0 (sizeof (bool )* numcmds );
3100
+ }
3101
+ }
3102
+
3103
+ if (!st -> prepared [st -> use_file ][command_num ])
3104
+ {
3105
+ PGresult * res ;
3106
+
3107
+ pg_log_debug ("client %d preparing %s" ,st -> id ,command -> prepname );
3108
+ res = PQprepare (st -> con ,command -> prepname ,
3109
+ command -> argv [0 ],command -> argc - 1 ,NULL );
3110
+ if (PQresultStatus (res )!= PGRES_COMMAND_OK )
3111
+ pg_log_error ("%s" ,PQerrorMessage (st -> con ));
3112
+ PQclear (res );
3113
+ st -> prepared [st -> use_file ][command_num ]= true;
3114
+ }
3115
+ }
3116
+
3117
+ /*
3118
+ * Prepare all the commands in the script that come after the \startpipeline
3119
+ * that's at position st->command, and the first \endpipeline we find.
3120
+ *
3121
+ * This sets the ->prepared flag for each relevant command as well as the
3122
+ * \startpipeline itself, but doesn't move the st->command counter.
3123
+ */
3124
+ static void
3125
+ prepareCommandsInPipeline (CState * st )
3126
+ {
3127
+ int j ;
3128
+ Command * * commands = sql_script [st -> use_file ].commands ;
3129
+
3130
+ Assert (commands [st -> command ]-> type == META_COMMAND &&
3131
+ commands [st -> command ]-> meta == META_STARTPIPELINE );
3132
+
3133
+ /*
3134
+ * We set the 'prepared' flag on the \startpipeline itself to flag that we
3135
+ * don't need to do this next time without calling prepareCommand(), even
3136
+ * though we don't actually prepare this command.
3137
+ */
3138
+ if (st -> prepared &&
3139
+ st -> prepared [st -> use_file ][st -> command ])
3140
+ return ;
3141
+
3142
+ for (j = st -> command + 1 ;commands [j ]!= NULL ;j ++ )
3143
+ {
3144
+ if (commands [j ]-> type == META_COMMAND &&
3145
+ commands [j ]-> meta == META_ENDPIPELINE )
3146
+ break ;
3147
+
3148
+ prepareCommand (st ,j );
3149
+ }
3150
+
3151
+ st -> prepared [st -> use_file ][st -> command ]= true;
3152
+ }
3153
+
3077
3154
/* Send a SQL command, using the chosen querymode */
3078
3155
static bool
3079
3156
sendCommand (CState * st ,Command * command )
@@ -3104,50 +3181,13 @@ sendCommand(CState *st, Command *command)
3104
3181
}
3105
3182
else if (querymode == QUERY_PREPARED )
3106
3183
{
3107
- char name [MAX_PREPARE_NAME ];
3108
3184
const char * params [MAX_ARGS ];
3109
3185
3110
- if (!st -> prepared [st -> use_file ])
3111
- {
3112
- int j ;
3113
- Command * * commands = sql_script [st -> use_file ].commands ;
3114
-
3115
- for (j = 0 ;commands [j ]!= NULL ;j ++ )
3116
- {
3117
- PGresult * res ;
3118
- char name [MAX_PREPARE_NAME ];
3119
-
3120
- if (commands [j ]-> type != SQL_COMMAND )
3121
- continue ;
3122
- preparedStatementName (name ,st -> use_file ,j );
3123
- if (PQpipelineStatus (st -> con )== PQ_PIPELINE_OFF )
3124
- {
3125
- res = PQprepare (st -> con ,name ,
3126
- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL );
3127
- if (PQresultStatus (res )!= PGRES_COMMAND_OK )
3128
- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
3129
- PQclear (res );
3130
- }
3131
- else
3132
- {
3133
- /*
3134
- * In pipeline mode, we use asynchronous functions. If a
3135
- * server-side error occurs, it will be processed later
3136
- * among the other results.
3137
- */
3138
- if (!PQsendPrepare (st -> con ,name ,
3139
- commands [j ]-> argv [0 ],commands [j ]-> argc - 1 ,NULL ))
3140
- pg_log_error ("%s" ,PQerrorMessage (st -> con ));
3141
- }
3142
- }
3143
- st -> prepared [st -> use_file ]= true;
3144
- }
3145
-
3186
+ prepareCommand (st ,st -> command );
3146
3187
getQueryParams (& st -> variables ,command ,params );
3147
- preparedStatementName (name ,st -> use_file ,st -> command );
3148
3188
3149
- pg_log_debug ("client %d sending %s" ,st -> id ,name );
3150
- r = PQsendQueryPrepared (st -> con ,name ,command -> argc - 1 ,
3189
+ pg_log_debug ("client %d sending %s" ,st -> id ,command -> prepname );
3190
+ r = PQsendQueryPrepared (st -> con ,command -> prepname ,command -> argc - 1 ,
3151
3191
params ,NULL ,NULL ,0 );
3152
3192
}
3153
3193
else /* unknown sql mode */
@@ -3620,7 +3660,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
3620
3660
thread -> conn_duration += now - start ;
3621
3661
3622
3662
/* Reset session-local state */
3623
- memset (st -> prepared ,0 ,sizeof (st -> prepared ));
3663
+ pg_free (st -> prepared );
3664
+ st -> prepared = NULL ;
3624
3665
}
3625
3666
3626
3667
/*
@@ -4387,6 +4428,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
4387
4428
return CSTATE_ABORTED ;
4388
4429
}
4389
4430
4431
+ /*
4432
+ * If we're in prepared-query mode, we need to prepare all the
4433
+ * commands that are inside the pipeline before we actually start the
4434
+ * pipeline itself. This solves the problem that running BEGIN
4435
+ * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
4436
+ * snapshot having been acquired by the prepare within the pipeline.
4437
+ */
4438
+ if (querymode == QUERY_PREPARED )
4439
+ prepareCommandsInPipeline (st );
4440
+
4390
4441
if (PQpipelineStatus (st -> con )!= PQ_PIPELINE_OFF )
4391
4442
{
4392
4443
commandFailed (st ,"startpipeline" ,"already in pipeline mode" );
@@ -5466,6 +5517,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
5466
5517
my_command -> varprefix = NULL ;/* allocated later, if needed */
5467
5518
my_command -> expr = NULL ;
5468
5519
initSimpleStats (& my_command -> stats );
5520
+ my_command -> prepname = NULL ;/* set later, if needed */
5469
5521
5470
5522
return my_command ;
5471
5523
}
@@ -5497,6 +5549,7 @@ static void
5497
5549
postprocess_sql_command (Command * my_command )
5498
5550
{
5499
5551
char buffer [128 ];
5552
+ static int prepnum = 0 ;
5500
5553
5501
5554
Assert (my_command -> type == SQL_COMMAND );
5502
5555
@@ -5505,15 +5558,17 @@ postprocess_sql_command(Command *my_command)
5505
5558
buffer [strcspn (buffer ,"\n\r" )]= '\0' ;
5506
5559
my_command -> first_line = pg_strdup (buffer );
5507
5560
5508
- /*parse query if necessary */
5561
+ /*Parse query and generate prepared statement name, if necessary */
5509
5562
switch (querymode )
5510
5563
{
5511
5564
case QUERY_SIMPLE :
5512
5565
my_command -> argv [0 ]= my_command -> lines .data ;
5513
5566
my_command -> argc ++ ;
5514
5567
break ;
5515
- case QUERY_EXTENDED :
5516
5568
case QUERY_PREPARED :
5569
+ my_command -> prepname = psprintf ("P_%d" ,prepnum ++ );
5570
+ /* fall through */
5571
+ case QUERY_EXTENDED :
5517
5572
if (!parseQuery (my_command ))
5518
5573
exit (1 );
5519
5574
break ;