Prerequisite Knowledge
I highly recommend having some knowledge of C++, threads, and programming in general; otherwise it will be tedious for you to research and understand all of this.
Thread Pools
What are Thread Pools?
A thread pool is a group of threads that are created up front and then wait for a job or task to execute.
The idea is to keep the threads alive and reuse them from task to task, which avoids the cost of creating and destroying a thread every time there is work to do.
To do this in C++ we'll have to look at some of the features provided by the language. I give a short explanation of each concept below, but I recommend you have a look at them yourself in the documentation.
std::thread, std::mutex and std::condition_variable
Threads allow multiple functions to execute concurrently. std::thread represents a single thread of execution.
If you have ever used threads in your program, you must have heard of thread safety and mutexes. std::mutex is a synchronization primitive that can be used to protect shared data from being accessed simultaneously by multiple threads.
std::condition_variable is used together with a mutex. Its purpose is to wait for some condition to become true: it blocks one or more threads until another thread does two things:
- Modifies a shared variable
- Notifies the condition_variable
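To make the two steps above concrete, here is a minimal sketch. The helper name wait_for_value and the value 42 are my own assumptions for illustration, not part of the thread pool itself.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: blocks until a second thread flips `ready`,
// then returns the value that thread wrote.
int wait_for_value() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;  // the shared variable, guarded by m
    int value = 0;

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lock(m);
            value = 42;    // 1. modify the shared state under the lock
            ready = true;
        }
        cv.notify_one();   // 2. notify the condition_variable
    });

    {
        std::unique_lock<std::mutex> lock(m);
        // wait() releases the lock while sleeping and re-checks the
        // predicate on every wake-up, so spurious wake-ups are harmless.
        cv.wait(lock, [&] { return ready; });
    }
    producer.join();
    return value;
}
```

Calling wait_for_value() from main should return only after the producer thread has written the value, whichever of the two threads gets to the mutex first.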
std::forward, perfect forwarding and std::bind
std::forward is used to implement perfect forwarding. Perfect forwarding means passing arguments on to another function in a way that preserves their original value category, whether each one is an rvalue or an lvalue.
std::bind is a function template used for partial function application: given a function, say 'f', and some pre-specified arguments, it generates a forwarding call wrapper. Calling this wrapper is equivalent to calling the original function 'f' with those arguments already bound.
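Here is a small sketch of both ideas. The names category, relay, add and bound_sum are made up for this example.

```cpp
#include <functional>
#include <string>
#include <utility>

// Two overloads so we can see which value category arrives.
std::string category(const std::string&) { return "lvalue"; }
std::string category(std::string&&) { return "rvalue"; }

// Hypothetical wrapper: std::forward keeps the caller's value category.
template <typename T>
std::string relay(T&& arg) {
    return category(std::forward<T>(arg));
}

int add(int a, int b) { return a + b; }

// std::bind fixes both arguments up front, so the wrapper takes none.
int bound_sum() {
    auto add_2_3 = std::bind(add, 2, 3);
    return add_2_3();
}
```

Passing a named std::string to relay selects the lvalue overload, while passing a temporary selects the rvalue overload. Without std::forward, both calls would end up in the lvalue overload, because a named parameter is always an lvalue inside the function.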
std::future and std::packaged_task
std::future provides a way to access the result of an asynchronous operation. A future is associated with a shared state, and one way to obtain a future is from a std::packaged_task.
std::packaged_task wraps any callable target so that it can be called asynchronously.
Program Logic
We will create a packaged_task in our main thread and call get_future on it to obtain a std::future. Because the task and the future share the same state, we can move the packaged_task into another thread and run the job there, since it wraps a callable function, while the main thread keeps the future to read the result.
Using the condition_variable we can put the threads to sleep while there is no task to run, and wake them up when tasks pile up.
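A minimal sketch of that flow, without any pool yet, could look like this. The function multiply_on_thread is a hypothetical example of mine.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Hypothetical example: compute a * b on another thread.
int multiply_on_thread(int a, int b) {
    // Wrap the callable; nothing runs yet.
    std::packaged_task<int(int, int)> task([](int x, int y) { return x * y; });

    // The future and the task now refer to the same shared state.
    std::future<int> result = task.get_future();
    assert(result.valid());

    // packaged_task is move-only, so it is moved into the thread.
    std::thread t(std::move(task), a, b);

    int value = result.get();  // blocks until the task has run
    t.join();
    return value;
}
```

The thread pool follows the same pattern, except that the task goes into a queue and one of the already running worker threads picks it up.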
Before we get into coding, let's look at the things we're going to need:
- a vector of threads
- mutex and condition_variable
- a queue of the tasks or functions to be executed
- a bool variable so we can destroy threads
- a worker function that every thread runs to process the tasks
We will also write an enqueue function, where the actual logic of our thread pool will be implemented.
Let's look at our code so far:
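#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::mutex mutex;                         // guards queue and stop
    std::condition_variable cv;               // wakes idle workers
    std::queue<std::function<void()>> queue;  // pending tasks
    void worker();                            // loop run by every thread
    bool stop;                                // set to true on shutdown

public:
    ThreadPool(std::size_t nr_threads = std::thread::hardware_concurrency());
    ~ThreadPool();

    template <typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;

    // The pool owns running threads, so it can be neither copied nor moved.
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
};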
std::thread::hardware_concurrency returns the number of concurrent threads supported by the implementation. The value is only a hint, and it may be 0 if it cannot be determined.
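Since a pool with zero threads would never run anything, you may want a fallback. Here is a small sketch, where the helper default_worker_count and its fallback of 2 are my own assumptions and not part of the class above:

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper: pick a worker count, falling back to `fallback`
// when hardware_concurrency() reports 0 (meaning "unknown").
std::size_t default_worker_count(std::size_t fallback = 2) {
    const unsigned int hint = std::thread::hardware_concurrency();
    return hint != 0 ? static_cast<std::size_t>(hint) : fallback;
}
```

A value like this could be passed to the ThreadPool constructor instead of relying on the default argument.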
Now, let's write our constructor first. We simply need to fill the vector with threads that run the worker function:
ThreadPool::ThreadPool(std::size_t nr_workers) {
    stop = false;
    // Start each thread on the worker member function of this pool.
    for (std::size_t i = 0; i < nr_workers; i++) {
        workers.emplace_back(&ThreadPool::worker, this);
    }
}
We use emplace_back because, instead of taking a value_type, it takes a variadic list of arguments and perfectly forwards them, so the std::thread object is constructed directly inside the container.
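As a standalone sketch of the same idea, here is a tiny function that starts threads in place. The function run_threads and its counter are hypothetical and only there to have something to observe.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical example: start `n` threads in place and count how many ran.
int run_threads(int n) {
    std::atomic<int> counter{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < n; ++i) {
        // The lambda is forwarded to std::thread's constructor, so the
        // thread is built inside the vector without a temporary std::thread.
        threads.emplace_back([&counter] { ++counter; });
    }
    for (auto& t : threads) t.join();
    return counter.load();
}
```

With push_back you would have to construct the std::thread yourself and move it in, which works too but is more to write.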
Thread Worker
The ThreadPool::worker function is our thread worker. It is needed to process what's inside the queue. Let's see how we will implement it:
- We loop until shutdown/stop is requested.
- In each iteration we first create a lock to acquire the mutex.
- The thread sleeps on the condition_variable until it is woken up. On every wake-up the predicate is checked, and the thread continues only if the queue is not empty or stop is true.
- We grab the function from the queue and execute it.
We use a unique_lock inside a scope that ends as soon as we have grabbed the function, so the mutex is not held while the task runs.
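void ThreadPool::worker() {
    for (;;) {
        std::function<void()> cur_task;
        {
            std::unique_lock<std::mutex> lock(mutex);
            // Sleep until there is work to do or shutdown is requested.
            cv.wait(lock, [this]() { return stop || !queue.empty(); });

            // On shutdown, exit only once the remaining tasks are drained.
            if (stop && queue.empty())
                break;

            cur_task = std::move(queue.front());  // grab the function from the queue
            queue.pop();
        }  // the lock is released here, before the task runs
        cur_task();
    }
}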
ThreadPool::enqueue
It is the "add task" function of our ThreadPool. It returns a future through which we can get the result of the task. Let's see what steps we need to build the function:
- Templatize the function and its variadic arguments.
- Bind the function itself and its arguments using std::bind.
- The function we are going to push into the queue needs to be copyable, because the target of a std::function must be copy-constructible, and std::packaged_task is move-only. Hence we wrap it inside a shared pointer.
- Acquire a lock, since the queue might be accessed by other threads, and put the wrapped function inside the queue, where a worker will pick it up and execute it.
- Use cv.notify_one() to wake one thread so that it can process the task.
- Return the future object.
template <typename F, typename... Args>
inline auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<decltype(f(args...))> {
    using return_type = decltype(f(args...));

    // Bind the callable to its arguments so it can be called with none.
    auto func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

    // packaged_task is move-only; the shared_ptr keeps the queued lambda copyable.
    auto encapsulated_ptr =
        std::make_shared<std::packaged_task<return_type()>>(func);
    std::future<return_type> future_object = encapsulated_ptr->get_future();
    {
        std::unique_lock<std::mutex> lock(mutex);
        queue.emplace([encapsulated_ptr]() {
            (*encapsulated_ptr)();  // execute the function
        });
    }
    cv.notify_one();
    return future_object;
}
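If you are wondering why the shared pointer from the steps above is needed, this small sketch shows the wrapping on its own. The helpers make_job and run_wrapped_job are hypothetical names of mine.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <type_traits>

// std::packaged_task can be moved but not copied.
static_assert(!std::is_copy_constructible<std::packaged_task<int()>>::value,
              "packaged_task is move-only");

// Hypothetical helper: wrap a task so it fits into std::function<void()>.
std::function<void()> make_job(std::shared_ptr<std::packaged_task<int()>> task) {
    // Copying this lambda copies only the shared_ptr, never the task.
    return [task] { (*task)(); };
}

int run_wrapped_job() {
    auto task = std::make_shared<std::packaged_task<int()>>([] { return 7; });
    std::future<int> result = task->get_future();
    std::function<void()> job = make_job(task);
    std::function<void()> copy = job;  // fine: the lambda is copyable
    copy();                            // runs the task exactly once
    return result.get();
}
```

Capturing the packaged_task by move would make the lambda move-only, and std::function would then refuse to store it.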
The return type depends on the template parameters F and Args, so we cannot write it out by hand. Hence we use the auto keyword with a trailing return type and let decltype work it out from the expression f(args...).
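Here is the same return-type trick in isolation, as a small sketch. The names call and half are made up for the example.

```cpp
#include <type_traits>

// Hypothetical helper: call f with args and return whatever it returns.
// f and args are only in scope after the parameter list, which is why
// the return type is written in trailing position.
template <typename F, typename... Args>
auto call(F&& f, Args&&... args) -> decltype(f(args...)) {
    return f(args...);
}

inline double half(int x) { return x / 2.0; }

// The compiler works out the return type for each instantiation.
static_assert(std::is_same<decltype(call(half, 3)), double>::value,
              "call(half, 3) returns double");
```

In enqueue the same decltype expression is wrapped in std::future, so the caller gets a future of exactly the type the task returns.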
Okay, now 90% of our task is done. Let us write the destructor of the class, which sets stop, notifies all the threads through the condition variable, and joins them.
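ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(mutex);
        stop = true;  // tell the workers to exit once the queue is empty
    }
    cv.notify_all();  // wake every sleeping worker
    for (auto& worker : workers) {
        worker.join();
    }
}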
Testing our ThreadPool
#include <iostream>

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;
    for (int i = 0; i < 8; ++i) {
        auto future = pool.enqueue([i] { return i + i; });
        results.emplace_back(std::move(future));
    }
    for (auto& result : results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;
}
We create a pool of 4 threads and a vector that holds the futures of the tasks, then use enqueue to submit each function for execution.
This gives us:
0 2 4 6 8 10 12 14
If you reached the end, thank you for reading.