Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitaeb57af

Browse files
committed
pgbench: Synchronize client threads.
Wait until all pgbench threads are connected before benchmarking begins.This fixes a problem where some connections could take a very long timeto be established because of lock contention from earlier connections,making results unstable and bogus with high connection counts.Author: Andres Freund <andres@anarazel.de>Author: Fabien COELHO <coelho@cri.ensmp.fr>Reviewed-by: Marina Polyakova <m.polyakova@postgrespro.ru>Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>Reviewed-by: David Rowley <dgrowleyml@gmail.com>Discussion:https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
1 parent44bf3d5 commitaeb57af

File tree

1 file changed

+41
-1
lines changed

1 file changed

+41
-1
lines changed

‎src/bin/pgbench/pgbench.c

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,21 +126,37 @@ typedef struct socket_set
126126
#defineTHREAD_JOIN(handle) \
127127
(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
128128
GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
129+
#defineTHREAD_BARRIER_T SYNCHRONIZATION_BARRIER
130+
#defineTHREAD_BARRIER_INIT(barrier,n) \
131+
(InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
132+
#defineTHREAD_BARRIER_WAIT(barrier) \
133+
EnterSynchronizationBarrier((barrier), \
134+
SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)
135+
#defineTHREAD_BARRIER_DESTROY(barrier)
129136
#elif defined(ENABLE_THREAD_SAFETY)
130137
/* Use POSIX threads */
131-
#include<pthread.h>
138+
#include"port/pg_pthread.h"
132139
#defineTHREAD_T pthread_t
133140
#defineTHREAD_FUNC_RETURN_TYPE void *
134141
#defineTHREAD_FUNC_RETURN return NULL
135142
#defineTHREAD_CREATE(handle,function,arg) \
136143
pthread_create((handle), NULL, (function), (arg))
137144
#defineTHREAD_JOIN(handle) \
138145
pthread_join((handle), NULL)
146+
#defineTHREAD_BARRIER_T pthread_barrier_t
147+
#defineTHREAD_BARRIER_INIT(barrier,n) \
148+
pthread_barrier_init((barrier), NULL, (n))
149+
#defineTHREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier))
150+
#defineTHREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier))
139151
#else
140152
/* No threads implementation, use none (-j 1) */
141153
#defineTHREAD_T void *
142154
#defineTHREAD_FUNC_RETURN_TYPE void *
143155
#defineTHREAD_FUNC_RETURN return NULL
156+
#defineTHREAD_BARRIER_T int
157+
#defineTHREAD_BARRIER_INIT(barrier,n) (*(barrier) = 0)
158+
#defineTHREAD_BARRIER_WAIT(barrier)
159+
#defineTHREAD_BARRIER_DESTROY(barrier)
144160
#endif
145161

146162

@@ -326,6 +342,9 @@ typedef struct RandomState
326342
/* Various random sequences are initialized from this one. */
327343
staticRandomStatebase_random_sequence;
328344

345+
/* Synchronization barrier for start and connection */
346+
staticTHREAD_BARRIER_Tbarrier;
347+
329348
/*
330349
* Connection state machine states.
331350
*/
@@ -6121,6 +6140,10 @@ main(int argc, char **argv)
61216140
if (duration>0)
61226141
setalarm(duration);
61236142

6143+
errno=THREAD_BARRIER_INIT(&barrier,nthreads);
6144+
if (errno!=0)
6145+
pg_log_fatal("could not initialize barrier: %m");
6146+
61246147
#ifdefENABLE_THREAD_SAFETY
61256148
/* start all threads but thread 0 which is executed directly later */
61266149
for (i=1;i<nthreads;i++)
@@ -6191,6 +6214,8 @@ main(int argc, char **argv)
61916214
printResults(&stats,pg_time_now()-bench_start,conn_total_duration,
61926215
bench_start-start_time,latency_late);
61936216

6217+
THREAD_BARRIER_DESTROY(&barrier);
6218+
61946219
if (exit_code!=0)
61956220
pg_log_fatal("Run was aborted; the above results are incomplete.");
61966221

@@ -6237,6 +6262,8 @@ threadRun(void *arg)
62376262
state[i].state=CSTATE_CHOOSE_SCRIPT;
62386263

62396264
/* READY */
6265+
THREAD_BARRIER_WAIT(&barrier);
6266+
62406267
thread_start=pg_time_now();
62416268
thread->started_time=thread_start;
62426269
last_report=thread_start;
@@ -6249,7 +6276,18 @@ threadRun(void *arg)
62496276
for (inti=0;i<nstate;i++)
62506277
{
62516278
if ((state[i].con=doConnect())==NULL)
6279+
{
6280+
/*
6281+
* On connection failure, we meet the barrier here in place of
6282+
* GO before proceeding to the "done" path which will cleanup,
6283+
* so as to avoid locking the process.
6284+
*
6285+
* It is unclear whether it is worth doing anything rather than
6286+
* coldly exiting with an error message.
6287+
*/
6288+
THREAD_BARRIER_WAIT(&barrier);
62526289
gotodone;
6290+
}
62536291
}
62546292

62556293
/* compute connection delay */
@@ -6261,6 +6299,8 @@ threadRun(void *arg)
62616299
thread->conn_duration=0;
62626300
}
62636301

6302+
/* GO */
6303+
THREAD_BARRIER_WAIT(&barrier);
62646304

62656305
start=pg_time_now();
62666306
thread->bench_start=start;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp