Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

Thread pool implementation using c++11 threads

License

NotificationsYou must be signed in to change notification settings

mtrebi/thread-pool

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

 Introduction
 Build instructions
 Thread pool
        Queue
        Submit function
        Thread worker
 Usage example
        Use case#1
        Use case#2
        Use case#3
 Future work
 References

Introduction:

Athread pool is a technique that allows developers to exploit the concurrency of modern processors in aneasy andefficient manner. It's easy because you send "work" to the pool and somehow this work gets done without blocking the main thread. It's efficient because threads are not initialized each time we want the work to be done. Threads are initialized once and remain inactive until some work has to be done. This way we minimize the overhead.

There are many more Thread pool implementations in C++, many of them are probably better (safer, faster...) than mine. However,I believe my implementation arevery straightforward and easy to understand.

Disclaimer: Please Do not use this project in a professional environment. It may contain bugs and/or not work as expected. I did this project to learn how C++11 Threads work and provide an easy way for other people to understand it too.

Build instructions:

This project has been developed using Netbeans and Linux but it should work on Windows, MAC OS and Linux. It can be easily build using CMake and different other generators. The following code can be used to generate the VS 2017 project files:

// VS 2017cd<project-folder>mkdirbuildcdbuild/cmake .."Visual Studio 15 2017 Win64"

Then, from VS you can edit and execute the project. Make sure thatmain project is set up as the startup project

If you are using Linux, you need to change the generator (use the default) and execute an extra operation to actually make it executable:

// Linuxcd<project-folder>mkdirbuildcdbuild/cmake ..make

Thread pool

The way that I understand things better is with images. So, let's take a look at the image of thread pool given by Wikipedia:

As you can see, we have three important elements here:

  • Tasks Queue. This is where the work that has to be done is stored.
  • Thread Pool. This is set of threads (or workers) that continuously take work from the queue and do it.
  • Completed Tasks. When the Thread has finished the work we return "something" to notify that the work has finished.

Queue

We use a queue to store the work because it's the more sensible data structure. We want the work to bestarted in the same order that we sent it. However, this queue is a little bitspecial. As I said in the previous section, threads are continuously (well, not really, but let's assume that they are) querying the queue to ask for work. When there's work available, threads take the work from the queue and do it. What would happen if two threads try to take the same work at the same time? Well, the program would crash.

To avoid these kinds of problems, I implemented a wrapper over the standard C++ Queue that uses mutex to restrict the concurrent access. Let's see a small sample of the SafeQueue class:

voidenqueue(T&t) {std::unique_lock<std::mutex>lock(m_mutex);m_queue.push(t);}

To enqueue the first thing we do is lock the mutex to make sure that no one else is accessing the resource. Then, we push the element to the queue. When the lock goes out of scopes it gets automatically released. Easy, huh? This way, we make the Queue thread-safe and thus we don't have to worry many threads accessing and/or modifying it at the same "time".

Submit function

The most important method of the thread pool is the one responsible of adding work to the queue. I called this methodsubmit. It's not difficult to understand how it works but its implementation can seem scary at first. Let's think aboutwhat should do and after that we will worry abouthow to do it. What:

  • Accept any function with any parameters.
  • Return "something" immediately to avoid blocking main thread. This returned object shouldeventually contain the result of the operation.

Cool, let's seehow we can implement it.

Submit implementation

The complete submit functions looks like this:

// Submit a function to be executed asynchronously by the pooltemplate<typenameF,typename...Args>autosubmit(F&&f,Args&&...args)->std::future<decltype(f(args...))> {// Create a function with bounded parameters ready to executestd::function<decltype(f(args...))()>func=std::bind(std::forward<F>(f),std::forward<Args>(args)...);// Encapsulate it into a shared ptr in order to be able to copy construct / assignautotask_ptr=std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);// Wrap packaged task into void functionstd::function<void()>wrapper_func= [task_ptr]() {  (*task_ptr)(); };// Enqueue generic wrapper functionm_queue.enqueue(wrapperfunc);// Wake up one thread if its waitingm_conditional_lock.notify_one();// Return future from promisereturntask_ptr->get_future();}

Nevertheless, we're going to inspect line by line what's going on in order to fully understand how it works.

Variadic template function

template<typenameF,typename...Args>

This means that the next statement is templated. The first template parameter is called F (our function) and second one is a parameter pack. A parameter pack is a special template parameter that can accept zero or more template arguments. It is, in fact, a way to express a variable number of arguments in a template. A template with at least one parameter pack is calledvariadic template

