Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit663e50e

Browse files
committed
pgbench: Prepare commands in pipelines in advance
Failing to do so results in an error when a pgbench script tries tostart a serializable transaction inside a pipeline, because by the timeBEGIN ISOLATION LEVEL SERIALIZABLE is executed, we're already in atransaction that has acquired a snapshot, so the server rightfullycomplains.We can work around that by preparing all commands in the pipeline beforeactually starting the pipeline. This changes the existing code in twoaspects: first, we now prepare each command individually at the pointwhere that command is about to be executed; previously, we would prepareall commands in a script as soon as the first command of that scriptwould be executed. It's hard to see that this would make much of adifference (particularly since it only affects the first time to executeeach script in a client), but I didn't actually try to measure it.Secondly, we no longer use PQsendPrepare() in pipeline mode, but onlyPQprepare. There's no specific reason for this change other than nolonger needing to do differently in pipeline mode. (Previously we hadno choice, because in pipeline mode PQprepare could not be used.)Backpatch to 14, where pgbench got support for pipeline mode.Reported-by: Yugo NAGATA <nagata@sraoss.co.jp>Discussion:https://postgr.es/m/20210716153013.fc53b1c780b06fccc07a7f0d@sraoss.co.jp
1 parentf3daa31 commit663e50e

File tree

2 files changed

+130
-52
lines changed

2 files changed

+130
-52
lines changed

‎src/bin/pgbench/pgbench.c

Lines changed: 110 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,8 @@ typedef struct
476476
pg_time_usec_ttxn_begin;/* used for measuring schedule lag times */
477477
pg_time_usec_tstmt_begin;/* used for measuring statement latencies */
478478

479-
boolprepared[MAX_SCRIPTS];/* whether client prepared the script */
479+
/* whether client prepared each command of each script */
480+
bool**prepared;
480481

481482
/* per client collected stats */
482483
int64cnt;/* client transaction count, for -t */
@@ -568,7 +569,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
568569
* argvCommand arguments, the first of which is the command or SQL
569570
*string itself. For SQL commands, after post-processing
570571
*argv[0] is the same as 'lines' with variables substituted.
571-
* varprefix SQL commands terminated with \gset or \aset have this set
572+
* prepnameThe name that this command is prepared under, in prepare mode
573+
* varprefixSQL commands terminated with \gset or \aset have this set
572574
*to a non NULL value. If nonempty, it's used to prefix the
573575
*variable name that receives the value.
574576
* asetdo gset on all possible queries of a combined query (\;).
@@ -583,6 +585,7 @@ typedef struct Command
583585
MetaCommandmeta;
584586
intargc;
585587
char*argv[MAX_ARGS];
588+
char*prepname;
586589
char*varprefix;
587590
PgBenchExpr*expr;
588591
SimpleStatsstats;
@@ -2825,13 +2828,9 @@ runShellCommand(CState *st, char *variable, char **argv, int argc)
28252828
return true;
28262829
}
28272830

2828-
#defineMAX_PREPARE_NAME32
2829-
staticvoid
2830-
preparedStatementName(char*buffer,intfile,intstate)
2831-
{
2832-
sprintf(buffer,"P%d_%d",file,state);
2833-
}
2834-
2831+
/*
2832+
* Report the abortion of the client when processing SQL commands.
2833+
*/
28352834
staticvoid
28362835
commandFailed(CState*st,constchar*cmd,constchar*message)
28372836
{
@@ -2858,6 +2857,87 @@ chooseScript(TState *thread)
28582857
returni-1;
28592858
}
28602859

