C++简单线程池

本文最后更新于:2023年12月17日 下午

前言

之前对C++多线程这块一直不太熟悉,而恰好这段时间看到一段简单线程池实现的代码,所以想着不如趁此机会学习一下。本文会先介绍下线程池实现里用到的相关同步原语,如互斥锁和条件变量,再来简单分析其实现。

互斥锁

正如《Linux多线程服务端编程》一书中指出,“互斥器(mutex)恐怕是用得最多的同步原语,粗略地说,它保护了临界区,任何一个时刻最多只能由一个线程在此mutex划出的临界区内活动。单独使用mutex时,我们主要为了保护共享数据”。书中也提到了使用mutex时建议的四个原则:

  • 用RAII手法封装mutex的创建、销毁、加锁、解锁这四个操作。
  • 只用非递归的mutex(即不可重入的mutex)。
  • 不手工调用lock() 和 unlock() 函数,一切交给栈上的Guard对象的构造和析构函数负责。
  • 在每次构造Guard对象的时候,思考一路上(调用栈上)已经持有的锁,防止因加锁顺序不同而导致死锁。

这里仅对上述的第三点展开介绍实际用法,通常 std::mutex 都会结合 std::lock_guard、std::scoped_lock、std::unique_lock 来使用。

其中 std::lock_guard 的用法如下所示,基本上就是在构造的时候传入一个 mutex,在它的构造函数中会执行 mutex.lock() ,而在 lock_guard 析构函数中会执行 mutex.unlock() 从而实现了自动的加锁和解锁操作。

1
2
3
4
5
6
7
std::mutex g_mutex;

{
std::lock_guard<std::mutex> lock(g_mutex);
// 以下都是临界区
...
}

std::scoped_lockstd::lock_guard 用法类似,主要区别在于 std::scoped_lock 支持同时为多个 mutex 进行上锁和解锁,基本用法如下所示。

1
2
3
4
5
6
std::mutex m1, m2;

{
std::scoped_lock(m1,m2);
...
}

std::unique_lock 相比 std::lock_guard 更多加的灵活,拥有更多的成员函数,如 lock() , unlock()等,也就是说 unique_lock 可以提前释放锁,而不必等到析构的时候再释放,如下所示。

1
2
3
4
5
6
7
8
9
10
11
std::mutex g_mutex;

{
std::unique_lock lk(g_mutex);
// 临界区
...
// 提前释放锁
lk.unlock();
// 做一些后续的处理
...
}

左: lock_guard,右: unique_lock

条件变量

condition_variable 条件变量是C++11中就有的class,通过使用 std::mutex 来实现同步原语,这里简单介绍下两个关键的成员函数[1]

wait函数使用

正如 cppreference 里描述的那样,condition_variable 的 wait 函数提供了两种函数原型:

1
2
3
void wait (unique_lock<mutex>& lck);		(1
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred); (2
  • 对于函数原型(1),执行 wait 函数,会直接解锁传入的互斥量并阻塞当前线程,直到其他的某个线程调用 notify_one 或者 notify_all 后,该线程被唤醒,会重新尝试获取互斥量,如果得不到线程就会阻塞在这里,直到获取到互斥量为止才会结束 wait,继续执行下面的代码。

  • 对于函数原型(2),等同于如下的代码,该函数主要用于避免一些虚假唤醒(spurious awakenings)的情况,通过指定的判断条件决定是否继续等待。需要注意的事,在进入该循环前,必须要获取到互斥量,而执行完 wait 后,也需要重新获取到互斥量,即这里的互斥量会保护执行 stop_waiting() 。

1
2
3
4
while (!stop_waiting())
{
wait(lock);
}

wait 函数的具体使用方法可以参考下面代码[2]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <thread>

std::condition_variable cv;
std::mutex cv_m; // This mutex is used for three purposes:
// 1) to synchronize accesses to i
// 2) to synchronize accesses to std::cerr
// 3) for the condition variable cv
int i = 0;

void waits(int num)
{
std::unique_lock<std::mutex> lk(cv_m);
std::cerr << "num:" << num << " Waiting...\n";
cv.wait(lk, []{ return i == 1; });
std::cerr << "num:" << num << " ...finished waiting. i == 1\n";
}

void signals()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
std::cerr << "Notifying...\n";
}
cv.notify_all();

std::this_thread::sleep_for(std::chrono::seconds(1));

{
std::lock_guard<std::mutex> lk(cv_m);
i = 1;
std::cerr << "Notifying again...\n";
}
cv.notify_all();
}

int main()
{
std::thread t1(waits, 1), t2(waits, 2), t3(waits, 3), t4(signals);
t1.join();
t2.join();
t3.join();
t4.join();
}

