A Thread Pool Implementation in Modern C++

Why do you need a thread pool?

A thread pool is one of the basic components of a multithreaded program.

We want our programs to exploit multi-core hardware as fully as possible,

so we turn to multithreading or multiprocessing to squeeze performance out of the CPU.

But inter-process communication is comparatively expensive, so multithreading is often the better choice!

Imagine a master/worker scenario:

a main thread is responsible for accepting and dispatching events, pushing them onto a queue,

while worker threads consume the queue and run the processing logic; the main thread only cares about dispatch.

The task queue acts as the scheduling point between the two.

That is the basic structure of a thread pool.

If we spawned a new thread for every task, that would certainly work too,

but the overhead is unacceptable: creating and destroying threads costs time and memory.

And too many threads can even trigger the OOM killer and take your process down.

Implementing the thread pool

To keep the mental overhead low, I rely directly on the C++ standard library, specifically:

  • std::vector
  • std::queue
  • std::function
  • std::condition_variable
  • std::mutex
  • std::thread
  • std::future

The pool needs a container to hold the worker threads so their resources can be reclaimed later:

std::vector<std::thread> worker_;

The pool needs a queue to store pending tasks:

using FUNC = std::function<void()>;
std::queue<FUNC> que_;

The pool needs a mutex to protect the queue, a condition variable so that idle workers can sleep until work arrives, and a stop flag so that thread resources can be reclaimed on shutdown:

bool stop_;
std::mutex mtx_;
std::condition_variable cond_;

First, the worker threads are created up front in the constructor; creating resources ahead of time is the essence of a pooled component.

The lock is managed RAII-style via std::unique_lock.

explicit thread_pool(size_t thread_count) : stop_(false) {
  for (size_t i = 0; i < thread_count; ++i) {
    worker_.emplace_back([this]() {
      while (true) {
        FUNC task;
        {
          std::unique_lock<std::mutex> lock(this->mtx_);
          // sleep until there is work to do or the pool is shutting down
          this->cond_.wait(
              lock, [this]() { return this->stop_ || !this->que_.empty(); });
          // exit only once the queue has been drained
          if (this->stop_ && this->que_.empty())
            return;
          task = std::move(que_.front());
          que_.pop();
        }
        task(); // run the task outside the lock
      }
    });
  }
}

Second, we need the destructor to release the resources.

Recycling resources is relatively simple: flip the stop switch, wake all workers, and wait for them to drain the queue and exit.

~thread_pool() {
  {
    std::lock_guard<std::mutex> lock(mtx_);
    stop_ = true;
  }
  cond_.notify_all();
  for (auto &v : worker_)
    v.join();
}

With construction and destruction done, all that remains is an interface for adding tasks.

void add_task(FUNC &&func) {
  std::lock_guard<std::mutex> lock(mtx_);
  que_.push(std::move(func)); // take ownership of the task
  cond_.notify_one();
}

That completes a minimal thread pool, but its interface is very limited.

We can only hand it a void() function, with no way to pass arguments, which is unfriendly.

Admittedly, callers can wrap their arguments into a lambda themselves,

but we should offer a more natural way to pass parameters.

Next, let's look at how to improve the pool.

Thread pool improvements

As noted above, the obvious place to improve is add_task.

How do we support arbitrary parameters?

Writing a separate overload for every arity by hand would be foolish; what we want is variadic templates!

Techniques needed

  • Accepting a function object plus arbitrary arguments: variadic templates
  • Deducing the function's return type: decltype
  • Retrieving the result asynchronously: std::future (via std::packaged_task)

With a little perfect forwarding and template programming, we can easily match arbitrary callables:

template <typename F, typename... Arg>
auto submit(F &&func, Arg &&...arg) -> std::future<decltype(func(arg...))> {
  using ret = decltype(func(arg...));
  auto fun = [fn = std::forward<F>(func),
              ... pack = std::forward<Arg>(arg)]() {
    return fn(pack...);
  }; // init-capture is C++14; pack init-capture (...pack = ...) needs C++20
  auto task = std::make_shared<std::packaged_task<ret()>>(std::move(fun));
  add_task([task]() { (*task)(); });
  return task->get_future();
}

Full code

#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class thread_pool {
public:
  using FUNC = std::function<void()>;
  explicit thread_pool(size_t thread_count) : stop_(false) {
    for (size_t i = 0; i < thread_count; ++i) {
      worker_.emplace_back([this]() {
        while (true) {
          FUNC task;
          {
            std::unique_lock<std::mutex> lock(this->mtx_);
            // sleep until there is work or the pool is shutting down
            this->cond_.wait(
                lock, [this]() { return this->stop_ || !this->que_.empty(); });
            // exit only once the queue has been drained
            if (this->stop_ && this->que_.empty())
              return;
            task = std::move(que_.front());
            que_.pop();
          }
          task(); // run the task outside the lock
        }
      });
    }
  }
  template <typename F, typename... Arg>
  auto submit(F &&func, Arg &&...arg) -> std::future<decltype(func(arg...))> {
    using ret = decltype(func(arg...));
    auto fun = [fn = std::forward<F>(func),
                ... pack = std::forward<Arg>(arg)]() {
      return fn(pack...);
    }; // init-capture is C++14; pack init-capture (...pack = ...) needs C++20
    auto task = std::make_shared<std::packaged_task<ret()>>(std::move(fun));
    add_task([task]() { (*task)(); });
    return task->get_future();
  }

  ~thread_pool() {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      stop_ = true;
    }
    cond_.notify_all();
    for (auto &v : worker_)
      v.join();
  }

private:
  void add_task(FUNC &&func) {
    std::lock_guard<std::mutex> lock(mtx_);
    que_.push(std::move(func)); // take ownership of the task
    cond_.notify_one();
  }

  bool stop_;
  std::mutex mtx_;
  std::condition_variable cond_;
  std::queue<FUNC> que_;
  std::vector<std::thread> worker_;
};

Summary

In essence, what a thread pool implements is the producer-consumer model.

That's right, the one from your operating systems textbook: the most basic, most practical model.

What we built is nothing more than a multi-producer, multi-consumer queue.

And where is it useful?

In plenty of places, especially under high throughput, where the processing logic should keep the CPU as busy as possible; that is exactly when a thread pool component earns its keep.

Tip: the right thread pool size depends on the number of CPU cores; tune the exact value to your actual workload.

[amjieker]

Tags: C++

Posted by IceRegent on Tue, 11 Oct 2022 05:58:01 +0300