2860+
/*
2861+
* Prepare the SQL command from st->use_file at command_num.
2862+
*/
2863+
staticvoid
2864+
prepareCommand(CState*st,intcommand_num)
2865+
{
2866+
Command*command=sql_script[st->use_file].commands[command_num];
2867+
2868+
/* No prepare for non-SQL commands */
2869+
if (command->type!=SQL_COMMAND)
2870+
return;
2871+
2872+
/*
2873+
* If not already done, allocate space for 'prepared' flags: one boolean
2874+
* for each command of each script.
2875+
*/
2876+
if (!st->prepared)
2877+
{
2878+
st->prepared=pg_malloc(sizeof(bool*)*num_scripts);
2879+
for (inti=0;i<num_scripts;i++)
2880+
{
2881+
ParsedScript*script=&sql_script[i];
2882+
intnumcmds;
2883+
2884+
for (numcmds=0;script->commands[numcmds]!=NULL;numcmds++)
2885+
;
2886+
st->prepared[i]=pg_malloc0(sizeof(bool)*numcmds);
2887+
}
2888+
}
2889+
2890+
if (!st->prepared[st->use_file][command_num])
2891+
{
2892+
PGresult*res;
2893+
2894+
pg_log_debug("client %d preparing %s",st->id,command->prepname);
2895+
res=PQprepare(st->con,command->prepname,
2896+
command->argv[0],command->argc-1,NULL);
2897+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
2898+
pg_log_error("%s",PQerrorMessage(st->con));
2899+
PQclear(res);
2900+
st->prepared[st->use_file][command_num]= true;
2901+
}
2902+
}
2903+
2904+
/*
2905+
* Prepare all the commands in the script that come after the \startpipeline
2906+
* that's at position st->command, and the first \endpipeline we find.
2907+
*
2908+
* This sets the ->prepared flag for each relevant command as well as the
2909+
* \startpipeline itself, but doesn't move the st->command counter.
2910+
*/
2911+
staticvoid
2912+
prepareCommandsInPipeline(CState*st)
2913+
{
2914+
intj;
2915+
Command**commands=sql_script[st->use_file].commands;
2916+
2917+
Assert(commands[st->command]->type==META_COMMAND&&
2918+
commands[st->command]->meta==META_STARTPIPELINE);
2919+
2920+
/*
2921+
* We set the 'prepared' flag on the \startpipeline itself to flag that we
2922+
* don't need to do this next time without calling prepareCommand(), even
2923+
* though we don't actually prepare this command.
2924+
*/
2925+
if (st->prepared&&
2926+
st->prepared[st->use_file][st->command])
2927+
return;
2928+
2929+
for (j=st->command+1;commands[j]!=NULL;j++)
2930+
{
2931+
if (commands[j]->type==META_COMMAND&&
2932+
commands[j]->meta==META_ENDPIPELINE)
2933+
break;
2934+
2935+
prepareCommand(st,j);
2936+
}
2937+
2938+
st->prepared[st->use_file][st->command]= true;
2939+
}
2940+
28612941
/* Send a SQL command, using the chosen querymode */
28622942
staticbool
28632943
sendCommand(CState*st,Command*command)
@@ -2888,50 +2968,13 @@ sendCommand(CState *st, Command *command)
28882968
}
28892969
elseif (querymode==QUERY_PREPARED)
28902970
{
2891-
charname[MAX_PREPARE_NAME];
28922971
constchar*params[MAX_ARGS];
28932972

2894-
if (!st->prepared[st->use_file])
2895-
{
2896-
intj;
2897-
Command**commands=sql_script[st->use_file].commands;
2898-
2899-
for (j=0;commands[j]!=NULL;j++)
2900-
{
2901-
PGresult*res;
2902-
charname[MAX_PREPARE_NAME];
2903-
2904-
if (commands[j]->type!=SQL_COMMAND)
2905-
continue;
2906-
preparedStatementName(name,st->use_file,j);
2907-
if (PQpipelineStatus(st->con)==PQ_PIPELINE_OFF)
2908-
{
2909-
res=PQprepare(st->con,name,
2910-
commands[j]->argv[0],commands[j]->argc-1,NULL);
2911-
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
2912-
pg_log_error("%s",PQerrorMessage(st->con));
2913-
PQclear(res);
2914-
}
2915-
else
2916-
{
2917-
/*
2918-
* In pipeline mode, we use asynchronous functions. If a
2919-
* server-side error occurs, it will be processed later
2920-
* among the other results.
2921-
*/
2922-
if (!PQsendPrepare(st->con,name,
2923-
commands[j]->argv[0],commands[j]->argc-1,NULL))
2924-
pg_log_error("%s",PQerrorMessage(st->con));
2925-
}
2926-
}
2927-
st->prepared[st->use_file]= true;
2928-
}
2929-
2973+
prepareCommand(st,st->command);
29302974
getQueryParams(st,command,params);
2931-
preparedStatementName(name,st->use_file,st->command);
29322975

2933-
pg_log_debug("client %d sending %s",st->id,name);
2934-
r=PQsendQueryPrepared(st->con,name,command->argc-1,
2976+
pg_log_debug("client %d sending %s",st->id,command->prepname);
2977+
r=PQsendQueryPrepared(st->con,command->prepname,command->argc-1,
29352978
params,NULL,NULL,0);
29362979
}
29372980
else/* unknown sql mode */
@@ -3191,7 +3234,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
31913234
thread->conn_duration+=now-start;
31923235

31933236
/* Reset session-local state */
3194-
memset(st->prepared,0,sizeof(st->prepared));
3237+
pg_free(st->prepared);
3238+
st->prepared=NULL;
31953239
}
31963240