Summarizing, we are telling the compiler that our submit function is going to take one generic parameter of type F (our function) and a parameter pack Args (the parameters of the function F).

Function declaration

autosubmit(F&&f,Args&&...args)->std::future<decltype(f(args...))> {

This may seem weird but, it's not. A function, in fact, can be declared using two different syntaxes. The following is the most well known:

return-typeidentifier (argument-declarations... )

But, we can also declare the function like this:

autoidentifier (argument-declarations... )->return_type

Why two syntaxes? Well, imagine that you have a function that has a return type that depends on the input parameters of the function. Using the first syntax you can't declare that function without getting a compiler error since you would be using a variable in the return type that has not been declared yet (because the return type declaration goes before the parameters type declaration).

Using the second syntax you can declare the function to have return typeauto then, using the -> you can declare the return type depending on the arguments of the functions that have been declared previously.

Now, let's inspect the parameters of the submit function. When the type of a parameter is declared asT&& for some deducted type T that parameter is auniversal reference. This term was coined byScott Meyers becauseT&& can also mean r-value reference. However, in the context of type deduction, it means that it can be bound to both l-values and r-values, unlike l-value references that can only be bound to non-const objects (they bind only to modifiable lvalues) and r-value references (they bind only to rvalues).

The return type of the function is of typestd::future. An std::future is a special type that provides a mechanism to access the result of asynchronous operations, in our case, the result of executing a specific function. This makes sense with what we said earlier.

Finally, the template type of std::future isdecltype(f(args...)). Decltype is a special C++ keyword that inspects the declared type of an entity or the type and value category of an expression. In our case, we want to know the return type of the functionf, so we give decltype our generic functionf and the parameter packargs.

Function body

// Create a function with bounded parameters ready to executestd::function<decltype(f(args...))()>func=std::bind(std::forward<F>(f),std::forward<Args>(args)...);

There are many many things happening here. First of all, thestd::bind(F, Args) is a function that creates a wrapper for F with the given Args. Caling this wrapper is the same as calling F with the Args that it has been bound. Here, we are simply calling bind with our generic functionf and the parameter packargs but using another wrapperstd::forward(t) for each parameter. This second wrapper is needed to achieve perfect forwarding of universal references.The result of this bind call is astd::function. The std::function is a C++ object that encapsulates a function. It allows you to execute the function as if it were a normal function calling the operator() with the required parameters BUT, because it is an object, you can store it, copy it and move it around. The template type of any std::function is the signature of that function: std::function< return-type (arguments)>. In this case, we already know how to get the return type of this function using decltype. But, what about the arguments? Well, because we bound all argumentsargs to the functionf we just have to add an empty pair of parenthesis that represents an empty list of arguments:decltype(f(args...))().

// Encapsulate it into a shared ptr in order to be able to copy construct / assignautotask_ptr=std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

The next thing we do is we create astd::packaged_task(t). A packaged_task is a wrapper around a function that can be executed asynchronously. It's result is stored in a shared state inside an std::future object. The templated type T of an std::packaged_task(t) is the type of the functiont that is wrapping. Because we said it before, the signature of the functionf isdecltype(f(args...))() that is the same type of the packaged_task. Then, we just wrap again this packaged task inside astd::shared_ptr using the initialize functionstd::make_shared.

// Wrap packaged task into void functionstd::function<void()>wrapperfunc= [task_ptr]() {  (*task_ptr)(); };

Again, we create a std:.function, but, note that this time its template type isvoid(). Independently of the functionf and its parametersargs thiswrapperfunc the return type will always bevoid. Since all functionsf may have different return types, the only way to store them in a container (our Queue) is wrapping them with a generic void function. Here, we are just declaring thiswrapperfunc to execute the actual tasktaskptr that will execute the bound functionfunc.

// Enqueue generic wrapper functionm_queue.enqueue(wrapperfunc);

We enqueue thiswrapperfunc.

// Wake up one thread if its waitingm_conditional_lock.notify_one();

Before finishing, we wake up one thread in case it is waiting.

// Return future from promisereturntask_ptr->get_future();

And finally, we return the future of the packaged_task. Because we are returning the future that is bound to the packaged_tasktaskptr that, at the same time, is bound with the functionfunc, executing thistaskptr will automatically update the future. Because we wrapped the execution of thetaskptr with a generic wrapper function, is the execution ofwrapperfunc that, in fact, updates the future. Aaaaand. since we enqueued this wrapper function, it will be executed by a thread after being dequeued calling the operator().

Thread worker

Now that we understand how the submit method works, we're going to focus on how the work gets done. Probably, the simplest implementation of a thread worker could be using polling:

 LoopIf Queue is not emptyDequeue workDo it

This looks alright but it'snot very efficient. Do you see why? What would happen if there is no work in the Queue? The threads would keep looping and asking all the time: Is the queue empty?

The more sensible implementation is done by "sleeping" the threads until some work is added to the queue. As we saw before, as soon as we enqueue work, a signalnotify_one() is sent. This allows us to implement a more efficient algorithm:

LoopIf Queue is emptyWait signalDequeue workDo it

This signal system is implemented in C++ withconditional variables. Conditional variables are always bound to a mutex, so I added a mutex to the thread pool class just to manage this. The final code of a worker looks like this:

voidoperator()() {std::function<void()>func;booldequeued;while (!m_pool->m_shutdown) {{std::unique_lock<std::mutex>lock(m_pool->m_conditional_mutex);if (m_pool->m_queue.empty()) {m_pool->m_conditional_lock.wait(lock);}dequeued=m_pool->m_queue.dequeue(func);}if (dequeued) {func();}}}

The code is really easy to understand so I am not going to explain anything. The only thing to note here is that,func is our wrapper function declared as:

std::function<void()>wrapperfunc= [task_ptr]() {  (*task_ptr)(); };

So, executing this function will automatically update the future.

Usage example

Creating the thread pool is as easy as:

// Create pool with 3 threadsThreadPoolpool(3);// Initialize poolpool.init();

When we want to shutdown the pool just call:

// Shutdown the pool, releasing all threadspool.shutdown()

Ff we want to send some work to the pool, after we have initialized it, we just have to call the submit function:

pool.submit(work);

Depending on the type of work, I've distinguished different use-cases. Suppose that the work that we have to do is multiply two numbers. We can do it in many different ways. I've implemented the three most common ways to do it that I can imagine:

  • Use-Case #1. Function returns the result
  • Use-Case #2. Function updates by ref parameter with the result
  • Use-Case #3. Function prints the result

Note: This is just to show how the submit function works. Options are not exclusive

Use-Case #1

The multiply function with a return looks like this:

// Simple function that adds multiplies two numbers and returns the resultintmultiply(constinta,constintb) {constintres=a*b;returnres;}

Then, the submit:

// The type of future is given by the return type of the functionstd::future<int>future=pool.submit(multiply,2,3);

We can also use theauto keyword for convenience:

autofuture=pool.submit(multiply,2,3);

Nice, when the work is finished by the thread pool we know that the future will get updated and we can retrieve the result calling:

constintresult=future.get();std::cout <<result <<std::endl;

The get() function of std::future always return the type T of the future.This type will always be equal to the return type of the function passed to the submit method. In this case, int.

Use-Case #2

The multiply function has a parameter passed by ref:

// Simple function that adds multiplies two numbers and updates the out_res variable passed by refvoidmultiply(int&out_res,constinta,constintb) {out_res=a*b;}

Now, we have to call the submit function with a subtle difference. Because we are using templates and type deduction (universal references), the parameter passed by ref needs to be called usingstd::ref(param) to make sure that we are passing it by ref and not by value.

intresult=0;autofuture=pool.submit(multiply,std::ref(result),2,3);// result is 0future.get();// result is 6std::cout <<result <<std::endl;

In this case, what's the type of future? Well, as I said before, the return type will always be equal to the return type of the function passed to the submit method. Because this function is of type void, the future isstd::future. Calling future.get() returns void. That's not very useful, but we still need to call .get() to make sure that the work has been done.

Use-Case #3

The last case is the easiest one. Our multiply function simply prints the result:

We have a simple function without output parameters. For this example I implemented the following multiplication function:

// Simple function that adds multiplies two numbers and prints the resultvoidmultiply(constinta,constintb) {constintresult=a*b;std::cout <<result <<std::endl;}

Then, we can simply call:

autofuture=pool.submit(multiply,2,3);future.get();

In this case, we know that as soon as the multiplication is done it will be printed. If we care when this is done, we can wait for it calling future.get().

Checkout themain program for a complete example.

Future work

  • Make it more reliable and safer (exceptions)
  • Find a better way to use it with member functions (thanks to @rajenk)
  • Run benchmarks and improve performance if needed
  • Evaluate performance and impact of std::function in the heap and try alternatives if necessary. (thanks to @JensMunkHansen)

References

About

Thread pool implementation using c++11 threads

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

[8]ページ先頭

©2009-2025 Movatter.jp