@@ -136,6 +136,12 @@ intunlogged_tables = 0;
136136 */
137137double sample_rate = 0.0 ;
138138
139+ /*
140+ * When threads are throttled to a given rate limit, this is the target delay
141+ * to reach that rate in usec. 0 is the default and means no throttling.
142+ */
143+ int64 throttle_delay = 0 ;
144+
139145/*
140146 * tablespace selection
141147 */
@@ -202,11 +208,13 @@ typedef struct
202208int listen ;/* 0 indicates that an async query has been
203209 * sent */
204210int sleeping ;/* 1 indicates that the client is napping */
211+ bool throttling ;/* whether nap is for throttling */
205212int64 until ;/* napping until (usec) */
206213Variable * variables ;/* array of variable definitions */
207214int nvariables ;
208215instr_time txn_begin ;/* used for measuring transaction latencies */
209216instr_time stmt_begin ;/* used for measuring statement latencies */
217+ bool is_throttled ;/* whether transaction throttling is done */
210218int use_file ;/* index in sql_files for this client */
211219bool prepared [MAX_FILES ];
212220}CState ;
@@ -224,6 +232,9 @@ typedef struct
224232instr_time * exec_elapsed ;/* time spent executing cmds (per Command) */
225233int * exec_count ;/* number of cmd executions (per Command) */
226234unsigned short random_state [3 ];/* separate randomness for each thread */
235+ int64 throttle_trigger ;/* previous/next throttling (us) */
236+ int64 throttle_lag ;/* total transaction lag behind throttling */
237+ int64 throttle_lag_max ;/* max transaction lag */
227238}TState ;
228239
229240#define INVALID_THREAD ((pthread_t) 0)
@@ -232,6 +243,8 @@ typedef struct
232243{
233244instr_time conn_time ;
234245int xacts ;
246+ int64 throttle_lag ;
247+ int64 throttle_lag_max ;
235248}TResult ;
236249
237250/*
@@ -356,6 +369,7 @@ usage(void)
356369" -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
357370" -P, --progress=NUM show thread progress report every NUM seconds\n"
358371" -r, --report-latencies report average latency per command\n"
372+ " -R, --rate=SPEC target rate in transactions per second\n"
359373" -s, --scale=NUM report this scale factor in output\n"
360374" -S, --select-only perform SELECT-only transactions\n"
361375" -t, --transactions number of transactions each client runs "
@@ -898,17 +912,62 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
898912{
899913PGresult * res ;
900914Command * * commands ;
915+ bool trans_needs_throttle = false;
901916
902917top :
903918commands = sql_files [st -> use_file ];
904919
920+ /*
921+ * Handle throttling once per transaction by sleeping. It is simpler
922+ * to do this here rather than at the end, because so much complicated
923+ * logic happens below when statements finish.
924+ */
925+ if (throttle_delay && !st -> is_throttled )
926+ {
927+ /*
928+ * Use inverse transform sampling to randomly generate a delay, such
929+ * that the series of delays will approximate a Poisson distribution
930+ * centered on the throttle_delay time.
931+ *
932+ * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay multiplier.
933+ *
934+ * If transactions are too slow or a given wait is shorter than
935+ * a transaction, the next transaction will start right away.
936+ */
937+ int64 wait = (int64 )
938+ throttle_delay * - log (getrand (thread ,1 ,1000 )/1000.0 );
939+
940+ thread -> throttle_trigger += wait ;
941+
942+ st -> until = thread -> throttle_trigger ;
943+ st -> sleeping = 1 ;
944+ st -> throttling = true;
945+ st -> is_throttled = true;
946+ if (debug )
947+ fprintf (stderr ,"client %d throttling " INT64_FORMAT " us\n" ,
948+ st -> id ,wait );
949+ }
950+
905951if (st -> sleeping )
906952{/* are we sleeping? */
907953instr_time now ;
954+ int64 now_us ;
908955
909956INSTR_TIME_SET_CURRENT (now );
910- if (st -> until <=INSTR_TIME_GET_MICROSEC (now ))
957+ now_us = INSTR_TIME_GET_MICROSEC (now );
958+ if (st -> until <=now_us )
959+ {
911960st -> sleeping = 0 ;/* Done sleeping, go ahead with next command */
961+ if (st -> throttling )
962+ {
963+ /* Measure lag of throttled transaction relative to target */
964+ int64 lag = now_us - st -> until ;
965+ thread -> throttle_lag += lag ;
966+ if (lag > thread -> throttle_lag_max )
967+ thread -> throttle_lag_max = lag ;
968+ st -> throttling = false;
969+ }
970+ }
912971else
913972return true;/* Still sleeping, nothing to do here */
914973}
@@ -1095,6 +1154,15 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
10951154st -> state = 0 ;
10961155st -> use_file = (int )getrand (thread ,0 ,num_files - 1 );
10971156commands = sql_files [st -> use_file ];
1157+ st -> is_throttled = false;
1158+ /*
1159+ * No transaction is underway anymore, which means there is nothing
1160+ * to listen to right now. When throttling rate limits are active,
1161+ * a sleep will happen next, as the next transaction starts. And
1162+ * then in any case the next SQL command will set listen back to 1.
1163+ */
1164+ st -> listen = 0 ;
1165+ trans_needs_throttle = (throttle_delay > 0 );
10981166}
10991167}
11001168
@@ -1113,6 +1181,16 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
11131181INSTR_TIME_ACCUM_DIFF (* conn_time ,end ,start );
11141182}
11151183
1184+ /*
1185+ * This ensures that a throttling delay is inserted before proceeding
1186+ * with sql commands, after the first transaction. The first transaction
1187+ * throttling is performed when first entering doCustom.
1188+ */
1189+ if (trans_needs_throttle ) {
1190+ trans_needs_throttle = false;
1191+ gototop ;
1192+ }
1193+
11161194/* Record transaction start time if logging is enabled */
11171195if (logfile && st -> state == 0 )
11181196INSTR_TIME_SET_CURRENT (st -> txn_begin );
@@ -2017,7 +2095,8 @@ process_builtin(char *tb)
20172095static void
20182096printResults (int ttype ,int normal_xacts ,int nclients ,
20192097TState * threads ,int nthreads ,
2020- instr_time total_time ,instr_time conn_total_time )
2098+ instr_time total_time ,instr_time conn_total_time ,
2099+ int64 throttle_lag ,int64 throttle_lag_max )
20212100{
20222101double time_include ,
20232102tps_include ,
@@ -2055,6 +2134,19 @@ printResults(int ttype, int normal_xacts, int nclients,
20552134printf ("number of transactions actually processed: %d\n" ,
20562135normal_xacts );
20572136}
2137+
2138+ if (throttle_delay )
2139+ {
2140+ /*
2141+ * Report average transaction lag under rate limit throttling. This
2142+ * is the delay between scheduled and actual start times for the
2143+ * transaction. The measured lag may be caused by thread/client load,
2144+ * the database load, or the Poisson throttling process.
2145+ */
2146+ printf ("average rate limit schedule lag: %.3f ms (max %.3f ms)\n" ,
2147+ 0.001 * throttle_lag /normal_xacts ,0.001 * throttle_lag_max );
2148+ }
2149+
20582150printf ("tps = %f (including connections establishing)\n" ,tps_include );
20592151printf ("tps = %f (excluding connections establishing)\n" ,tps_exclude );
20602152
@@ -2140,6 +2232,7 @@ main(int argc, char **argv)
21402232{"unlogged-tables" ,no_argument ,& unlogged_tables ,1 },
21412233{"sampling-rate" ,required_argument ,NULL ,4 },
21422234{"aggregate-interval" ,required_argument ,NULL ,5 },
2235+ {"rate" ,required_argument ,NULL ,'R' },
21432236{NULL ,0 ,NULL ,0 }
21442237};
21452238
@@ -2162,6 +2255,8 @@ main(int argc, char **argv)
21622255instr_time total_time ;
21632256instr_time conn_total_time ;
21642257int total_xacts ;
2258+ int64 throttle_lag = 0 ;
2259+ int64 throttle_lag_max = 0 ;
21652260
21662261int i ;
21672262
@@ -2206,7 +2301,7 @@ main(int argc, char **argv)
22062301state = (CState * )pg_malloc (sizeof (CState ));
22072302memset (state ,0 ,sizeof (CState ));
22082303
2209- while ((c = getopt_long (argc ,argv ,"ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:" ,long_options ,& optindex ))!= -1 )
2304+ while ((c = getopt_long (argc ,argv ,"ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R: " ,long_options ,& optindex ))!= -1 )
22102305{
22112306switch (c )
22122307{
@@ -2371,6 +2466,19 @@ main(int argc, char **argv)
23712466exit (1 );
23722467}
23732468break ;
2469+ case 'R' :
2470+ {
2471+ /* get a double from the beginning of option value */
2472+ double throttle_value = atof (optarg );
2473+ if (throttle_value <=0.0 )
2474+ {
2475+ fprintf (stderr ,"invalid rate limit: %s\n" ,optarg );
2476+ exit (1 );
2477+ }
2478+ /* Invert rate limit into a time offset */
2479+ throttle_delay = (int64 ) (1000000.0 /throttle_value );
2480+ }
2481+ break ;
23742482case 0 :
23752483/* This covers long options which take no argument. */
23762484break ;
@@ -2408,6 +2516,9 @@ main(int argc, char **argv)
24082516}
24092517}
24102518
2519+ /* compute a per thread delay */
2520+ throttle_delay *=nthreads ;
2521+
24112522if (argc > optind )
24122523dbName = argv [optind ];
24132524else
@@ -2721,6 +2832,9 @@ main(int argc, char **argv)
27212832TResult * r = (TResult * )ret ;
27222833
27232834total_xacts += r -> xacts ;
2835+ throttle_lag += r -> throttle_lag ;
2836+ if (r -> throttle_lag_max > throttle_lag_max )
2837+ throttle_lag_max = r -> throttle_lag_max ;
27242838INSTR_TIME_ADD (conn_total_time ,r -> conn_time );
27252839free (ret );
27262840}
@@ -2731,7 +2845,7 @@ main(int argc, char **argv)
27312845INSTR_TIME_SET_CURRENT (total_time );
27322846INSTR_TIME_SUBTRACT (total_time ,start_time );
27332847printResults (ttype ,total_xacts ,nclients ,threads ,nthreads ,
2734- total_time ,conn_total_time );
2848+ total_time ,conn_total_time , throttle_lag , throttle_lag_max );
27352849
27362850return 0 ;
27372851}
@@ -2756,6 +2870,17 @@ threadRun(void *arg)
27562870
27572871AggVals aggs ;
27582872
2873+ /*
2874+ * Initialize throttling rate target for all of the thread's clients. It
2875+ * might be a little more accurate to reset thread->start_time here too.
2876+ * The possible drift seems too small relative to typical throttle delay
2877+ * times to worry about it.
2878+ */
2879+ INSTR_TIME_SET_CURRENT (start );
2880+ thread -> throttle_trigger = INSTR_TIME_GET_MICROSEC (start );
2881+ thread -> throttle_lag = 0 ;
2882+ thread -> throttle_lag_max = 0 ;
2883+
27592884result = pg_malloc (sizeof (TResult ));
27602885
27612886INSTR_TIME_SET_ZERO (result -> conn_time );
@@ -2831,25 +2956,38 @@ threadRun(void *arg)
28312956Command * * commands = sql_files [st -> use_file ];
28322957int sock ;
28332958
2834- if (st -> sleeping )
2959+ if (st -> con == NULL )
28352960{
2836- int this_usec ;
2837-
2838- if (min_usec == INT64_MAX )
2961+ continue ;
2962+ }
2963+ else if (st -> sleeping )
2964+ {
2965+ if (st -> throttling && timer_exceeded )
28392966{
2840- instr_time now ;
2841-
2842- INSTR_TIME_SET_CURRENT (now );
2843- now_usec = INSTR_TIME_GET_MICROSEC (now );
2967+ /* interrupt client which has not started a transaction */
2968+ remains -- ;
2969+ st -> sleeping = 0 ;
2970+ st -> throttling = false;
2971+ PQfinish (st -> con );
2972+ st -> con = NULL ;
2973+ continue ;
28442974}
2975+ else /* just a nap from the script */
2976+ {
2977+ int this_usec ;
28452978
2846- this_usec = st -> until - now_usec ;
2847- if (min_usec > this_usec )
2848- min_usec = this_usec ;
2849- }
2850- else if (st -> con == NULL )
2851- {
2852- continue ;
2979+ if (min_usec == INT64_MAX )
2980+ {
2981+ instr_time now ;
2982+
2983+ INSTR_TIME_SET_CURRENT (now );
2984+ now_usec = INSTR_TIME_GET_MICROSEC (now );
2985+ }
2986+
2987+ this_usec = st -> until - now_usec ;
2988+ if (min_usec > this_usec )
2989+ min_usec = this_usec ;
2990+ }
28532991}
28542992else if (commands [st -> state ]-> type == META_COMMAND )
28552993{
@@ -2986,6 +3124,8 @@ threadRun(void *arg)
29863124result -> xacts = 0 ;
29873125for (i = 0 ;i < nstate ;i ++ )
29883126result -> xacts += state [i ].cnt ;
3127+ result -> throttle_lag = thread -> throttle_lag ;
3128+ result -> throttle_lag_max = thread -> throttle_lag_max ;
29893129INSTR_TIME_SET_CURRENT (end );
29903130INSTR_TIME_ACCUM_DIFF (result -> conn_time ,end ,start );
29913131if (logfile )