31973241
/* record transaction start time */
@@ -3766,6 +3810,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
37663810
returnCSTATE_ABORTED;
37673811
}
37683812

3813+
/*
3814+
* If we're in prepared-query mode, we need to prepare all the
3815+
* commands that are inside the pipeline before we actually start the
3816+
* pipeline itself. This solves the problem that running BEGIN
3817+
* ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
3818+
* snapshot having been acquired by the prepare within the pipeline.
3819+
*/
3820+
if (querymode==QUERY_PREPARED)
3821+
prepareCommandsInPipeline(st);
3822+
37693823
if (PQpipelineStatus(st->con)!=PQ_PIPELINE_OFF)
37703824
{
37713825
commandFailed(st,"startpipeline","already in pipeline mode");
@@ -4785,6 +4839,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
47854839
my_command->varprefix=NULL;/* allocated later, if needed */
47864840
my_command->expr=NULL;
47874841
initSimpleStats(&my_command->stats);
4842+
my_command->prepname=NULL;/* set later, if needed */
47884843

47894844
returnmy_command;
47904845
}
@@ -4816,6 +4871,7 @@ static void
48164871
postprocess_sql_command(Command*my_command)
48174872
{
48184873
charbuffer[128];
4874+
staticintprepnum=0;
48194875

48204876
Assert(my_command->type==SQL_COMMAND);
48214877

@@ -4824,15 +4880,17 @@ postprocess_sql_command(Command *my_command)
48244880
buffer[strcspn(buffer,"\n\r")]='\0';
48254881
my_command->first_line=pg_strdup(buffer);
48264882

4827-
/*parse query if necessary */
4883+
/*Parse query and generate prepared statement name, if necessary */
48284884
switch (querymode)
48294885
{
48304886
caseQUERY_SIMPLE:
48314887
my_command->argv[0]=my_command->lines.data;
48324888
my_command->argc++;
48334889
break;
4834-
caseQUERY_EXTENDED:
48354890
caseQUERY_PREPARED:
4891+
my_command->prepname=psprintf("P_%d",prepnum++);
4892+
/* fall through */
4893+
caseQUERY_EXTENDED:
48364894
if (!parseQuery(my_command))
48374895
exit(1);
48384896
break;

‎src/bin/pgbench/t/001_pgbench_with_server.pl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,6 +828,26 @@
828828
}
829829
});
830830

831+
# Working \startpipeline in prepared query mode with serializable
832+
$node->pgbench(
833+
'-c4 -j2 -t 10 -n -M prepared',
834+
0,
835+
[
836+
qr{type: .*/001_pgbench_pipeline_serializable},
837+
qr{actually processed: (\d+)/\1}
838+
],
839+
[],
840+
'working \startpipeline with serializable',
841+
{
842+
'001_pgbench_pipeline_serializable'=>q{
843+
-- test startpipeline with serializable
844+
\startpipeline
845+
BEGIN ISOLATION LEVEL SERIALIZABLE;
846+
} ."select 1;\n"x10 .q{
847+
END;
848+
\endpipeline
849+
}
850+
});
831851

832852
# trigger many expression errors
833853
my@errors = (

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp