Why do you need a thread pool?
A thread pool is one of the basic components of a multithreaded program.
We want a program to take advantage of multi-core hardware as much as possible,
and that means using multiple threads or multiple processes to squeeze performance out of the CPU.
But inter-process communication is considerably more expensive, so multithreading is often the better choice!
Imagine a master/worker scenario:
a main thread is responsible for distributing events, pushing each task into a shared queue, while the worker threads pop tasks from that queue and run the processing logic.
The main thread cares only about dispatch; the task queue takes care of scheduling the tasks.
This is the basic logic of a thread pool.
If we spawned a new thread for every task, that would certainly get the job done.
But we cannot afford that overhead: creating and destroying threads costs time and memory.
And if there are too many threads, the OOM killer may even step in and kill your process.
Implementation of the thread pool
To keep the mental burden low, I rely directly on the C++ standard library, using the following components:
- std::vector
- std::queue
- std::function
- std::condition_variable
- std::mutex
- std::thread
- std::future
The thread pool needs a container to store the thread objects so that their resources can be reclaimed later:

```cpp
std::vector<std::thread> worker_;
```
The thread pool needs a queue to store pending tasks:

```cpp
using FUNC = std::function<void()>;
std::queue<FUNC> que_;
```
The thread pool needs a lock, and when there is no work we want idle threads to sleep and wait rather than spin; we also need a stop flag so thread resources can be reclaimed once execution is finished:

```cpp
bool stop_;
std::mutex mtx_;
std::condition_variable cond_;
```
First, the threads are created up front in the constructor; that is the whole idea of a pooled component.
RAII (std::unique_lock) is used to manage the lock here.
Note the wait predicate: a worker should wake when the pool is stopping or when a task is available, and it should exit only once the pool is stopping and the queue has been drained.

```cpp
explicit thread_pool(size_t thread_count) : stop_(false) {  // the pool starts in the running state
  for (size_t i = 0; i < thread_count; ++i) {
    worker_.emplace_back([this]() {
      while (true) {
        FUNC task;
        {
          std::unique_lock<std::mutex> lock(this->mtx_);
          // Sleep until the pool is stopping or a task is available.
          this->cond_.wait(
              lock, [this]() { return this->stop_ || !this->que_.empty(); });
          // Exit only when stopping and the queue has been drained.
          if (this->stop_ && this->que_.empty()) return;
          task = std::move(que_.front());
          que_.pop();
        }
        task();  // run the task outside the lock
      }
    });
  }
}
```
Second, we need the destructor to release resources.
Recycling is relatively simple: flip the stop switch, wake every worker, and join them; the workers finish consuming whatever is left in the task queue before exiting.

```cpp
~thread_pool() {
  {
    std::lock_guard<std::mutex> lock(mtx_);
    stop_ = true;
  }
  cond_.notify_all();                 // wake all sleeping workers
  for (auto &v : worker_) v.join();   // wait for every worker to exit
}
```
With the constructor and destructor written, all that remains is an interface for adding tasks.

```cpp
void add_task(FUNC &&func) {
  std::lock_guard<std::mutex> lock(mtx_);
  que_.push(std::move(func));
  cond_.notify_one();
}
```
So far, our thread pool is basically done. However, it is a very simple pool with a crude interface:
add_task only accepts a void() callable, so we cannot pass parameters directly, which is very unfriendly.
Admittedly, at the call site we can wrap the arguments in a lambda ourselves.
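A minimal sketch of that workaround (print_sum is a hypothetical function, and this assumes the thread_pool class as written so far, with add_task as its public interface):

```cpp
#include <iostream>

void print_sum(int a, int b) { std::cout << a + b << '\n'; }

int main() {
  thread_pool pool(4);
  // Bake the arguments into a zero-argument lambda so the call
  // matches the void() signature that add_task expects.
  pool.add_task([]() { print_sum(1, 2); });
}
```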
However, we should seek a more natural way to pass parameters
Next, let's take a look at how to improve our thread pool
Thread pool improvements
As mentioned above, the obvious point to improve is our add_task.
How do we support arbitrary parameters?
Writing a function overload for every arity by hand would be foolish; what we need are variadic templates!
Technologies needed
- Accepting a function object plus its arguments: implemented with variadic template parameters
- Deducing the function's return type: implemented with decltype
- Getting the return value asynchronously: implemented with std::future
With a little perfect forwarding and template programming on top, we can easily match arbitrary functions:
```cpp
template <typename F, typename... Arg>
auto submit(F &&func, Arg &&...arg) -> std::future<decltype(func(arg...))> {
  using ret = decltype(func(arg...));
  // Pack expansion in an init-capture (C++20) moves/forwards the callable
  // and every argument into the lambda.
  auto fun = [fn = std::forward<F>(func),
              ... pack = std::forward<Arg>(arg)]() { return fn(pack...); };
  // packaged_task is move-only, so hold it in a shared_ptr to fit
  // the copyable std::function stored in the task queue.
  auto task = std::make_shared<std::packaged_task<ret()>>(std::move(fun));
  add_task([task]() { (*task)(); });
  return task->get_future();
}
```
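As a quick sanity check, here is a minimal usage sketch, assuming the full class assembled below; add is a hypothetical function:

```cpp
#include <iostream>

int add(int a, int b) { return a + b; }

int main() {
  thread_pool pool(4);
  // submit returns a std::future for the task's result.
  std::future<int> fut = pool.submit(add, 1, 2);
  std::cout << fut.get() << '\n';  // blocks until a worker runs the task; prints 3
}
```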
Full code
```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class thread_pool {
 public:
  using FUNC = std::function<void()>;

  explicit thread_pool(size_t thread_count) : stop_(false) {
    for (size_t i = 0; i < thread_count; ++i) {
      worker_.emplace_back([this]() {
        while (true) {
          FUNC task;
          {
            std::unique_lock<std::mutex> lock(this->mtx_);
            this->cond_.wait(
                lock, [this]() { return this->stop_ || !this->que_.empty(); });
            if (this->stop_ && this->que_.empty()) return;
            task = std::move(que_.front());
            que_.pop();
          }
          task();
        }
      });
    }
  }

  template <typename F, typename... Arg>
  auto submit(F &&func, Arg &&...arg) -> std::future<decltype(func(arg...))> {
    using ret = decltype(func(arg...));
    // Pack init-capture requires C++20.
    auto fun = [fn = std::forward<F>(func),
                ... pack = std::forward<Arg>(arg)]() { return fn(pack...); };
    auto task = std::make_shared<std::packaged_task<ret()>>(std::move(fun));
    add_task([task]() { (*task)(); });
    return task->get_future();
  }

  ~thread_pool() {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      stop_ = true;
    }
    cond_.notify_all();
    for (auto &v : worker_) v.join();
  }

 private:
  void add_task(FUNC &&func) {
    std::lock_guard<std::mutex> lock(mtx_);
    que_.push(std::move(func));
    cond_.notify_one();
  }

  bool stop_;
  std::mutex mtx_;
  std::condition_variable cond_;
  std::queue<FUNC> que_;
  std::vector<std::thread> worker_;
};
```
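And a short demo of the pool consuming a batch of tasks (square and the task counts are illustrative choices):

```cpp
#include <iostream>
#include <vector>

int square(int x) { return x * x; }

int main() {
  thread_pool pool(4);
  std::vector<std::future<int>> results;
  // One producer enqueues ten tasks; the four workers consume them.
  for (int i = 0; i < 10; ++i) results.push_back(pool.submit(square, i));
  for (auto &f : results) std::cout << f.get() << ' ';  // 0 1 4 9 ... 81
  std::cout << '\n';
}
```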
Summary
In essence, what a thread pool implements is the producer-consumer model.
That's right, the one you saw in your operating systems textbook: the most basic and most practical model.
What we built is essentially nothing more than a multi-producer, multi-consumer version of it.
And where is it used?
In plenty of places, especially under high throughput, where the processing logic should keep the CPU as busy as possible; that is exactly where a thread pool component earns its keep.
Tip: the appropriate thread pool size is related to the number of CPU cores; tune the exact number to your actual workload.
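As a starting point, a sketch that sizes the pool from std::thread::hardware_concurrency() (the one-thread-per-core heuristic and the fallback value are assumptions to tune, not rules):

```cpp
#include <thread>

// Heuristic: CPU-bound work usually maps well to one thread per
// hardware core. hardware_concurrency() may return 0 when the
// value is unknown, so fall back to a small default.
size_t default_pool_size() {
  unsigned n = std::thread::hardware_concurrency();
  return n != 0 ? static_cast<size_t>(n) : 4;  // 4 is an arbitrary fallback
}

// thread_pool pool(default_pool_size());
```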
[amjieker]