其中一组可能的输出为:

1
2
3
4
5
6
7
8
num:3 Waiting...
num:1 Waiting...
num:2 Waiting...
Notifying...
Notifying again...
num:3 ...finished waiting. i == 1
num:2 ...finished waiting. i == 1
num:1 ...finished waiting. i == 1

简单分析一下上面的代码和输出结果:在 main 函数中创建了三个 waits 线程和一个 signals 线程,然后主线程等待所有线程执行完毕。由于 signals 线程会先 sleep 1 秒,所以先执行三个 waits 线程,而这三个线程执行的顺序是随机的,当其中一个线程先执行拿到锁后,执行输出 Waiting(输出结果里第一个执行的就是线程3),然后执行 wait ,由于不满足条件所以会释放锁并阻塞当前线程。紧接着,其他两个由于没有拿到互斥锁而被阻塞的线程,其中一个拿到了锁,也继续往下执行,重复类似的操作。最后三个线程都在等待被唤醒。

signals 线程睡眠完之后,继续往下执行,第一次执行 notify_all,唤醒了等待的三个线程,而每个线程唤醒也都会去尝试获取互斥锁,因此此时三个线程还是相当于以随机顺序串行执行,但是因为没有满足退出等待的条件(i == 1),所以又会释放锁并进入等待。signals 线程将 i 赋值为1后,第二次执行 notify_all,此时waits 三个线程依次拿到锁,并判断结束条件成立后,继续往下执行输出,最终输出的顺序也是随机的。

notify_one & notify_all

1
2
void notify_one() noexcept;
void notify_all() noexcept;

两个 notify 的函数原型也都比较简单,其中 notify_one 是去唤醒当前条件变量等待队列里的一个线程,如果没有线程在等待,则函数不执行任何操作,如果等待的线程多于一个,则唤醒的线程是随机的。notify_all 则是去唤醒所有正在等待的线程。

在 cppreference 指出,通知线程不需要持有与等待线程线程相同的互斥锁,如果持有的话,反而可能是一种性能劣化,因为被通知的线程因为无法获取到锁而再次阻塞,直到通知线程释放锁。因此,一些实现(特别是许多pthread实现)为识别这种情况,在 notify 调用中,会直接将等待线程从条件变量的等待队列中转移到互斥锁的等待队列里,从而避免了这种 “hurry up and wait” 情况。

线程池实现

一个简单线程池的实现如下所示,这里针对每个成员函数作简要分析。

  • 首先是构造函数,就是在启动多个 worker 线程,执行 Run 这一循环函数。
  • 析构函数,主要是去调用 Stop 函数,终止所有 worker 线程,进行资源回收。
  • AddTask 函数,使用 std::bind 将传入的参数转化为 function object,加入到 pending_tasks_ 队列中,并利用条件变量去唤醒一个空闲的 worker 线程去执行对应任务。(这里还有 Parameter Pack 相关知识点,后续有机会再展开讲讲。)
  • Stop 函数,唤醒所有 worker 线程,等待所有线程处理完当前任务后退出线程,回收线程资源。
  • Run 函数,worker 线程的运行的循环函数,通过条件变量的 wait 来判断当前是否有待处理的任务或者线程池是否终止,如果收到终止信号,则退出死循环,否则从任务队列中取出一个任务执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class SimpleThreadPool final {
public:
SimpleThreadPool(size_t num_threads) : num_threads_(num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back(&SimpleThreadPool::Run, this, i);
}
}

~SimpleThreadPool() { if(!stop_) { Stop(); } }

template<class Func, class... Args>
void AddTask(Func &&func, Args &&... args) {
auto task = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
std::unique_lock<std::mutex> guard(mutex_);
pending_tasks_.push(task);
cv_.notify_one();
}

void Stop() {
{
std::unique_lock<std::mutex> guard(mutex_);
if (stop_) { return; }
stop_ = true;
cv_.notify_all();
}
for (auto &w: workers_) { w.join(); }
}

private:
void Run(size_t i) {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> guard(mutex_);
cv_.wait(guard, [this]() { return this->stop_ || !this->pending_tasks_.empty(); });
if (stop_) { break; }
task = std::move(pending_tasks_.front());
pending_tasks_.pop();
}
task();
}
}

const size_t num_threads_;
mutable std::mutex mutex_;
mutable std::condition_variable cv_;
std::queue<std::function<void()>> pending_tasks_;
std::vector<std::thread> workers_;
bool stop_{false};
};

备注/参考


C++简单线程池
https://2017zhangyuxuan.github.io/2023/11/17/2023-11/2023-11-17 C++简单线程池/
作者
Zhang Yuxuan
发布于
2023年11月17日
许可协议