C++新特性||线程协程
目录
- 1 C++11多线程thread
- 1.1 线程thread
- 1.1.1 语法
- 1.1.2 **简单线程的创建**
- **1.1.3 线程封装**
- 1.2 互斥量
- 1.2.1 独占互斥量std::mutex
- 1.2.2 递归互斥量std::recursive_mutex
- 1.2.3 带超时的互斥量std::timed_mutex和std::recursive_timed_mutex
- **1.2.4 lock_guard和unique_lock的使用和区别**
- 1.3 条件变量
- 1.3.1 成员函数
- 1. **wait函数**
- 1. wait(unique_lock<mutex>& lck)
- 2. wait(unique_lock<mutex>& lck, Predicate pred)
- 2. **wait_for函数**
- 1. wait_for(unique_lock<mutex>& lck, const chrono::duration<Rep, Period>& rel_time)
- 2. wait_for(unique_lock<mutex>& lck, const chrono::duration<Rep, Period>& rel_time, Predicate pred)
- 3. wait_until函数
- 1. wait_until(不带条件谓词版本)
- 2. wait_until(带条件谓词版本)
- 4. notify_one函数
- 5. notify_all函数
- 1.3.2 范例
- 1.4 原子变量
- 1.5 call_once和once_flag使用
- 1.6 异步操作
- 1.6.1 std::future
- 1.6.2 std::packaged_task
- 1.6.3 std::promise
- 1.6.4 总结
- 2 function和bind用法
- 2.1 function的用法
- 2.2 bind用法
- 3 可变模板参数
- 3.1 可变模版参数的展开
- 3.1.1 可变模版参数函数
- ***递归函数方式展开参数包***
- 逗号表达式展开参数包
- 4 实现C++线程池
- 范例4-threadpool
- 参考
C++ 参考手册 https://zh.cppreference.com/w/cpp
1 C++11多线程thread
1.1 线程thread
std::thread
在 #include 头文件中声明,因此使用 std::thread
时需要包含** thread
** 头文件。
1.1.1 语法
构造函数
- 默认构造函数
//创建一个空的 thread 执行对象。
thread() _NOEXCEPT{ // construct with no thread_Thr_set_null(_Thr);}
- 初始化构造函数
//创建std::thread执行对象,该thread对象可被joinable,新产生的线程会调用threadFun函数,该函数的参数由 args 给出template<class Fn, class... Args>explicit thread(Fn&& fn, Args&&... args);
- 拷贝构造函数
// 拷贝构造函数(被禁用),意味着 thread 不可被拷贝构造。
thread(const thread&) = delete;
- Move构造函数
//move 构造函数,调用成功之后 x 不代表任何 thread 执行对象。
注意:可被 joinable 的 thread 对象必须在他们销毁之前被主线程 join 或者将其设置为
detached。
thread(thread&& x)noexcept
主要成员函数
get_id()
- 获取线程ID,返回类型std::thread::id对象。
- http://www.cplusplus.com/reference/thread/thread/get_id/
joinable()
- 判断线程是否可以加入等待
- http://www.cplusplus.com/reference/thread/thread/joinable/
join()
- 等该线程执行完成后才返回。
- http://www.cplusplus.com/reference/thread/thread/join/
detach()
- 将本线程从调用线程中分离出来,允许本线程独立执行。(但是当主进程结束的时候,即便是
detach()
出去的子线程不管有没有完成都会被强制杀死) - https://cplusplus.com/reference/thread/thread/detach/
- 将本线程从调用线程中分离出来,允许本线程独立执行。(但是当主进程结束的时候,即便是
1.1.2 简单线程的创建
使用std::thread
创建线程,提供线程函数或者函数对象,并可以同时指定线程函数的参数。
#include <iostream>
#include <thread>
using namespace std;// 普通函数 func1
void func1() {cout << "func1 into" << endl; // 输出一条简单的消息
}// 普通函数 func2,带两个参数
void func2(int a, int b) {cout << "func2 a + b = " << a + b << endl; // 输出参数的和
}// 定义类 A
class A {
public:// 类的静态成员函数 fun3static void fun3(int a) {cout << "a = " << a << endl; // 输出参数 a}
};int main() {// 创建一个线程 t1,执行 func1 函数std::thread t1(func1); t1.join(); // 阻塞主线程,直到线程 t1 执行完成int a = 10;int b = 20;// 创建一个线程 t2,执行 func2 函数,并传递参数 a 和 bstd::thread t2(func2, a, b); t2.join(); // 阻塞主线程,直到线程 t2 执行完成// 创建一个线程 t3,执行类 A 的静态成员函数 fun3,并传递参数 1std::thread t3(&A::fun3, 1); t3.join(); // 阻塞主线程,直到线程 t3 执行完成return 0;
}
代码重点
- 普通函数线程绑定:
- 示例:
std::thread t1(func1)
; - 直接将普通函数传递给线程构造函数即可。
- 示例:
- 带参数的函数线程绑定:
- 示例:
std::thread t2(func2, a, b)
; - 传递参数时,按值传递(即 a 和 b 的副本被传递)。
- 示例:
- 类的静态函数线程绑定:
- 示例:
std::thread t3(&A::fun3, 1)
; - 静态成员函数不依赖于具体对象,可以直接通过类名调用。
- 示例:
1.1.3 线程封装
见范例1-thread2-pack
zero_thread.h
#ifndef ZERO_THREAD_H
#define ZERO_THREAD_H#include <thread> // 包含线程库
class ZERO_Thread {
public:ZERO_Thread(); // 构造函数,初始化线程相关状态virtual ~ZERO_Thread(); // 虚析构函数,确保子类正确析构bool start(); // 启动线程void stop(); // 停止线程bool isAlive() const; // 检查线程是否存活std::thread::id id() { return _th->get_id(); } // 获取线程的 IDstd::thread* getThread() { return _th; } // 返回线程指针void join(); // 等待线程结束,主线程不能在自身上调用void detach(); // 分离线程,可在当前线程上调用static size_t CURRENT_THREADID(); // 静态方法,获取当前线程 ID
protected:static void threadEntry(ZERO_Thread *pThread); // 线程入口静态函数virtual void run() = 0; // 纯虚函数,子类需实现具体逻辑
protected:bool _running; // 线程运行状态标志,true 表示线程正在运行std::thread *_th; // 指向底层 std::thread 对象的指针
};
#endif // ZERO_THREAD_H
zero_thread.cpp
#include "zero_thread.h"
#include <sstream>
#include <iostream>
#include <exception>ZERO_Thread::ZERO_Thread() :_running(false), _th(nullptr) {}ZERO_Thread::~ZERO_Thread() {if (_th != nullptr) {if (_th->joinable()) {_th->detach(); // 分离线程,防止资源泄漏}delete _th;_th = nullptr;}std::cout << "~ZERO_Thread()" << std::endl; // 调试信息
}bool ZERO_Thread::start() {if (_running) {return false; // 防止重复启动}try {_th = new std::thread(&ZERO_Thread::threadEntry, this); // 创建线程} catch (...) {throw "[ZERO_Thread::start] thread start error"; // 捕获启动异常}return true;
}void ZERO_Thread::stop() {_running = false; // 设置线程为非活动
}bool ZERO_Thread::isAlive() const {return _running; // 返回运行状态
}void ZERO_Thread::join() { //阻塞主线程,等待子线程结束。if (_th->joinable()) {_th->join(); // 等待线程结束}
}void ZERO_Thread::detach() {if (_th != nullptr) {_th->detach(); // 分离线程,使其在后台独立运行}
}size_t ZERO_Thread::CURRENT_THREADID() {static thread_local size_t threadId = 0; // 每个线程唯一的静态局部变量if (threadId == 0) {std::stringstream ss;ss << std::this_thread::get_id(); // 获取线程 IDthreadId = strtol(ss.str().c_str(), nullptr, 0); // 转换为数值类型}return threadId;
}void ZERO_Thread::threadEntry(ZERO_Thread *pThread) {pThread->_running = true; // 设置线程为运行状态try {pThread->run(); // 调用子类实现的逻辑} catch (std::exception &ex) {pThread->_running = false;throw ex; // 向上传递异常} catch (...) {pThread->_running = false;throw;}pThread->_running = false; // 线程结束后设置状态
}
main.cpp
#include <iostream>
#include <chrono>
#include "zero_thread.h"
using namespace std;// 类 A,继承自 ZERO_Thread,执行特定任务
class A : public ZERO_Thread {
public:void run() override {while (_running) { // 当线程运行标志为 truecout << "print A " << endl; // 输出信息std::this_thread::sleep_for(std::chrono::seconds(5)); // 延时 5 秒}cout << "----- leave A " << endl; // 线程结束时输出}
};// 类 B,继承自 ZERO_Thread,执行特定任务
class B : public ZERO_Thread {
public:void run() override {while (_running) { // 当线程运行标志为 truecout << "print B " << endl; // 输出信息std::this_thread::sleep_for(std::chrono::seconds(2)); // 延时 2 秒}cout << "----- leave B " << endl; // 线程结束时输出}
};int main() {{A a; // 创建 A 的线程对象a.start(); // 启动线程 AB b; // 创建 B 的线程对象b.start(); // 启动线程 Bstd::this_thread::sleep_for(std::chrono::seconds(10)); // 主线程延时 10 秒a.stop(); // 停止线程 Aa.join(); // 等待线程 A 结束b.stop(); // 停止线程 Bb.join(); // 等待线程 B 结束}cout << "Hello World!" << endl; // 主线程结束提示return 0;
}
运行结果
假设主线程延时 10 秒,线程 A 延时 5 秒,线程 B 延时 2 秒,输出示例如下:
print A
print B
print B
print B
print B
print A
print B
print B
print B
print B
----- leave A
----- leave B
~ZERO_Thread()
~ZERO_Thread()
Hello World!
代码解释
- 多线程运行:
- 类 A 和 B 的线程分别运行各自的
run()
方法,输出自己的信息。 - 线程 A 每 5 秒输出一次,线程 B 每 2 秒输出一次。
- 类 A 和 B 的线程分别运行各自的
- 线程管理:
a.stop()
和b.stop()
设置_running
为false
,停止线程。a.join()
和b.join()
等待线程完全结束。
- 资源清理:
ZERO_Thread
析构函数中确保线程已被detach
或join
,防止资源泄漏。
- 线程生命周期:
- 主线程等待 10 秒后停止子线程,并回收资源,最后输出 Hello World!。
1.2 互斥量
mutex
又称互斥量,C++ 11中与 mutex
相关的类(包括锁类型)和函数都声明在 头文件中,所以如果你需要使用 std::mutex
,就必须包含**mutex **头文件。
C++11提供如下4种语义的互斥量(mutex)
std::mutex
,独占的互斥量,不能递归使用。std::time_mutex
,带超时的独占互斥量,不能递归使用。std::recursive_mutex
,递归互斥量,不带超时功能。std::recursive_timed_mutex
,带超时的递归互斥量。
1.2.1 独占互斥量std::mutex
std::mutex
介绍
下面以 std::mutex 为例介绍 C++11 中的互斥量用法。
std::mutex 是C++11 中最基本的互斥量,std::mutex 对象提供了独占所有权的特性——即不支持递归地对 std::mutex 对象上锁,而 std::recursive_lock
则可以递归地对互斥量对象上锁。
std::mutex 的成员函数
-
构造函数,
std::mutex
不允许拷贝构造,也不允许move
拷贝,最初产生的mutex
对象是处于 unlocked 状态的。 -
lock()
,调用线程将锁住该互斥量。线程调用该函数会发生下面 3 种情况:(1). 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用unlock
之前,该线程一直拥有该锁。(2). 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。(3). 如果当前互斥量被当前调用线程锁 住,则会产生死锁(deadlock)。 -
unlock()
, 解锁,释放对互斥量的所有权。 -
try_lock()
,尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞。线程调用该 函数也会出现下面 3 种情况,(1).如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直 到该线程调用unlock
释放互斥量。(2).如果当前互斥量被其他线程锁住,则当前调用线程返回false
,而并不会被阻塞掉。(3).如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。
范例1-2-mutex1
#include <iostream> // 提供 std::cout
#include <thread> // 提供多线程功能 std::thread
#include <mutex> // 提供互斥锁功能 std::mutex// 全局变量声明
volatile int counter(0); // 非原子类型的计数器,需保护
std::mutex mtx; // 全局互斥锁,用于保护对 `counter` 的访问// 函数:每个线程尝试增加计数器 10,000 次
void increases_10k() {for (int i = 0; i < 10000; ++i) {// **第一种情况:使用 try_lock**// 如果互斥锁当前未被其他线程占用,则加锁并增加计数器if (mtx.try_lock()) { ++counter; // 计数器自增mtx.unlock(); // 释放锁}// **第二种情况:使用 lock**// 确保线程在获得锁之后再执行操作,lock 是阻塞调用mtx.lock(); ++counter; // 计数器自增mtx.unlock(); // 释放锁}
}int main() {std::thread threads[10]; // 创建一个包含 10 个线程的数组// 创建并启动 10 个线程,执行 `increases_10k` 函数for (int i = 0; i < 10; ++i)threads[i] = std::thread(increases_10k);// 等待所有线程完成for (auto& th : threads)th.join();// 输出最终计数器值std::cout << "Successful increases of the counter: " << counter << std::endl;return 0;
}
详细说明与注释
-
全局变量:
counter
是共享的非原子计数器,多个线程可能同时访问,需要通过互斥锁保护。mtx
是标准库提供的互斥锁,用于控制多个线程对共享资源的访问。
-
两种锁机制:
try_lock()
:- 尝试获取锁,如果锁当前未被其他线程持有,则成功获取锁并执行操作。
- 如果锁已被占用,则跳过操作,不阻塞当前线程。
lock()
:- 阻塞调用,直到当前线程成功获得锁。
- 确保线程获得锁后再执行操作。
-
increases_10k
函数:- 每个线程尝试增加计数器 10,000 次。
- 为了保护计数器操作,使用了两种不同的互斥锁调用(
try_lock
和lock
)。 - 互斥锁保证了对计数器的访问是线程安全的,避免数据竞争。
-
main
函数:- 创建 10 个线程,每个线程运行
increases_10k
函数。 - 使用
threads[i] = std::thread(increases_10k)
启动线程。 - 使用
th.join()
等待每个线程完成,确保主线程在所有子线程结束后输出结果。
- 创建 10 个线程,每个线程运行
-
运行结果:
- 由于互斥锁保护,所有线程对
counter
的增加操作是线程安全的。 - 最终输出的
counter
值为所有线程成功执行的增加次数。
- 由于互斥锁保护,所有线程对
输出示例
Successful increases of the counter: 200000
解释:
- 每个线程尝试增加计数器 10,000 次,共有 10 个线程,因此计数器的最终值应为 10,000 × 10 = 100,000。
- 因为使用了
try_lock
,部分增加可能会因锁未成功获取而被跳过,但lock
确保了所有剩余增加操作完成,因此总数达到
200,000。
1.2.2 递归互斥量std::recursive_mutex
递归锁允许同一个线程多次获取该互斥锁,可以用来解决同一线程需要多次获取互斥量时死锁的问题。
死锁范例1-2-mutex2-dead-lock
#include <iostream> // 提供 std::cout
#include <thread> // 提供多线程功能 std::thread
#include <mutex> // 提供互斥锁功能 std::mutex// 定义一个结构体 Complex,包含一个整数和一个互斥锁
struct Complex {std::mutex mutex; // 用于保护 `i` 的互斥锁int i; // 共享资源,整数变量Complex() : i(0) {} // 初始化 `i` 为 0// 乘法操作,需要持有互斥锁void mul(int x) {std::lock_guard<std::mutex> lock(mutex); // 加锁,保护共享资源i *= x; // 乘法操作}// 除法操作,也需要持有互斥锁void div(int x) {std::lock_guard<std::mutex> lock(mutex); // 加锁,保护共享资源i /= x; // 除法操作}// 同时执行乘法和除法void both(int x, int y) {std::lock_guard<std::mutex> lock(mutex); // 加锁,保护共享资源mul(x); // 调用乘法函数div(y); // 调用除法函数}
};int main(void) {Complex complex; // 创建一个 Complex 对象complex.both(32, 23); // 调用 `both` 函数return 0;
}
运行后出现死锁的情况。在调用both
时获取了互斥量,在调用mul
时又要获取互斥量,但both
的并没有释放,从而产生死锁。
详细注释与说明
-
Complex
结构体:- 包含一个整数成员
i
和一个互斥锁mutex
。 - 提供了三个方法:
mul(int x)
:将 i 乘以 x,需要互斥锁保护。div(int x)
:将 i 除以 x,也需要互斥锁保护。both(int x, int y)
:依次调用 mul 和 div,也需要互斥锁保护。
- 包含一个整数成员
-
锁嵌套问题:
- 在
both
函数中,通过std::lock_guard<std::mutex>
加锁保护。 both
函数内部又调用了mul
和div
,它们也尝试对同一个互斥锁加锁。- 这种情况下,重复加锁会导致死锁。
- 在
-
死锁原因:
std::lock_guard
在每次加锁时都会尝试获取锁,而both
函数中已经持有了锁。- 递归加锁未被支持时,线程会因尝试对已经被锁定的互斥锁加锁而阻塞,最终导致死锁。
-
运行结果:
- 该代码运行时会触发死锁现象,程序将永久卡在
both
函数中,无法继续执行。
- 该代码运行时会触发死锁现象,程序将永久卡在
使用递归锁
#include <iostream> // 提供 std::cout
#include <thread> // 提供多线程功能 std::thread
#include <mutex> // 提供互斥锁功能 std::recursive_mutex// 定义一个结构体 Complex,包含一个整数和一个递归互斥锁
struct Complex {std::recursive_mutex mutex; // 递归互斥锁int i; // 共享资源Complex() : i(0) {} // 构造函数,初始化 `i` 为 0// 乘法操作,使用递归锁保护void mul(int x) {std::lock_guard<std::recursive_mutex> lock(mutex); // 加锁i *= x; // 执行乘法操作}// 除法操作,使用递归锁保护void div(int x) {std::lock_guard<std::recursive_mutex> lock(mutex); // 加锁i /= x; // 执行除法操作}// 同时执行乘法和除法操作void both(int x, int y) {std::lock_guard<std::recursive_mutex> lock(mutex); // 加锁mul(x); // 调用乘法函数div(y); // 调用除法函数}
};int main(void) {Complex complex; // 创建 Complex 对象complex.both(32, 23); // 调用 both 函数(同时执行乘法和除法操作)// 输出主函数结束消息std::cout << "main finish\n";return 0;
}
虽然递归锁能解决这种情况的死锁问题,但是尽量不要使用递归锁,主要原因如下:
- 需要用到递归锁的多线程互斥处理本身就是可以简化的,允许递归很容易放纵复杂逻辑的产生,并且产生晦涩,当要使用递归锁的时候应该重新审视自己的代码是否一定要使用递归锁;
- 递归锁比起非递归锁,效率会低;
- 递归锁虽然允许同一个线程多次获得同一个互斥量,但可重复获得的最大次数并未具体说明,一旦超过一定的次数,再对lock进行调用就会抛出
std::system
错误。
1.2.3 带超时的互斥量std::timed_mutex和std::recursive_timed_mutex
std::timed_mutex
比std::mutex
多了两个超时获取锁的接口:try_lock_for
和try_lock_until
#include <iostream> // 提供 std::cout
#include <thread> // 提供多线程功能 std::thread
#include <mutex> // 提供互斥锁功能 std::timed_mutex
#include <chrono> // 提供时间工具 std::chronostd::timed_mutex mutex; // 定义一个 `std::timed_mutex` 互斥锁// 定义一个线程函数,模拟工作任务
void work() {std::chrono::milliseconds timeout(100); // 定义尝试获取锁的超时时间为 100 毫秒while (true) {// 尝试在指定的时间内获取互斥锁if (mutex.try_lock_for(timeout)) {// 成功获取互斥锁后执行任务std::cout << std::this_thread::get_id() << ": do work with the mutex" << std::endl;// 模拟持有锁的工作时间 250 毫秒std::chrono::milliseconds sleepDuration(250);std::this_thread::sleep_for(sleepDuration);// 释放互斥锁mutex.unlock();// 模拟其他工作时间 250 毫秒std::this_thread::sleep_for(sleepDuration);} else {// 获取锁失败时,可以进行其他工作或简单地继续尝试获取锁std::cout << std::this_thread::get_id() << ": unable to acquire lock, doing other work" << std::endl;}}
}int main(void) {// 创建两个线程分别执行 `work` 函数std::thread t1(work);std::thread t2(work);// 等待线程完成(阻塞主线程)t1.join();t2.join();// 输出主线程的工作结束信息std::cout << std::this_thread::get_id() << ": do work without the mutex" << std::endl;// 模拟主线程延迟 100 毫秒后结束std::chrono::milliseconds sleepDuration(100);std::this_thread::sleep_for(sleepDuration);std::cout << "main finish\n";return 0;
}
详细说明与注释
-
std::timed_mutex
的定义:- 一种互斥锁,支持定时尝试锁操作。
- 提供以下主要方法:
try_lock_for(duration)
:尝试在指定时间内获取锁,返回 true 表示成功获取锁,false 表示超时未获取到锁。try_lock_until(time_point)
:尝试在指定时间点前获取锁。
-
程序运行过程:
- 主函数创建了两个线程,分别执行 work 函数。
- 每个线程不断尝试获取锁,每次尝试最多等待 100 毫秒。
- 如果获取锁成功:
- 输出消息表明成功获取锁。
- 模拟工作 250 毫秒后释放锁。
- 如果获取锁失败:
- 输出消息表明未能获取锁。
- 线程继续尝试或执行其他操作。
-
避免死锁:
std::timed_mutex
使用超时策略避免了线程无限等待锁,从而防止潜在死锁。
-
主线程操作:
- 主线程等待两个子线程结束。
- 输出自身完成的工作,模拟一些延迟后结束程序。
-
运行结果(示例输出,线程顺序可能不同):
139760241526528: do work with the mutex
139760233133824: unable to acquire lock, doing other work
139760241526528: do work with the mutex
139760233133824: unable to acquire lock, doing other work
...
139760225287936: do work without the mutex
main finish
-
注意事项:
- 在多线程环境下,
std::timed_mutex
提供了一种灵活的锁策略,避免因锁竞争导致的阻塞。 - 使用时要注意锁持有时间过长可能影响其他线程的执行效率。
- 在多线程环境下,
1.2.4 lock_guard和unique_lock的使用和区别
相对于手动lock
和unlock
,我们可以使用RAII
(通过类的构造析构)来实现更好的编码方式。 这里涉及到unique_lock,lock_guard
的使用。
ps: C++相较于C引入了很多新的特性, 比如可以在代码中抛出异常, 如果还是按照以前的加锁解锁的话代码会极为复杂繁琐
#include <iostream> // 提供 std::cout 输出
#include <thread> // 提供多线程支持 std::thread
#include <mutex> // 提供互斥锁支持 std::mutex, std::lock_guard
#include <stdexcept> // 提供 std::logic_error 异常类型std::mutex mtx; // 定义一个互斥锁,确保线程安全地访问共享资源// 函数:打印偶数
void print_even(int x) {if (x % 2 == 0) { // 判断是否为偶数std::cout << x << " is even\n"; // 如果是偶数,打印} else {// 如果不是偶数,抛出逻辑错误异常throw (std::logic_error("not even"));}
}// 函数:打印线程 ID,并检查数字是否为偶数
void print_thread_id(int id) {try {// 使用 lock_guard 来自动管理互斥锁的加锁与释放std::lock_guard<std::mutex> lck(mtx); // 加锁,确保线程安全print_even(id); // 调用 print_even 判断并打印偶数} catch (std::logic_error&) {// 捕获并处理 print_even 中抛出的逻辑错误异常std::cout << "[exception caught]\n"; // 打印异常捕获信息}
}int main() {std::thread threads[10]; // 创建 10 个线程// 启动 10 个线程,每个线程执行 print_thread_id,传入不同的参数for (int i = 0; i < 10; ++i) {threads[i] = std::thread(print_thread_id, i + 1); // 启动线程}// 等待所有线程执行完毕for (auto& th : threads) {th.join(); // 阻塞等待每个线程完成}return 0; // 主程序结束
}
这里的lock_guard
换成unique_lock
是一样的。
unique_lock,lock_guard
的区别
-
unique_lock
与lock_guard
都能实现自动加锁和解锁,但是前者更加灵活,能实现更多的功能。 -
unique_lock
可以进行临时解锁和再上锁,如在构造对象之后使用lck.unlock()
就可以进行解锁,lck.lock()
进行上锁,而不必等到析构时自动解锁。
#include <iostream> // 提供 std::cout
#include <deque> // 提供双端队列 std::deque
#include <thread> // 提供多线程支持 std::thread
#include <mutex> // 提供互斥锁 std::mutex
#include <condition_variable> // 提供条件变量 std::condition_variable
#include <unistd.h> // 提供 sleep() 函数// 定义一个全局双端队列,用于线程之间的数据交换
std::deque<int> q;
std::mutex mu; // 互斥锁,用于保护队列的访问
std::condition_variable cond; // 条件变量,用于同步线程间的操作// 线程1的函数:向队列中插入数据
void fun1() {int count = 0; // 定义局部变量用于存储插入的数据while (true) {std::unique_lock<std::mutex> locker(mu); // 获取互斥锁,保证线程安全q.push_front(count); // 将 count 插入队列的前端locker.unlock(); // 解锁互斥锁,允许其他线程访问队列cond.notify_one(); // 唤醒一个等待条件变量的线程(通常是消费者线程)sleep(10); // 模拟某种操作,睡眠10秒钟后继续执行count++; // 每次插入一个新的数据}
}// 线程2的函数:从队列中取出数据并处理
void fun2() {while (true) {std::unique_lock<std::mutex> locker(mu); // 获取互斥锁,保证线程安全// 等待条件变量:当队列为空时,线程会阻塞,直到队列不为空cond.wait(locker, [](){ return !q.empty(); });int data = q.back(); // 从队列的后端取出数据q.pop_back(); // 从队列中移除这个数据locker.unlock(); // 解锁互斥锁,允许其他线程访问队列std::cout << "thread2 get value from thread1: " << data << std::endl; // 输出获取的数据}
}// 主函数,创建并启动两个线程
int main() {std::thread t1(fun1); // 创建并启动线程t1,执行fun1std::thread t2(fun2); // 创建并启动线程t2,执行fun2t1.join(); // 阻塞主线程,等待线程t1结束t2.join(); // 阻塞主线程,等待线程t2结束return 0; // 返回0,程序结束
}
线程间的同步:
- 生产者线程每次插入数据后会通知消费者线程(通过
cond.notify_one()
),这样消费者线程能够及时从队列中取出数据。 - 如果队列为空,消费者线程会在
cond.wait
阻塞,直到队列中有新的数据。
条件变量的使用:
- 条件变量
cond
确保了消费者线程不会在队列为空时一直忙等待,而是可以安全地等待直到队列中有数据。它通过wait
方法实现阻塞,且通过notify_one
或notify_all
唤醒一个或多个等待的线程。
条件变量的目的就是为了,在没有获得某种提醒时长时间休眠; 如果正常情况下, 我们需要一直循环(+sleep), 这样的问题就是CPU消耗+时延问题,条件变量的意思是在cond.wait
这里一直休眠直到cond.notify_one
唤醒才开始执行下一句; 还有cond.notify_all()
接口用于唤醒所有等待的线程。
那么为什么必须使用unique_lock呢?
原因: 条件变量在wait时会进行unlock再进入休眠, lock_guard并无该操作接口
wait:
如果线程被唤醒或者超时那么会先进行lock获取锁, 再判断条件(传入的参数)是否成立, 如果成立则wait函数返回否则释放锁继续休眠
notify:
进行notify动作并不需要获取锁
总结
lock_guard
1.std::lock_guard
在构造函数中进行加锁,析构函数中进行解锁。
2.锁在多线程编程中,使用较多,因此c++11提供了lock_guard
模板类;在实际编程中,我们也可以根据自己的场景编写resource_guard RAII
类,避免忘掉释放资源。
std::unique_lock
unique_lock
是通用互斥包装器,允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量一同使用。unique_lock
比lock_guard
使用更加灵活,功能更加强大。- 使用
unique_lock
需要付出更多的时间、性能成本。
1.3 条件变量
互斥量是多线程间同时访问某一共享变量时,保证变量可被安全访问的手段。但单靠互斥量无法实现线程的同步。线程同步是指线程间需要按照预定的先后次序顺序进行的行为。C++11对这种行为也提供了有力的支持,这就是条件变量。条件变量位于头文件condition_variable下。
https://cplusplus.com/reference/condition_variable/condition_variable/
条件变量使用过程:
- 拥有条件变量的线程获取互斥量;
- 循环检查某个条件,如果条件不满足则阻塞直到条件满足;如果条件满足则向下执行;
- 某个线程满足条件执行完之后调用
notify_one
或notify_all
唤醒一个或者所有等待线程。
条件变量提供了两类操作:wait
和notify
。这两类操作构成了多线程同步的基础。
1.3.1 成员函数
1. wait函数
函数原型
void wait (unique_lock<mutex>& lck);template <class Predicate>void wait (unique_lock<mutex>& lck, Predicate pred);
包含两种重载,第一种只包含unique_lock
对象,另外一个Predicate
对象(等待条件),这里必须使用unique_lock
,因为wait
函数的工作原理:
- 当前线程调用
wait()
后将被阻塞并且函数会解锁互斥量,直到另外某个线程调用notify_one
或者notify_all
唤醒当前线程;一旦当前线程获得通知(notify)
,wait()
函数也是自动调用lock()
,同理不能使用lock_guard
对象。 - 如果wait没有第二个参数,第一次调用默认条件不成立,直接解锁互斥量并阻塞到本行,直到某一个线程调用notify_one或notify_all为止,被唤醒后,wait重新尝试获取互斥量,如果得不到,线程会卡在这里,直到获取到互斥量,然后无条件地继续进行后面的操作。
- 如果
wait
包含第二个参数,如果第二个参数不满足,那么wait
将解锁互斥量并堵塞到本行,直到某一个线程调用notify_one
或notify_all
为止,被唤醒后,wait
重新尝试获取互斥量,如果得不到,线程会卡在这里,直到获取到互斥量,然后继续判断第二个参数,如果表达式为false,wait对互斥量解锁,然后休眠,如果为true,则进行后面的操作。
https://blog.csdn.net/li1615882553/article/details/86179781
1. wait(unique_lock& lck)
void wait (std::unique_lock<std::mutex>& lck);
功能:
- 这个版本的
wait
会让当前线程在std::condition_variable
上等待,直到它被唤醒。 - 它需要传入一个已经加锁的
std::unique_lock<std::mutex>
,并在调用后自动释放锁,等待条件满足后再次获取锁。
工作机制:
当调用 wait
时,当前线程会释放锁并阻塞自己,直到被另一个线程唤醒(通常通过 notify_one()
或 notify_all()
)。
唤醒后,线程会重新获得锁并继续执行。
注意:在 wait
被调用时,必须先持有 mutex
,因为 wait
会释放锁,避免在等待期间占用锁的资源。锁会在等待期间被解锁,并且在唤醒后重新加锁。
常见用法:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>std::mutex mu;
std::condition_variable cond;void print_numbers(int id) {std::unique_lock<std::mutex> lck(mu); // 获取互斥锁cond.wait(lck); // 等待通知std::cout << "Thread " << id << " is working." << std::endl;
}int main() {std::thread t1(print_numbers, 1);std::thread t2(print_numbers, 2);std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟一些工作cond.notify_all(); // 唤醒所有等待的线程t1.join();t2.join();return 0;
}
在这个例子中,print_numbers
会等待 cond.wait(lck)
,直到 notify_all()
被调用,才会继续执行。
2. wait(unique_lock& lck, Predicate pred)
template <class Predicate>
void wait (std::unique_lock<std::mutex>& lck, Predicate pred);
功能:
- 这个版本的
wait
除了等待外,还会在等待前检查给定的条件谓词pred
,如果条件不满足,线程将继续等待。 - 它与
wait()
的区别在于,wait
在被唤醒后,会再次检查谓词pred
,确保条件的满足。
工作机制:
pred
是一个可调用对象(如函数、lambda 表达式或函数对象),它返回一个布尔值。只有当pred()
返回 true时,线程才会继续执行。- 这样可以防止所谓的“虚假唤醒”(spurious wakeup)问题,在一些情况下,线程可能在没有条件满足的情况下被唤醒,使用带条件的
wait
可以避免这种情况。 - 注意:传入的
pred
函数必须是可以安全访问共享数据的。
常见用法:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>std::mutex mu;
std::condition_variable cond;
int shared_data = 0;void print_numbers(int id) {std::unique_lock<std::mutex> lck(mu);// 使用条件谓词,只有当 shared_data > 0 时,才继续执行cond.wait(lck, []() { return shared_data > 0; });std::cout << "Thread " << id << " is working." << std::endl;
}int main() {std::thread t1(print_numbers, 1);std::thread t2(print_numbers, 2);std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟一些工作shared_data = 10; // 改变共享数据,满足条件cond.notify_all(); // 唤醒所有等待的线程t1.join();t2.join();return 0;
}
在这个例子中,线程会等待直到 shared_data > 0
,这样确保了只有在条件满足时,线程才会继续执行。
总结
wait(unique_lock<mutex>& lck)
: 简单的等待函数,它释放锁并等待被唤醒。wait(unique_lock<mutex>& lck, Predicate pred)
:通过条件谓词来等待,确保在条件满足时才继续执行,防止虚假唤醒。
2. wait_for函数
函数原型:
template <class Rep, class Period>cv_status wait_for (unique_lock<mutex>& lck,const chrono::duration<Rep,Period>& rel_time);template <class Rep, class Period, class Predicate>bool wait_for (unique_lock<mutex>& lck,const chrono::duration<Rep,Period>& rel_time, Predicate
pred);
和wait
不同的是,wait_for
可以执行一个时间段,在线程收到唤醒通知或者时间超时之前,该线程都会处于阻塞状态,如果收到唤醒通知或者时间超时,wait_for
返回,剩下操作和wait
类似。
1. wait_for(unique_lock& lck, const chrono::duration<Rep, Period>& rel_time)
功能:
wait_for
使当前线程等待,直到被唤醒或等待超时。它会在指定的时间rel_time
后返回,不管条件是否满足。rel_time
参数指定了等待的时间,类型为std::chrono::duration
,可以指定以秒、毫秒、微秒等单位表示的时间。- 该函数需要传入一个已加锁的
std::unique_lock<std::mutex>
,并会自动释放锁直到条件满足或超时。
工作机制:
- 线程在调用
wait_for
后会等待指定的时间rel_time
,如果在超时之前被notify_one()
或notify_all()
唤醒,线程将继续执行。 - 如果超时未被唤醒,函数返回并且线程继续执行。
wait_for
会在等待期间释放互斥锁,确保其他线程能够访问共享数据。
代码示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>std::mutex mu;
std::condition_variable cond;
bool ready = false;void wait_for_ready(int id) {std::unique_lock<std::mutex> lck(mu); // 获取互斥锁// 等待直到指定的时间,若被唤醒则继续执行if (cond.wait_for(lck, std::chrono::seconds(3))) {std::cout << "Thread " << id << " is ready!\n";} else {std::cout << "Thread " << id << " timed out.\n";}
}int main() {std::thread t1(wait_for_ready, 1); // 线程1std::thread t2(wait_for_ready, 2); // 线程2std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟一些延迟ready = true; // 设置为true,表示线程准备好了cond.notify_all(); // 通知所有等待的线程t1.join(); // 等待线程1结束t2.join(); // 等待线程2结束return 0;
}
解释:
- 线程调用
wait_for
后,等待最多 3 秒。如果在 3 秒内线程被唤醒,它继续执行。如果没有被唤醒,且超时了,线程输出 “timedout”。
2. wait_for(unique_lock& lck, const chrono::duration<Rep, Period>& rel_time, Predicate pred)
功能:
- 这个版本的
wait_for
不仅等待指定的时间,还会检查给定的条件谓词pred
,只有当谓词pred
返回 true时,线程才会继续执行。 - 它会在超时之前检查条件是否满足,如果满足条件则立即返回。如果超时仍未满足条件,函数会返回 false。
工作机制:
- 线程会在给定的时间
rel_time
内等待。如果pred
谓词返回 true,线程会继续执行。 - 如果超时或条件不满足,
wait_for
会返回 false,表示条件未满足。 wait_for
在等待时会自动释放互斥锁,防止造成死锁。
代码示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>std::mutex mu;
std::condition_variable cond;
bool ready = false;void wait_for_ready_with_predicate(int id) {std::unique_lock<std::mutex> lck(mu); // 获取互斥锁// 等待直到条件满足或者超时,条件为 ready 为 trueif (cond.wait_for(lck, std::chrono::seconds(3), []() { return ready; })) {std::cout << "Thread " << id << " is ready!\n";} else {std::cout << "Thread " << id << " timed out.\n";}
}int main() {std::thread t1(wait_for_ready_with_predicate, 1); // 线程1std::thread t2(wait_for_ready_with_predicate, 2); // 线程2std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟一些延迟ready = true; // 设置为true,表示线程准备好了cond.notify_all(); // 通知所有等待的线程t1.join(); // 等待线程1结束t2.join(); // 等待线程2结束return 0;
}
解释:
- 线程在调用
wait_for
后,等待最多 3 秒,如果条件ready == true
满足,线程继续执行。 - 如果条件不满足,线程将在超时后返回 false,并输出 “timed out”。
- 这里使用了
std::condition_variable
和一个 条件谓词,只有当ready
被设置为 true 时,线程才会继续。
关键区别总结
3. wait_until函数
函数原型:
cv_status wait_until (std::unique_lock<std::mutex>& lck,const std::chrono::time_point<Clock, Duration>& abs_time);
template <class Clock, class Duration, class Predicate>
bool wait_until (std::unique_lock<std::mutex>& lck,const std::chrono::time_point<Clock, Duration>& abs_time,Predicate pred);
与wait_for
类似,只是wait_until
可以指定一个时间点,在当前线程收到通知或者指定的时间点超时之前,该线程都会处于阻塞状态。如果超时或者收到唤醒通知,wait_until
返回,剩下操作和wait
类似
1. wait_until(不带条件谓词版本)
函数签名:
cv_status wait_until (std::unique_lock<std::mutex>& lck,const std::chrono::time_point<Clock, Duration>& abs_time);
功能:
- 这个版本的
wait_until
用于让线程等待 直到指定的绝对时间点。 - 它会在
abs_time
时间点之前或直到某个线程被唤醒时返回。 - 如果等待的时间到了但线程没有被唤醒,它将返回
cv_status::timeout
。
参数: lck
:传递给wait_until
的unique_lock
,线程需要拥有此锁。abs_time
:一个表示绝对时间点的chrono::time_point
,即线程应该等待直到这个具体的时间点。如果此时间点已经到达,则立即返回。
返回值:- 返回
cv_status
,如果线程被唤醒并继续执行,返回cv_status::no_timeout
;如果线程超时,则返回cv_status::timeout
。
代码示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>std::mutex mu;
std::condition_variable cond;void wait_until_time(int id, std::chrono::steady_clock::time_point abs_time) {std::unique_lock<std::mutex> lck(mu);// 等待直到绝对时间点if (cond.wait_until(lck, abs_time) == std::cv_status::timeout) {std::cout << "Thread " << id << " timed out.\n";} else {std::cout << "Thread " << id << " finished work.\n";}
}int main() {std::thread t1(wait_until_time, 1, std::chrono::steady_clock::now() + std::chrono::seconds(3));std::thread t2(wait_until_time, 2, std::chrono::steady_clock::now() + std::chrono::seconds(5));t1.join();t2.join();return 0;
}
解释:
- 这里我们传递了一个绝对时间点(当前时间加 3 秒和 5 秒),分别让线程 1 和线程 2 等待直到各自的时间点。
- 如果超时,线程输出 “timed out”,否则输出 “finished work”。
2. wait_until(带条件谓词版本)
函数签名:
template <class Clock, class Duration, class Predicate>
bool wait_until (std::unique_lock<std::mutex>& lck,const std::chrono::time_point<Clock, Duration>& abs_time,Predicate pred);
功能:
- 这个版本的
wait_until
除了等待直到 指定的绝对时间点外,还会在等待期间检查一个 条件谓词(Predicate
)。 - 只有当条件谓词返回
true
或者超时时,线程才会继续执行。 - 如果在超时之前条件满足,线程将继续执行并返回
true
,否则它会返回false
,表示条件未满足且超时。
参数:
lck
:传递给wait_until
的unique_lock
,线程需要拥有此锁。abs_time
:一个表示绝对时间点的chrono::time_point
,线程会等待直到该时间点,或者条件满足。pred
:一个条件谓词,只有当该条件为 true 时,线程才会继续执行。
返回值:
- 如果线程在
abs_time
时间点之前被唤醒并且条件满足,返回true
。 - 如果在时间到达前条件没有满足,并且超时,返回
false
。
代码示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>std::mutex mu;
std::condition_variable cond;
bool ready = false;void wait_until_time_with_predicate(int id, std::chrono::steady_clock::time_point abs_time) {std::unique_lock<std::mutex> lck(mu);// 等待直到绝对时间点或条件满足if (cond.wait_until(lck, abs_time, []() { return ready; })) {std::cout << "Thread " << id << " is ready!\n";} else {std::cout << "Thread " << id << " timed out.\n";}
}int main() {std::thread t1(wait_until_time_with_predicate, 1, std::chrono::steady_clock::now() + std::chrono::seconds(3));std::thread t2(wait_until_time_with_predicate, 2, std::chrono::steady_clock::now() + std::chrono::seconds(5));std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟延迟ready = true; // 设置条件为真,唤醒线程cond.notify_all(); // 唤醒所有等待的线程t1.join();t2.join();return 0;
}
解释:
- 这里,
wait_until
依然会等待到绝对时间点,但它不仅检查时间是否到了,还会检查ready
这个条件。 - 如果
ready
为 true 或者时间超时了,线程将继续执行。如果ready
在超时前未被设置为 true,则会输出 “timed out”。
关键区别总结
4. notify_one函数
函数原型:
void notify_one() noexcept;
功能:
notify_one()
是std::condition_variable
的一个成员函数,它用于 唤醒 一个 等待
在该条件变量上的线程。- 被唤醒的线程将继续执行并尝试重新获取锁。
- 该函数通常与
std::unique_lock<std::mutex>
和std::condition_variable::wait()
配合使用,在多个线程之间协调执行。
参数:
notify_one()
不需要任何参数。
返回值:
- 该函数没有返回值。
调用时机:
- 调用
notify_one()
时,它会通知一个在该条件变量上等待的线程,让它重新获取与条件变量相关联的锁,并继续执行。被唤醒的线程会根据条件继续执行,如果条件仍不满足,则可能会再次进入等待状态。
备注:
- 仅有一个线程会被唤醒,如果多个线程在等待该条件变量,只有一个线程会被唤醒。
- 如果没有线程等待在该条件变量上,
notify_one()
调用没有效果。 - 唤醒线程会在锁被释放后才能继续执行,因此条件变量确保唤醒线程在操作完成时可以再次获取锁。
常见使用场景:
- 生产者-消费者模式:在生产者线程生产一个项目后,可以使用
notify_one()
来唤醒一个等待消费的线程。 - 线程间通信:多个线程可能等待某个条件发生,一旦条件发生,可以使用
notify_one()
唤醒其中一个线程进行处理。
示例代码:
生产者消费者模式示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>// 全局队列用于存储生产的数据
std::queue<int> q;
// 用于同步的互斥锁
std::mutex mtx;
// 条件变量,用于线程间的同步
std::condition_variable cond_var;// 生产者线程函数,生产数据并将其放入队列
void producer() {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产的延时std::lock_guard<std::mutex> lock(mtx); // 自动加锁,防止数据竞争q.push(i); // 将生产的数据放入队列std::cout << "Produced: " << i << std::endl; // 输出生产的信息cond_var.notify_one(); // 唤醒一个等待的消费者线程}
}// 消费者线程函数,消费队列中的数据
void consumer() {while (true) {std::unique_lock<std::mutex> lock(mtx); // 手动加锁,允许在wait时释放锁// 等待条件变量,直到队列中有数据cond_var.wait(lock, [](){ return !q.empty(); });// 从队列中取出数据并消费int value = q.front();q.pop();lock.unlock(); // 解锁,允许其他线程访问共享资源std::cout << "Consumed: " << value << std::endl; // 输出消费的信息}
}int main() {// 创建并启动生产者和消费者线程std::thread t1(producer); // 生产者线程std::thread t2(consumer); // 消费者线程// 等待两个线程完成t1.join();t2.join();return 0;
}
注释解释:
-
生产者线程:
- 使用
std::lock_guard<std::mutex>
来确保线程安全地向队列中添加数据。 - 每生产一个数据,调用
cond_var.notify_one()
唤醒一个等待的消费者线程。
- 使用
-
消费者线程:
- 使用
std::unique_lock<std::mutex>
来保证在等待条件变量时释放锁,防止死锁。 cond_var.wait(lock, [](){ return !q.empty(); })
阻塞当前线程,直到队列中有数据。- 当有数据时,消费并输出。
- 使用
-
主线程:
- 创建生产者和消费者线程并启动。
- 使用
join()
等待两个线程执行完成。
5. notify_all函数
函数原型:
void notify_all() noexcept;
解锁正在等待当前条件的所有线程,如果没有正在等待的线程,则函数不执行任何操作。
https://blog.csdn.net/li1615882553/article/details/86179781
功能:
notify_all()
是std::condition_variable
的一个成员函数,用于 唤醒 所有等待在该条件变量上的线程。- 该函数常用于当一个线程更改了共享资源的状态,并且希望所有等待该资源的线程重新开始执行时。
参数:
notify_all()
不接受任何参数。
返回值:
- 该函数没有返回值。
调用时机:
- 调用
notify_all()
时,所有等待在该条件变量上的线程都会被唤醒。 - 被唤醒的线程会尝试重新获取与条件变量相关联的锁,并根据条件继续执行。
备注:
- 如果有多个线程等待在该条件变量上,
notify_all()
会唤醒所有等待的线程,所有被唤醒的线程都会争夺锁,并继续执行。 - 唤醒线程会在锁被释放后才能继续执行,因此条件变量确保唤醒线程在操作完成时能够重新获取锁。
- 如果没有线程在等待条件变量上,调用
notify_all()
将没有任何效果。
常见使用场景:
- 生产者-消费者模式:在多线程的生产者-消费者问题中,如果一个线程的任务完成后,可能需要唤醒所有的消费者或生产者线程。
- 线程间的广播通知:在多个线程等待某个条件时,调用
notify_all()
可以确保所有线程都被通知,开始执行。
示例代码:
生产者-消费者模型:使用 notify_all()
唤醒所有等待线程
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>// 全局队列用于存储生产的数据
std::queue<int> q;
// 用于同步的互斥锁
std::mutex mtx;
// 条件变量,用于线程间的同步
std::condition_variable cond_var;// 生产者线程函数,生产数据并将其放入队列
void producer() {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产的延时std::lock_guard<std::mutex> lock(mtx); // 自动加锁,防止数据竞争q.push(i); // 将生产的数据放入队列std::cout << "Produced: " << i << std::endl; // 输出生产的信息cond_var.notify_all(); // 唤醒所有等待的消费者线程}
}// 消费者线程函数,消费队列中的数据
void consumer() {while (true) {std::unique_lock<std::mutex> lock(mtx); // 手动加锁,允许在wait时释放锁// 等待条件变量,直到队列中有数据cond_var.wait(lock, [](){ return !q.empty(); });// 从队列中取出数据并消费int value = q.front();q.pop();lock.unlock(); // 解锁,允许其他线程访问共享资源std::cout << "Consumed: " << value << std::endl; // 输出消费的信息}
}int main() {// 创建并启动生产者和消费者线程std::thread t1(producer); // 生产者线程std::thread t2(consumer); // 消费者线程std::thread t3(consumer); // 另一个消费者线程// 等待线程完成t1.join();t2.join();t3.join();return 0;
}
注释解释:
-
生产者线程:
- 使用
std::lock_guard<std::mutex>
确保线程安全地向队列中添加数据。 - 每生产一个数据,调用
cond_var.notify_all()
唤醒所有等待的消费者线程。 - 唤醒所有消费者线程后,它们将争夺锁并继续执行消费操作。
- 使用
-
消费者线程:
- 使用
std::unique_lock<std::mutex>
来保证在等待条件变量时释放锁,防止死锁。 cond_var.wait(lock, [](){ return !q.empty(); })
阻塞当前线程,直到队列中有数据。- 当有数据时,消费并输出。
- 使用
-
主线程:
- 创建并启动生产者和多个消费者线程,并使用
join()
等待它们完成。
- 创建并启动生产者和多个消费者线程,并使用
notify_all()
和 notify_one()
的区别:
notify_one()
唤醒一个等待线程,通常用于一个线程执行任务后,唤醒一个处理者进行处理。notify_all()
唤醒所有等待线程,这在某些情况下很有用,比如多个线程等待相同的资源并且需要同时继续处理任务。
总结:
notify_all()
唤醒所有等待条件变量的线程,在需要唤醒多个线程时使用。例如,当一个任务完成后,希望所有线程能够一起继续执行。
1.3.2 范例
使用条件变量实现一个同步队列,同步队列作为一个线程安全的数据共享区,经常用于线程之间数据读取。
sync_queue.h
#ifndef SYNC_QUEUE_H
#define SYNC_QUEUE_H#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>template<typename T>
class SyncQueue
{
private:// 检查队列是否已满bool IsFull() const{return _queue.size() == _maxSize;}// 检查队列是否为空bool IsEmpty() const{return _queue.empty();}public:// 构造函数,初始化队列最大大小SyncQueue(int maxSize) : _maxSize(maxSize) {}// 放入数据到队列,如果队列已满则阻塞void Put(const T& x){// 锁住互斥量,确保对共享资源的安全访问std::lock_guard<std::mutex> locker(_mutex);// 如果队列满了,等待队列有空余空间while (IsFull()){std::cout << "full wait..." << std::endl;// 等待不满的条件变量_notFull.wait(_mutex);}// 将数据放入队列_queue.push_back(x);// 唤醒一个等待的消费者线程,表示队列不为空_notEmpty.notify_one();}// 从队列中取出数据,如果队列为空则阻塞void Take(T& x){// 锁住互斥量,确保对共享资源的安全访问std::lock_guard<std::mutex> locker(_mutex);// 如果队列为空,等待队列有数据while (IsEmpty()){std::cout << "empty wait.." << std::endl;// 等待非空的条件变量_notEmpty.wait(_mutex);}// 从队列头部取出数据x = _queue.front();_queue.pop_front();// 唤醒一个等待的生产者线程,表示队列有空余空间_notFull.notify_one();}// 判断队列是否为空bool Empty(){std::lock_guard<std::mutex> locker(_mutex);return _queue.empty();}// 判断队列是否已满bool Full(){std::lock_guard<std::mutex> locker(_mutex);return _queue.size() == _maxSize;}// 返回当前队列的大小size_t Size(){std::lock_guard<std::mutex> locker(_mutex);return _queue.size();}// 返回队列中元素的数量int Count(){return _queue.size();}private:std::list<T> _queue; // 队列本身,使用list来存储元素std::mutex _mutex; // 互斥量,确保对队列的安全访问std::condition_variable_any _notEmpty; // 不为空的条件变量,等待队列有数据std::condition_variable_any _notFull; // 不满的条件变量,等待队列有空位int _maxSize; // 队列的最大大小
};#endif // SYNC_QUEUE_H
main.cpp
#include <iostream>
#include "sync_queue.h"
#include <thread>
#include <mutex>using namespace std;// 创建一个最大大小为 5 的同步队列,用于存储整数
SyncQueue<int> syncQueue(5);// 生产者函数,将数据放入队列
void PutDatas()
{// 向队列中放入 20 个元素,每个元素的值是 888for (int i = 0; i < 20; ++i){syncQueue.Put(888); // 调用 Put 方法,放入数据}std::cout << "PutDatas finish\n"; // 打印出生产者任务完成的提示
}// 消费者函数,从队列中取出数据并打印
void TakeDatas()
{int x = 0; // 用于接收从队列取出的数据// 循环从队列中取出 20 个元素for (int i = 0; i < 20; ++i){syncQueue.Take(x); // 调用 Take 方法,取出数据std::cout << x << std::endl; // 打印取出的数据}std::cout << "TakeDatas finish\n"; // 打印出消费者任务完成的提示
}int main(void)
{// 创建并启动两个线程:// t1 作为生产者,向队列中放入数据// t2 作为消费者,从队列中取出数据std::thread t1(PutDatas);std::thread t2(TakeDatas);// 等待两个线程完成t1.join();t2.join();std::cout << "main finish\n"; // 打印出主线程任务完成的提示return 0; // 程序正常结束
}
- SyncQueue 类简介
SyncQueue 类实现了一个 线程安全的队列,它使用了 互斥锁 (std::mutex
) 和 条件变量 (std::condition_variable_any
) 来解决多线程访问队列时的同步问题,特别是用在 生产者-消费者模式 中。
核心功能:
- 生产者:将数据放入队列。当队列满时,生产者会等待直到有空间。
- 消费者:从队列取出数据。当队列空时,消费者会等待直到有数据。
- Put 和 Take 的实现
-
Put(生产者):
- 锁住队列。
- 如果队列满,生产者会等待直到有空余空间。
- 将数据放入队列,并通知消费者可以取数据。
-
Take(消费者):
- 锁住队列。
- 如果队列空,消费者会等待直到有数据。
- 从队列取出数据,并通知生产者可以放更多数据。
- 主函数中的操作
- 创建两个线程:
- 一个线程运行
PutDatas()
,负责将数据放入队列(模拟生产)。 - 另一个线程运行
TakeDatas()
,负责从队列中取数据并打印(模拟消费)。
- 一个线程运行
- 线程同步:
std::mutex
确保队列操作的互斥性。std::condition_variable_any
用于生产者和消费者之间的同步。
- 队列状态:
- 如果队列满,生产者阻塞;如果队列空,消费者阻塞。
- 总结
这个代码展示了一个典型的 生产者-消费者模型,通过互斥锁和条件变量保证线程安全,实现了 同步队列。生产者和消费者会相互等待,直到合适的时机进行数据操作。
代码中用到了std::lock_guard
,它利用RAII
机制可以保证安全释放mutex
。
std::lock_guard<std::mutex> locker(_mutex);while (IsFull()){std::cout << "full wait..." << std::endl;_notFull.wait(_mutex);}
可以改成
std::lock_guard<std::mutex> locker(_mutex);_notFull.wait(_mutex, [this] {return !IsFull();});
两种写法效果是一样的,但是后者更简洁,条件变量会先检查判断式是否满足条件,如果满足条件则重新获取mutex
,然后结束wait
继续往下执行;如果不满足条件则释放mutex
,然后将线程置为waiting
状态继续等待。
这里需要注意的是,wait
函数中会释放mutex
,而lock_guard
这时还拥有mutex
,它只会在出了作用域之后才会释放mutex
,所以这时它并不会释放,但执行wait
时会提取释放mutex
。
从语义上看这里使用lock_guard
会产生矛盾,但是实际上并不会出问题,因为wait
提前释放锁之后会处于等待状态,在被notify_one
或者notify_all
唤醒后会先获取mutex
,这相当于lock_guard
的mutex
在释放之后又获取到了,因此,在出了作用域之后lock_guard
自动释放mutex
不会有问题。
这里应该用unique_lock
,因为unique_lock
不像lock_guard
一样只能在析构时才释放锁,它可以随时释放锁,因此在wait
时让unique_lock
释放锁从语义上更加准确。
使用unique_lock
和condition_variable_variable
改写1-3-condition-sync-queue
,改写为用等待一个判断式的方法来实现一个简单的队列。
范例:1-3-condition-sync-queue2
#ifndef SIMPLE_SYNC_QUEUE_H
#define SIMPLE_SYNC_QUEUE_H#include <thread> // std::thread
#include <condition_variable> // std::condition_variable
#include <mutex> // std::mutex
#include <list> // std::list
#include <iostream> // std::cout// 定义一个线程安全的同步队列
template<typename T>
class SimpleSyncQueue
{
public:// 默认构造函数SimpleSyncQueue() {}// 向队列中添加元素void Put(const T& x){std::lock_guard<std::mutex> locker(_mutex); // 使用锁保护队列_queue.push_back(x); // 将数据放入队列_notEmpty.notify_one(); // 通知一个等待的消费者,表示队列不为空}// 从队列中取出一个元素void Take(T& x){std::unique_lock<std::mutex> locker(_mutex); // 使用独占锁,支持条件变量_notEmpty.wait(locker, [this]{ return !_queue.empty(); }); // 阻塞,直到队列非空x = _queue.front(); // 获取队列中的第一个元素_queue.pop_front(); // 从队列中移除该元素}// 检查队列是否为空bool Empty(){std::lock_guard<std::mutex> locker(_mutex); // 锁住队列,保证线程安全return _queue.empty(); // 返回队列是否为空}// 获取队列中元素的数量size_t Size(){std::lock_guard<std::mutex> locker(_mutex); // 锁住队列,避免多线程修改队列return _queue.size(); // 返回队列中的元素个数}private:std::list<T> _queue; // 存储数据的队列,使用 std::list 来实现std::mutex _mutex; // 互斥锁,确保线程安全std::condition_variable _notEmpty; // 条件变量,通知消费者队列不为空
};#endif // SIMPLE_SYNC_QUEUE_H
#include <iostream>
#include "sync_queue.h" // 包含同步队列的头文件
#include <thread> // 包含线程库
#include <mutex> // 包含互斥锁
using namespace std;// 创建一个线程安全的同步队列,存储 int 类型的数据
SimpleSyncQueue<int> syncQueue;// 生产者线程函数,往队列中添加数据
void PutDatas()
{// 往队列中放入 20 个数据,值为 888for (int i = 0; i < 20; ++i){syncQueue.Put(888); // 调用同步队列的 Put 方法插入数据}
}// 消费者线程函数,从队列中取出数据
void TakeDatas()
{int x = 0;// 从队列中取出 20 个数据并打印for (int i = 0; i < 20; ++i){syncQueue.Take(x); // 调用同步队列的 Take 方法取出数据std::cout << x << std::endl; // 打印取出的数据}
}// 主函数
int main(void)
{// 创建两个线程,分别调用 PutDatas 和 TakeDatasstd::thread t1(PutDatas); // 生产者线程std::thread t2(TakeDatas); // 消费者线程// 等待两个线程执行完毕t1.join();t2.join();// 打印主线程结束的信息std::cout << "main finish\n";return 0; // 返回 0,表示程序正常结束
}
SimpleSyncQueue
类通过 std::mutex
和 std::condition_variable
来保证线程安全,防止数据竞争。生产者和消费者线程通过条件变量来同步操作,确保不会在队列为空时取数据,也不会在队列满时再插入数据。
1.4 原子变量
具体参考:https://cplusplus.com/reference/atomic/atomic/
范例:1-4-atomic
#include <iostream> // 包含输入输出流库,用于打印结果
#include <atomic> // 包含原子操作库,用于使用 std::atomic
#include <thread> // 包含线程库,用于创建和管理线程// 定义一个全局原子变量 foo,初始化为 0
std::atomic<int> foo(0); // 原子整数类型,初始值为 0// 设置 foo 的值为给定的整数 x
void set_foo(int x)
{// 使用 store() 函数将 x 存储到 foo 中,使用 memory_order_relaxed 表示无同步要求foo.store(x, std::memory_order_relaxed);
}// 打印 foo 的值
void print_foo()
{int x;// 循环直到 foo 的值不是 0do {// 使用 load() 函数原子地读取 foo 的值,使用 memory_order_relaxed 表示无同步要求x = foo.load(std::memory_order_relaxed); } while (x == 0); // 如果值是 0,继续循环// 打印 foo 的最终值std::cout << "foo: " << x << '\n';
}int main()
{// 创建两个线程,一个打印 foo 的值,一个设置 foo 的值为 10std::thread first(print_foo); // 启动线程执行 print_foostd::thread second(set_foo, 10); // 启动线程执行 set_foo 并传入 10// 等待线程 first 和 second 执行完毕first.join();second.join();// 打印主线程完成的信息std::cout << "main finish\n";return 0; // 返回 0 表示程序正常结束
}
代码解析:
std::atomic<int> foo(0)
:
- 这是一个原子整数变量
foo
,它被初始化为 0。std::atomic
用于保证对 foo 的操作是线程安全的,不会发生数据竞争。
set_foo(int x)
:
- 该函数使用
foo.store(x, std::memory_order_relaxed)
设置 foo 的值为 x。store
是原子操作,它保证在多线程环境下对变量 foo 的赋值操作是安全的。 std::memory_order_relaxed
表示没有同步要求,即在不需要保证线程间严格同步的情况下执行操作。
print_foo()
:
- 该函数通过
foo.load(std::memory_order_relaxed)
原子地加载 foo 的值,并保存在变量 x 中。 do...while
循环会一直执行,直到 foo 的值不再是 0。- 这样做确保了程序在 foo 被赋值为非零值时才会停止循环并打印。
main()
:
- 在主函数中,我们启动了两个线程:
first
线程负责调用print_foo()
函数,它会等待直到 foo 的值不再是 0 并打印出来。second
线程负责调用set_foo(10)
函数,它会将 foo 的值设置为 10。
- 使用
first.join()
和second.join()
等待两个线程完成执行,确保主线程在两个线程执行完毕后才继续执行。
std::memory_order_relaxed
:
- 在此示例中,
std::memory_order_relaxed
被用于store
和load
操作。它表示不需要对线程间的操作顺序进行同步或约束,允许最大程度的性能优化。 - 这意味着其他线程可能在 foo 的值更新之前读取到 foo 的旧值(例如,在一个线程设置
foo = 10
后,另一个线程可能会读取到foo = 0
,直到 foo 更新为 10)。
执行流程:
main()
函数启动两个线程,first
线程执行print_foo()
,second
线程执行set_foo(10)
。first
线程会不断循环,直到它获取到 foo 的值不为 0。second
线程将foo
的值设置为 10,first 线程会读取到 10 并打印foo: 10
。- 最后,两个线程结束后,
main
函数打印main finish
。
1.5 call_once和once_flag使用
具体:https://www.apiref.com/cpp-zh/cpp/thread/call_once.html
在多线程中,有一种场景是某个任务只需要执行一次,可以用C++11中的std::call_once
函数配合std::once_flag
来实现。多个线程同时调用某个函数,std::call_once
可以保证多个线程对该函数只调用一次。
范例
#include <iostream> // 包含输入输出流库,用于打印结果
#include <thread> // 包含线程库,用于创建和管理线程
#include <mutex> // 包含互斥量库,用于同步操作// 定义两个 std::once_flag 变量,用于标记每个函数调用只执行一次
std::once_flag flag1, flag2;// 一个简单的函数,调用 std::call_once 仅执行一次操作
void simple_do_once()
{std::cout << "simple_do_once\n"; // 调用 std::call_once,确保输出 "Simple example: called once" 仅一次std::call_once(flag1, [](){std::cout << "Simple example: called once\n"; });
}// 一个函数,如果 `do_throw` 为真,则抛出异常
void may_throw_function(bool do_throw)
{if (do_throw) {std::cout << "throw: call_once will retry\n"; // 如果抛出异常,说明 std::call_once 会重试throw std::exception(); // 抛出异常}std::cout << "Didn't throw, call_once will not attempt again\n"; // 如果没有抛出异常,保证只调用一次
}// `do_once` 函数,尝试调用 `may_throw_function`,并处理异常
void do_once(bool do_throw)
{try {// std::call_once 确保 `may_throw_function` 仅调用一次std::call_once(flag2, may_throw_function, do_throw);}catch (...) {// 捕捉所有异常,确保不会传播异常}
}int main()
{// 创建四个线程,执行 simple_do_once,检查 std::call_once 的行为std::thread st1(simple_do_once);std::thread st2(simple_do_once);std::thread st3(simple_do_once);std::thread st4(simple_do_once);st1.join(); // 等待线程 st1 执行完st2.join(); // 等待线程 st2 执行完st3.join(); // 等待线程 st3 执行完st4.join(); // 等待线程 st4 执行完// 创建四个线程,执行 do_once,验证异常的情况std::thread t1(do_once, false); // do_once(false) 不抛出异常std::thread t2(do_once, false); // do_once(false) 不抛出异常std::thread t3(do_once, false); // do_once(false) 不抛出异常std::thread t4(do_once, true); // do_once(true) 抛出异常t1.join(); // 等待线程 t1 执行完t2.join(); // 等待线程 t2 执行完t3.join(); // 等待线程 t3 执行完t4.join(); // 等待线程 t4 执行完
}
代码分析与解释:
std::once_flag
:
std::once_flag
是一个专门用于控制某个操作只执行一次的标志变量。它通常与std::call_once
一起使用,保证某个函数在多个线程中只被执行一次。
std::call_once
:
std::call_once
是一个线程同步机制,它确保指定的函数只会执行一次。即使有多个线程调用std::call_once
,传递的函数也只会被执行一次。std::call_once
需要两个参数:- 一个
std::once_flag
类型的标志,表示是否已经执行过函数。 - 需要执行的函数或可调用对象。
- 一个
simple_do_once
:
- 该函数调用了
std::call_once(flag1, ...)
,通过flag1
来保证std::cout << "Simple example: called once\n"
; 只会打印一次。 - 尽管我们有多个线程(st1, st2, st3, st4),但输出
Simple example: called once
只会出现一次,因为 std::call_once 确保了这段代码只会执行一次。
may_throw_function
:
- 这是一个会根据
do_throw
参数决定是否抛出异常的函数。 - 如果
do_throw
为 true,它会抛出异常,告诉调用者std::call_once
会重新尝试执行。 - 如果没有抛出异常,
std::call_once
只会执行一次,输出相应的消息。
do_once
:
- 该函数调用
std::call_once(flag2, ...)
,确保may_throw_function
仅在多个线程中执行一次。 - 如果
may_throw_function
抛出异常,std::call_once
会再次尝试执行,但它仅会执行一次。异常会被捕捉,并且不会影响其他线程的执行。
main
函数:
- main 中创建了多个线程:
- 四个线程执行
simple_do_once
,这些线程都会尝试调用std::call_once
,但Simple example: called once
只会打印一次。 - 四个线程执行
do_once
,其中三个线程传入 false(不会抛出异常),一个线程传入 true(会抛出异常)。无论如何,may_throw_function
都只会执行一次。
- 四个线程执行
代码的执行流程:
simple_do_once
函数:
- 创建了四个线程 st1, st2, st3, st4,它们都会调用
std::call_once(flag1, ...)
,但是std::cout << "Simple example: called once\n";
只会输出一次。
do_once
函数:
- 创建了四个线程 t1, t2, t3, t4,其中 t4 会抛出异常(
do_throw=true
),其他线程不会抛出异常。 - 无论是否抛出异常,
std::call_once
确保may_throw_function
只会被执行一次,即使有多个线程尝试调用。
输出的结果:
simple_do_once
simple_do_once
simple_do_once
simple_do_once
Simple example: called once
throw: call_once will retry
Didn't throw, call_once will not attempt again
Didn't throw, call_once will not attempt again
Didn't throw, call_once will not attempt again
1.6 异步操作
std::future
std::aysnc
std::promise
std::packaged_task
参考C++官方手册的范例。
1.6.1 std::future
std::future
期待一个返回,从一个异步调用的角度来说,future更像是执行函数的返回值,C++标准库使用std::future
为一次性事件建模,如果一个事件需要等待特定的一次性事件,那么这线程可以获取一个future
对象来代表这个事件。
异步调用往往不知道何时返回,但是如果异步调用的过程需要同步,或者说后一个异步调用需要使用前一个异步调用的结果。这个时候就要用到future
。
线程可以周期性的在这个future
上等待一小段时间,检查future
是否已经ready
,如果没有,该线程可以先去做另一个任务,一旦future
就绪,该future
就无法复位(无法再次使用这个future
等待这个事件),所以future
代表的是一次性事件。
future
的类型
在库的头文件中声明了两种future
,唯一future
(std::future
)和共享future
(std::shared_future
)这两个是参照std::unique_ptr
和std::shared_ptr
设立的,前者的实例是仅有的一个指向其关联事件的实例,而后者可以有多个实例指向同一个关联事件,当事件就绪时,所有指向同一事件的std::shared_future
实例会变成就绪。
future的使用
std::future
是一个模板,例如std::future
,模板参数就是期待返回的类型,虽然future被用于线程间通信,但其本身却并不提供同步访问,热门必须通过互斥元或其他同步机制来保护访问。
future使用的时机是当你不需要立刻得到一个结果的时候,你可以开启一个线程帮你去做一项任务,并期待这个任务的返回,但是std::thread
并没有提供这样的机制,这就需要用到std::async
和std::future
(都在头文件中声明)
std::async
返回一个std::future
对象,而不是给你一个确定的值(所以当你不需要立刻使用此值的时候才需要用到这个机制)。当你需要使用这个值的时候,对future
使用get()
,线程就会阻塞直到future
就绪,然后返回该值。
#include <iostream> // 引入输入输出库
#include <future> // 引入future和async库,用于异步编程
#include <thread> // 引入线程库,用于模拟多线程using namespace std;// 模拟一个简单的计算任务,延迟5秒后返回 1 + 1
int find_result_to_add()
{// 使用std::this_thread::sleep_for模拟计算过程中的延迟std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟异步延迟的影响return 1 + 1; // 返回计算结果
}// 模拟一个带参数的计算任务,延迟5秒后返回 a + b
int find_result_to_add2(int a, int b)
{// 使用std::this_thread::sleep_for模拟计算过程中的延迟std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟异步延迟的影响return a + b; // 返回计算结果
}// 一个简单的函数,用于执行其他事情
void do_other_things()
{std::cout << "Hello World" << std::endl; // 打印 Hello World// 模拟一些延迟操作std::this_thread::sleep_for(std::chrono::seconds(5)); // 使用sleep模拟其他操作的延迟
}int main()
{// 使用std::async启动一个异步任务,执行 find_result_to_add 函数std::future<int> result = std::async(find_result_to_add);// 另一种方式,使用decltype推断返回值类型并启动异步任务std::future<decltype(find_result_to_add())> result = std::async(find_result_to_add);// 执行一些其他任务,这里模拟主线程做其他工作do_other_things(); // 模拟主线程的其他任务// 调用 get() 来获取异步任务的结果,注意这里会阻塞直到任务完成std::cout << "result: " << result.get() << std::endl; // 获取异步任务的返回结果,并输出// 异步任务的延迟会影响 get() 的返回,直到任务完成// 使用std::async启动另一个异步任务,执行带有参数的 find_result_to_add2 函数// 注意在这里需要确保传递正确的参数类型,不能直接调用函数原型类型std::future<decltype(find_result_to_add2(0, 0))> result2 = std::async(find_result_to_add2, 10, 20); // 正确的方式,传递具体参数std::cout << "result2: " << result2.get() << std::endl; // 获取异步任务的结果,并输出std::cout << "main finish" << endl; // 输出主线程结束的标志return 0; // 返回0,表示程序结束
}
代码讲解:
std::future
和std::async
:
std::future
是用来获取异步任务结果的一个类,它代表一个未来的结果。你可以在主线程中调用future.get()
来等待和获取异步任务的结果。std::async
用来启动一个异步任务,它接受一个函数和可选的参数,并返回一个std::future
对象。任务会在后台线程中执行。
find_result_to_add
函数:
- 这个函数模拟了一个简单的计算任务,返回 1 + 1。为了模拟计算过程中的延迟,它使用了
std::this_thread::sleep_for
来让线程休眠 5 秒。
find_result_to_add2
函数:
- 这是一个带有参数的计算任务,接收两个整数并返回它们的和。它同样使用
std::this_thread::sleep_for
来模拟计算的延迟。
do_other_things
函数:
- 该函数只是简单地打印 “Hello World”,并休眠 5 秒来模拟一些其他操作。这是为了测试在异步任务运行时,主线程是否可以同时做其他事情。
- main 函数:
- 在 main 函数中,首先启动了一个异步任务
result
,它会执行find_result_to_add
函数。 - 之后,调用了
do_other_things
函数,这模拟了主线程在等待异步任务结果期间可以进行其他操作。 - 使用
result.get()
来等待find_result_to_add
函数的结果,get()
会阻塞当前线程,直到异步任务完成。 - 接下来,启动了另一个异步任务
result2
,执行find_result_to_add2
函数,传递了两个参数(10 和 20),并使用result2.get()
获取其返回结果。 - 最后,打印主线程结束的信息。
代码的输出:
do_other_things
会在异步任务执行的同时进行,它会打印 “Hello World” 然后休眠 5 秒。result.get()
会阻塞主线程,直到find_result_to_add
异步任务完成并返回结果。因此,result 的结果会在大约 5 秒后输出:result: 2
。result2.get()
同样会阻塞主线程,直到find_result_to_add2
异步任务完成并返回结果。result2 的结果会在大约 5 秒后输出:result2: 30。
主要结论:
- 异步任务通过
std::async
启动,并且通过std::future
获取结果。 std::future::get()
会阻塞当前线程,直到异步任务完成。- 异步任务和主线程可以并发执行,因此主线程可以在等待异步任务的过程中进行其他工作(例如
do_other_things
)。
重点
跟thread
类似,async
允许你通过将额外的参数添加到调用中,来将附加参数传递给函数。如果传入的函数指针是某个类的成员函数,则还需要将类对象指针传入(直接传入,传入指针,或者是std::ref
封装)。
默认情况下,std::async
是否启动一个新线程,或者在等待future
时,任务是否同步运行都取决于你给的参数。这个参数为std::launch
类型
std::launch::defered
表明该函数会被延迟调用,直到在future上调用get()或者wait()为止std::launch::async
,表明函数会在自己创建的线程上运行std::launch::any = std::launch::defered | std::launch::async
std::launch::sync = std::launch::defered
enum class launch{async,deferred,sync=deferred,any=async|deferred};
1.6.2 std::packaged_task
如果说std::async
和std::feature
还是分开看的关系的话,那么std::packaged_task
就是将任务和feature
绑定在一起的模板,是一种封装对任务的封装。
可以通过std::packaged_task
对象获取任务相关联的feature
,调用get_future()
方法可以获得std::packaged_task
对象绑定的函数的返回值类型的future
。std::packaged_task
的模板参数是函数签名。
PS:例如int add(int a, intb)
的函数签名就是int(int, int)
// 包含头文件
#include <iostream> // 用于输入输出操作
#include <future> // 用于std::future和std::packaged_taskusing namespace std; // 使用标准命名空间// 定义一个简单的加法函数
int add(int a, int b)
{return a + b; // 返回a和b的和
}// 定义一个模拟其他任务的函数
void do_other_things()
{std::cout << "Hello World" << std::endl; // 打印"Hello World"
}int main()
{// 创建一个 std::packaged_task 对象,用来包装一个返回int类型的函数// 这个task对象会包装add函数,接受两个int类型的参数并返回intstd::packaged_task<int(int, int)> task(add);// 调用do_other_things函数,模拟在后台执行其他操作do_other_things(); // 输出"Hello World"// 获取一个与task相关联的future对象,用于获取任务的结果std::future<int> result = task.get_future();// 执行任务。调用task(1, 1)时,它会执行add(1, 1),并将结果(2)传递给关联的future。task(1, 1); // 执行add函数,计算1+1,任务开始执行// 获取并打印任务执行的结果。get()会阻塞,直到task执行完毕并返回结果。std::cout << result.get() << std::endl; // 打印任务执行结果:2return 0; // 程序结束
}
代码解析:
std::packaged_task
: 这是一个模板类,用于包装一个函数,使得可以通过std::future
获取该函数的结果。在这个例子中,task
包装了add
函数,它接受两个参数并返回一个int。do_other_things()
: 这个函数模拟其他工作,它在task开始执行之前被调用,用来展示在调用任务之前可以执行其他操作。task.get_future()
: 这行代码获取一个std::future<int>
对象,它用来接收任务的执行结果。task(1, 1)
: 这行代码触发任务执行,计算add(1, 1)
,并将结果(2)传递给std::future
。result.get()
: 获取并打印任务的执行结果。在任务执行完成后,get()
返回任务的结果。
注意:
std::packaged_task
必须执行,才能将结果提供给std::future
。否则,调用get()
时将一直阻塞,等待结果。
1.6.3 std::promise
从字面意思上理解promise
代表一个承诺。promise
比std::packaged_task
抽象层次低。
std::promise
提供了一种设置值的方式,它可以在这之后通过相关联的std::future
对象进行读取。换种说法,之前已经说过std::future
可以读取一个异步函数的返回值了,那么这个std::promise
就提供一种方式手动让future
就绪。
// 包含头文件
#include <future> // 用于 std::promise 和 std::future
#include <string> // 用于 std::string 类型
#include <thread> // 用于 std::thread,支持多线程操作
#include <iostream> // 用于输入输出操作using namespace std; // 使用标准命名空间// 定义一个函数,接受std::promise类型的引用参数
// 在该函数中通过promise对象设置结果
void print(std::promise<std::string>& p)
{// 设置promise的值,表示任务的结果p.set_value("There is the result whitch you want.");
}// 定义一个模拟其他任务的函数
void do_some_other_things()
{std::cout << "Hello World" << std::endl; // 输出"Hello World"
}int main()
{// 创建一个std::promise对象,用于传递字符串类型的结果std::promise<std::string> promise;// 获取与promise相关联的std::future对象,用于接收异步任务的结果std::future<std::string> result = promise.get_future();// 启动一个线程执行print函数,传递promise对象的引用std::thread t(print, std::ref(promise));// 模拟执行其他任务do_some_other_things(); // 输出"Hello World"// 获取并打印由print函数设置的promise值std::cout << result.get() << std::endl; // 打印:"There is the result whitch you want."// 等待线程执行完毕,确保主线程在结束前等待子线程t.join();return 0; // 程序结束
}
由此可以看出在promise
创建好的时候future
也已经创建好了 线程在创建promise
的同时会获得一个future
,然后将promise
传递给设置他的线程,当前线程则持有future
,以便随时检查是否可以取值。
执行流程:
- 程序创建了一个
std::promise<std::string>
对象promise
,并通过promise.get_future()
获得一个std::future<std::string>
对象result
,该对象用于接收异步任务的结果。 - 启动一个新线程
std::thread t(print, std::ref(promise))
,该线程将调用print
函数并传递promise
的引用。 - 在主线程中,调用
do_some_other_things
函数来模拟其他任务(输出“Hello World”)。 - 主线程通过
result.get()
阻塞等待promise
设置的值,并打印任务的结果。 - 最后,调用
t.join()
来确保主线程在退出前等待子线程完成。
1.6.4 总结
future
的表现为期望,当前线程持有future
时,期望从future
获取到想要的结果和返回,可以把future
当做异步函数的返回值。而promise
是一个承诺,当线程创建了promise
对象后,这个promise
对象向线程承诺他必定会被人设置一个值,和promise
相关联的future
就是获取其返回的手段。
2 function和bind用法
在设计回调函数的时候,无可避免地会接触到可回调对象。在C++11中,提供了std::function
和std::bind
两个方法来对可回调对象进行统一和封装。
C++语言中有几种可调用对象:函数、函数指针、lambda表达式、bind创建的对象以及重载了函数调用运算符的类。
和其他对象一样,可调用对象也有类型。例如,每个lambda有它自己唯一的(未命名)类类型;函数及函数指针的类型则由其返回值类型和实参类型决定。
2.1 function的用法
- 保存普通函数
void printA(int a){cout << a << endl;
}std::function<void(int a)> func;func = printA;func(2);
//2
- 保存lambda表达式
std::function<void()> func_1 = [](){cout << "hello world" << endl;};
func_1(); //hello world
- 保存成员函数
class Foo{Foo(int num) : num_(num){}void print_add(int i) const {cout << num_ + i << endl;}int num_;
};//保存成员函数std::function<void(const Foo&,int)> f_add_display = &Foo::print_add;Foo foo(2);f_add_display(foo,1);
2.2 bind用法
可将bind函数看作是一个通用的函数适配器,它接受一个可调用对象,生成一个新的可调用对象来“适应”原对象的参数列表。
调用bind的一般形式:auto newCallable = bind(callable,arg_list)
;
其中,newCallable
本身是一个可调用对象,arg_list
是一个逗号分隔的参数列表,对应给定的callable
的参数。即,当我们调用newCallable
时,newCallable
会调用callable
,并传给它arg_list
中的参数。
arg_list
中的参数可能包含形如n的名字,其中n是一个整数,这些参数是“占位符”,表示newCallable
的参数,它们占据了传递给newCallable
的参数的“位置”。数值n表示生成的可调用对象中参数的位置:1
为newCallable的第一个参数,_2
为第二个参数,以此类推。
范例:2-2-bind
// 包含必要的头文件
#include <iostream> // 用于输入输出操作
#include <functional> // 用于 std::bind 和 std::placeholdersusing namespace std; // 使用标准命名空间// 定义类 A,包含一个成员函数 fun_3
class A
{
public:// 成员函数 fun_3,打印两个参数 k 和 mvoid fun_3(int k, int m){cout << "print: k=" << k << ",m=" << m << endl;}
};// 定义一个普通函数 fun_1,接受三个 int 参数
void fun_1(int x, int y, int z)
{cout << "print: x=" << x << ",y=" << y << ",z=" << z << endl;
}// 定义一个普通函数 fun_2,接受两个 int 引用参数
void fun_2(int &a, int &b)
{a++; // 对第一个参数进行自增b++; // 对第二个参数进行自增cout << "print: a=" << a << ",b=" << b << endl; // 打印 a 和 b 的值
}int main(int argc, char *argv[])
{// 使用 std::bind 绑定 fun_1 函数的三个参数// f1 的类型是 function<void(int, int, int)>auto f1 = std::bind(fun_1, 1, 2, 3); // 绑定 fun_1 的三个参数为 1, 2, 3f1(); // 调用 f1() 输出:print: x=1,y=2,z=3// 绑定 fun_1 函数的第三个参数为 3,前两个参数由调用 f2 时传入auto f2 = std::bind(fun_1, placeholders::_1, placeholders::_2, 3);// _1 和 _2 表示占位符,调用时会传入参数来替代f2(1, 2); // 调用 f2(1, 2),输出:print: x=1,y=2,z=3// 绑定 fun_1 函数的第三个参数为 3,前两个参数的顺序交换auto f3 = std::bind(fun_1, placeholders::_2, placeholders::_1, 3);f3(1, 2); // 调用 f3(1, 2),输出:print: x=2,y=1,z=3// 注意:f2 和 f3 的区别,f2 传递的是 1 和 2,而 f3 交换了 1 和 2 的顺序// 定义 int 类型的变量 m 和 nint m = 2;int n = 3;// 绑定 fun_2 函数的第二个参数为 n, 第一个参数通过调用 f4 时传入auto f4 = std::bind(fun_2, placeholders::_1, n);f4(m); // 调用 f4(m),输出:print: a=3,b=4cout << "m=" << m << endl; // 输出 m=3,说明 m 是通过引用传递的cout << "n=" << n << endl; // 输出 n=3,说明 n 是通过值传递的// 创建类 A 的实例 aA a;// 绑定类 A 的成员函数 fun_3,a 为实例对象,使用占位符来传递参数auto f5 = std::bind(&A::fun_3, a, placeholders::_1, placeholders::_2);f5(10, 20); // 调用 f5(10, 20),输出:print: k=10,m=20// 使用 std::function 显式指定函数类型来绑定 A 类的成员函数std::function<void(int, int)> fc = std::bind(&A::fun_3, a, std::placeholders::_1, std::placeholders::_2);fc(10, 20); // 调用 fc(10, 20),输出:print: k=10,m=20return 0;
}
代码解析:
std::bind
:
std::bind 是一个函数适配器,它可以将一个可调用对象(如普通函数或成员函数)与一组参数进行绑定。这样你就可以创建一个新的可调用对象(如函数、lambda 表达式等),并将部分参数固定下来,剩下的参数通过后续调用传递。
std::placeholders::_1
、std::placeholders::_2
等占位符用于指定将来调用时需要传递的参数的位置。std::bind
返回一个可调用对象,你可以像调用普通函数一样调用这个对象。
- 示例 1:绑定函数参数
auto f1 = std::bind(fun_1, 1, 2, 3);
f1(); // 打印:print: x=1,y=2,z=3
- f1 是 fun_1 函数的绑定版本,所有参数都提前绑定好了。在调用 f1() 时,自动传递 1、2、3 作为参数。
- 示例 2:占位符
auto f2 = std::bind(fun_1, placeholders::_1, placeholders::_2, 3);
f2(1, 2); // 打印:print: x=1,y=2,z=3
- f2 是一个函数版本,第三个参数固定为 3,而前两个参数由调用 f2 时传入的值代替。
- 示例 3:改变参数顺序
auto f3 = std::bind(fun_1, placeholders::_2, placeholders::_1, 3);
f3(1, 2); // 打印:print: x=2,y=1,z=3
- f3 使用了占位符,并交换了前两个参数的位置。
- 示例 4:通过引用传递参数
auto f4 = std::bind(fun_2, placeholders::_1, n);
f4(m); // 打印:print: a=3,b=4
- f4 绑定了 fun_2 函数,第二个参数 n 是固定的,而第一个参数由调用时传入的 m 来代替。注意,由于 fun_2 的参数是引用类型,m 在调用过程中会被修改。
- 示例 5:绑定成员函数
auto f5 = std::bind(&A::fun_3, a, placeholders::_1, placeholders::_2);
f5(10, 20); // 打印:print: k=10,m=20
- f5 绑定了类 A 的成员函数 fun_3,并通过占位符指定参数的位置。a 是类的实例,它被传递给 fun_3 函数。
- std::function 和 std::bind
std::function<void(int, int)> fc = std::bind(&A::fun_3, a, std::placeholders::_1, std::placeholders::_2);
fc(10, 20); // 打印:print: k=10,m=20
C++11的新特性–可变模版参数(variadic templates)是C++11新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
3.1 可变模版参数的展开
可变参数模板语法
template <class... T>void f(T... args);
上面的可变模版参数的定义当中,省略号的作用有两个:
- 声明一个参数包T… args,这个参数包中可以包含0到任意个模板参数;
- 在模板定义的右边,可以将参数包展开成一个一个独立的参数。
上面的参数args前面有省略号,所以它就是一个可变模版参数,我们把带省略号的参数称为“参数包”,它里面包含了0到N(N>=0)个模版参数。我们无法直接获取参数包args中的每个参数的,只能通过展开参数包的方式来获取参数包中的每个参数,这是使用可变模版参数的一个主要特点,也是最大的难点,即如何展开可变模版参数。
可变模版参数和普通的模版参数语义是一致的,所以可以应用于函数和类,即可变模版参数函数和可变模版参数类,然而,模版函数不支持偏特化,所以可变模版参数函数和可变模版参数类展开可变模版参数的方法还不尽相同,下面我们来分别看看他们展开可变模版参数的方法。
3.1.1 可变模版参数函数
// 3-2-variable-parameter 一个简单的可变模版参数函数#include <iostream> // 用于输入输出操作using namespace std; // 使用标准命名空间// 定义一个接受可变参数的模板函数
template <class... T>
void f(T... args)
{// sizeof...(args) 返回可变参数的数量cout << sizeof...(args) << endl;
}int main()
{f(); // 调用 f 函数,传入 0 个参数,输出 0// 输出:0f(1, 2); // 调用 f 函数,传入 2 个参数,输出 2// 输出:2f(1, 2.5, ""); // 调用 f 函数,传入 3 个参数,输出 3// 输出:3return 0; // 程序结束
}
上面的例子中,f()没有传入参数,所以参数包为空,输出的size为0,后面两次调用分别传入两个和三个参数,故输出的size分别为2和3。由于可变模版参数的类型和个数是不固定的,所以我们可以传任意类型和个数的参数给函数f。这个例子只是简单的将可变模版参数的个数打印出来,如果我们需要将参数包中的每个参数打印出来的话就需要通过一些方法了。
展开可变模版参数函数的方法一般有两种:
- 通过递归函数来展开参数包,
- 是通过逗号表达式来展开参数包。
下面来看看如何用这两种方法来展开参数包。
递归函数方式展开参数包
通过递归函数展开参数包,需要提供一个参数包展开的函数和一个递归终止函数,递归终止函数正是用来终止递归的
// 3-1-variable-parameter2 递归函数方式展开参数包#include <iostream> // 用于输入输出操作using namespace std; // 使用标准命名空间// 递归终止函数,当没有参数时调用该函数
void print()
{cout << "empty" << endl; // 输出 "empty"
}// 展开函数,接受一个参数和可变参数包
template <class T, class ...Args>
void print(T head, Args... rest)
{cout << "parameter " << head << endl; // 打印当前传递的参数print(rest...); // 递归调用,展开参数包
}int main(void)
{// 调用 print 函数,传入 4 个参数print(1, 2, 3, 4); // 递归展开,逐个输出参数return 0; // 程序结束
}
代码解析:
- 递归终止函数
print()
:
- 这是递归展开的基准函数。当参数包为空时,调用此函数。它只是输出 “empty”,表明参数包已经被展开完毕。
- 展开函数
print(T head, Args... rest)
:
- 这是一个递归模板函数,它接受一个参数 head 和一个可变参数包 rest。
- head 是参数包中的第一个元素,rest… 是剩余的参数包。
- 每次调用
print(rest...)
时,都会将 head 从参数包中分离出来并打印,然后递归调用 print,直到参数包为空,最后调用终止函数print()
。
main()
函数:
print(1, 2, 3, 4)
调用展开函数,传递四个参数。- 程序会依次展开参数包:
- 第一次调用:
print(1, 2, 3, 4)
,输出 parameter 1,然后调用 print(2, 3, 4) - 第二次调用:
print(2, 3, 4)
,输出 parameter 2,然后调用 print(3, 4) - 第三次调用:
print(3, 4)
,输出 parameter 3,然后调用 print(4) - 第四次调用:
print(4)
,输出 parameter 4,然后调用 print() - 最后,调用基准函数 print(),输出 “empty”。
- 第一次调用:
输出:
parameter 1
parameter 2
parameter 3
parameter 4
empty
递归展开:
- 在此代码中,参数包通过递归展开,每次调用时从参数包中提取一个参数(head),然后递归调用 print 处理剩下的参数(rest…)。
- 递归结束时,调用 print() 终止函数,并输出 “empty”。
上面的递归终止函数还可以写成这样:
template <class T>void print(T t){cout << t << endl;}
逗号表达式展开参数包
递归函数展开参数包是一种标准做法,也比较好理解,但也有一个缺点,就是必须要一个重载的递归终止函数,即必须要有一个同名的终止函数来终止递归,这样可能会感觉稍有不便。有没有一种更简单的方式呢?其实还有一种方法可以不通过递归方式来展开参数包,这种方式需要借助逗号表达式和初始化列表。比如前面print的例子可以改成这样:
// 3-1-variable-parameter3 逗号表达式展开参数包#include <iostream> // 用于输入输出操作using namespace std; // 使用标准命名空间// 打印单个参数的函数模板
template <class T>
void printarg(T t)
{cout << t << endl; // 输出参数 t
}// 展开参数包的函数模板
template <class ...Args>
void expand(Args... args)
{// 使用逗号表达式展开参数包int arr[] = {(printarg(args), 0)...}; // 对每个参数调用 printarg,逗号表达式用于展开
}int main()
{// 调用 expand 函数,传递多个参数expand(1, 2, 3, 4); // 输出每个参数return 0; // 程序结束
}
代码解析:
printarg
函数模板:
printarg
是一个接受单个参数并打印该参数的模板函数。它用于打印传入的每个参数。- 在
expand
函数中,printarg
被用于打印每一个参数。
expand
函数模板:
expand
是一个接受可变参数的模板函数。通过逗号表达式(printarg(args), 0)
展开参数包 args。- 逗号表达式用于按顺序处理每个参数,并返回一个值。在这里,
printarg(args)
被调用打印参数,0 是逗号表达式的结果,但它不会影响程序执行。 - 逗号表达式的作用是依次对每个参数进行操作,展开参数包。
main
函数:
expand(1, 2, 3, 4)
调用 expand 函数,传递四个参数。- expand 内部使用逗号表达式展开参数包,依次调用 printarg 打印每个参数。
执行流程:
expand(1, 2, 3, 4)
被调用时,args 被展开为 (1, 2, 3, 4)。int arr[] = {(printarg(args), 0)...};
使用逗号表达式逐个打印参数:
- printarg(1) 被调用,输出 1。
- printarg(2) 被调用,输出 2。
- printarg(3) 被调用,输出 3。
- printarg(4) 被调用,输出 4。
输出:
1
2
3
4
逗号表达式:
- 逗号表达式
(printarg(args), 0)
用于在一个表达式中执行多个操作。每个printarg(args)
打印一个参数,而 0 是逗号表达式的结果,它不会影响程序的执行流。 - 通过这种方式,参数包 args 被逐个展开,并对每个元素执行 printarg 操作。
expand
函数中的逗号表达式:(printarg(args), 0)
,先执行printarg(args)
,再得到逗号表达式的结果0。同时还用到了C++11的另外一个特性——初始化列表,通过初始化列表来初始化一个变长数组, {(printarg(args), 0)...}
将会展开成((printarg(arg1),0), (printarg(arg2),0),(printarg(arg3),0), etc... )
,最终会创建一个元素值都为0的数组int arr[sizeof...(Args)]
。由于是逗号表达式,在创建数组的过程中会先执行逗号表达式前面的部分printarg(args)
打印出参数,也就是说在构造int数组的过程中就将参数包展开了,这个数组的目的纯粹是为了在数组构造的过程展开参数包。我们可以把上面的例子再进一步改进一下,将函数作为参数,就可以支持lambda表达式了,从而可以少写一个递归终止函数了,具体代码如
下:
// 3-1-variable-parameter4#include <iostream> // 用于输入输出操作
#include <initializer_list> // 用于 std::initializer_listusing namespace std; // 使用标准命名空间// 展开参数包的函数模板,使用完美转发
template<class F, class... Args>
void expand(const F& f, Args&&... args)
{// 使用完美转发将参数包展开并传递给 finitializer_list<int>{(f(std::forward<Args>(args)), 0)...}; // 逗号表达式展开参数包
}int main()
{// 调用 expand 函数,传递一个 lambda 函数和多个参数expand([](int i) { cout << i << endl; }, 1, 2, 3);return 0; // 程序结束
}
expand
是一个接受一个可调用对象 f 和可变参数 args… 的模板函数。initializer_list<int>{(f(std::forward<Args>(args)), 0)...};
展开参数包并对每个参数调用 f。- 这里使用了完美转发 (
std::forward<Args>(args)
) 来保证参数的值类别(左值或右值)保持不变。 - 使用逗号表达式
(f(std::forward<Args>(args)), 0)
,在调用 f 的同时返回 0。0 是逗号表达式的结果,但它并不影响程序的执行。
4 实现C++线程池
重点
- 可变参数
- std::future
- decltype
- packaged_task
- bind
- 支持可变参数列表
- 支持获取任务返回值
范例4-threadpool
zero_threadpool.h
解析https://blog.csdn.net/ke_wu/article/details/144859874?sharetype=blogdetail&sharerId=144859874&sharerefer=PC&sharesource=ke_wu&spm=1011.2480.3001.8118
#ifndef ZERO_THREADPOOL_H
#define ZERO_THREADPOOL_H#include <future> // 用于future相关操作
#include <functional> // 用于std::bind和function
#include <iostream> // 用于输入输出
#include <queue> // 用于任务队列
#include <mutex> // 用于锁
#include <memory> // 用于智能指针
#ifdef WIN32
#include <windows.h> // Windows平台的头文件
#else
#include <sys/time.h> // Linux平台的时间相关头文件
#endifusing namespace std;// 获取当前时间,填充到timeval结构体中
void getNow(timeval *tv);// 获取当前时间的毫秒数
int64_t getNowMs();// 宏定义用于当前时间
#define TNOW
#define TNOWMS // 获取当前时间
getNow()
// 获取当前时间的毫秒数
getNowMs()////*** @file zero_thread_pool.h* @brief 线程池类, 采用C++11实现** 使用说明:* ZERO_ThreadPool tpool;* tpool.init(5); // 初始化线程池线程数* tpool.start(); // 启动线程池* tpool.exec(testFunction, 10); // 将任务丢到线程池中* tpool.waitForAllDone(1000); // 等待线程池结束,超时1秒* tpool.stop(); // 停止线程池* * 返回值示例:* auto f = tpool.exec(testInt, 5);* cout << f.get() << endl; // 当testInt在线程池中执行后, f.get()会返回数值5*/
class ZERO_ThreadPool
{
protected:struct TaskFunc{TaskFunc(uint64_t expireTime) : _expireTime(expireTime) {}std::function<void()> _func; // 任务的实际函数int64_t _expireTime = 0; // 任务超时时间};typedef shared_ptr<TaskFunc> TaskFuncPtr;public:/*** @brief 构造函数*/ZERO_ThreadPool();/*** @brief 析构函数,停止所有线程*/virtual ~ZERO_ThreadPool();/*** @brief 初始化线程池* @param num 工作线程个数* @return 是否初始化成功*/bool init(size_t num);/*** @brief 获取当前线程池的线程个数* @return 线程个数*/size_t getThreadNum(){std::unique_lock<std::mutex> lock(_mutex);return _threads.size();}/*** @brief 获取当前线程池的任务数* @return 任务数*/size_t getJobNum(){std::unique_lock<std::mutex> lock(_mutex);return _tasks.size();}/*** @brief 停止所有线程,并等待线程结束*/void stop();/*** @brief 启动所有线程* @return 是否成功启动*/bool start();/*** @brief 用线程池执行任务* @param f 任务函数* @param args 任务函数参数* @return 返回任务的future对象*/template <class F, class... Args>auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>{return exec(0, f, args...);}/*** @brief 用线程池执行带有超时的任务* @param timeoutMs 超时时间(毫秒)* @param f 任务函数* @param args 任务函数参数* @return 返回任务的future对象*/template <class F, class... Args>auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>{int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取当前时间// 推导返回值类型using RetType = decltype(f(args...));// 封装任务auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 创建任务指针并设置超时时间TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);fPtr->_func = [task]() { (*task)(); }; // 定义任务执行时的具体行为// 加锁并将任务加入任务队列std::unique_lock<std::mutex> lock(_mutex);_tasks.push(fPtr);_condition.notify_one(); // 唤醒等待线程return task->get_future();}/*** @brief 等待所有任务完成* @param millsecond 等待的时间(毫秒),-1表示无限等待* @return 是否所有工作都处理完毕*/bool waitForAllDone(int millsecond = -1);protected:/*** @brief 获取任务* @param task 任务指针* @return 是否成功获取任务*/bool get(TaskFuncPtr& task);/*** @brief 线程池是否退出* @return 是否退出*/bool isTerminate() { return _bTerminate; }/*** @brief 线程运行逻辑*/void run();protected:queue<TaskFuncPtr> _tasks; // 任务队列std::vector<std::thread*> _threads; // 工作线程std::mutex _mutex; // 互斥锁std::condition_variable _condition; // 条件变量size_t _threadNum; // 线程池线程数std::atomic<int> _bTerminate; // 终止标志
};#endif // ZERO_THREADPOOL_H
zero_threadpool.cpp
解析https://blog.csdn.net/ke_wu/article/details/144860022?spm=1001.2014.3001.5502
#include "zero_threadpool.h"_bTerminate;
_atomic{ 0 };ZERO_ThreadPool::ZERO_ThreadPool(): _threadNum(1), _bTerminate(false)
{
}ZERO_ThreadPool::~ZERO_ThreadPool()
{stop();
}bool ZERO_ThreadPool::init(size_t num)
{std::unique_lock<std::mutex> lock(_mutex);if (!_threads.empty()) // 如果线程池已经有线程了,返回失败{return false;}_threadNum = num; // 设置线程池的线程数return true;
}void ZERO_ThreadPool::stop()
{{std::unique_lock<std::mutex> lock(_mutex);_bTerminate = true; // 设置线程池停止标志_condition.notify_all(); // 通知所有线程停止}for (size_t i = 0; i < _threads.size(); i++){if (_threads[i]->joinable()){_threads[i]->join(); // 等待线程执行完}delete _threads[i]; // 删除线程对象_threads[i] = NULL; // 清空线程指针}std::unique_lock<std::mutex> lock(_mutex);_threads.clear(); // 清空线程池中的线程
}bool ZERO_ThreadPool::start()
{std::unique_lock<std::mutex> lock(_mutex);if (!_threads.empty()) // 如果线程池已经启动,返回失败{return false;}// 启动线程池中的所有线程for (size_t i = 0; i < _threadNum; i++){_threads.push_back(new std::thread(&ZERO_ThreadPool::run, this)); // 创建线程并启动}return true;
}bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{std::unique_lock<std::mutex> lock(_mutex);if (_tasks.empty()){_condition.wait(lock, [this] { return _bTerminate || !_tasks.empty(); }); // 等待任务或停止标志}if (_bTerminate)return false;if (!_tasks.empty()){task = std::move(_tasks.front()); // 获取任务_tasks.pop(); // 移除队列中的任务return true;}return false;
}void ZERO_ThreadPool::run() // 执行任务的线程
{while (!isTerminate()) // 如果线程池没有停止{TaskFuncPtr task;bool ok = get(task); // 从任务队列获取任务if (ok){++_atomic; // 正在执行的任务计数try{// 如果任务超时,进行相应的处理if (task->_expireTime != 0 && task->_expireTime < TNOWMS){// 超时任务,是否需要处理?}else{task->_func(); // 执行任务}}catch (...){// 异常处理}--_atomic; // 完成任务,减少计数// 如果所有任务都执行完,通知waitForAllDonestd::unique_lock<std::mutex> lock(_mutex);if (_atomic == 0 && _tasks.empty()){_condition.notify_all(); // 唤醒所有等待的线程}}}
}bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{std::unique_lock<std::mutex> lock(_mutex);if (_tasks.empty())return true;if (millsecond < 0){_condition.wait(lock, [this] { return _tasks.empty(); }); // 等待所有任务完成return true;}else{return _condition.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return _tasks.empty(); });}
}int gettimeofday(struct timeval &tv)
{
#if WIN32time_t clock;struct tm tm;SYSTEMTIME wtm;GetLocalTime(&wtm);tm.tm_year = wtm.wYear - 1900;tm.tm_mon = wtm.wMonth - 1;tm.tm_mday = wtm.wDay;tm.tm_hour = wtm.wHour;tm.tm_min = wtm.wMinute;tm.tm_sec = wtm.wSecond;tm.tm_isdst = -1;clock = mktime(&tm);tv.tv_sec = clock;tv.tv_usec = wtm.wMilliseconds * 1000;return 0;
#elsereturn ::gettimeofday(&tv, 0); // Linux的获取当前时间
#endif
}void getNow(timeval *tv)
{
#if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUXint idx = _buf_idx;*tv = _t[idx];if (fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc){addTimeOffset(*tv, idx); // 如果需要,添加时间偏移}else{TC_Common::gettimeofday(*tv); // 获取当前时间}
#elsegettimeofday(*tv); // 获取当前时间
#endif
}int64_t getNowMs()
{struct timeval tv;getNow(&tv);return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000; // 返回当前时间的毫秒数
}
main.cpp
解析https://blog.csdn.net/ke_wu/article/details/144860342?sharetype=blogdetail&sharerId=144860342&sharerefer=PC&sharesource=ke_wu&spm=1011.2480.3001.8118
#include <iostream>
#include <zero_threadpool.h>using namespace std;void func0()
{cout << "func0()" << endl;
}void func1(int a)
{cout << "func1() a=" << a << endl;
}void func2(int a, string b)
{cout << "func2() a=" << a << ", b=" << b << endl;
}void test1() // 简单测试线程池
{ZERO_ThreadPool threadpool;threadpool.init(2);threadpool.start(); // 启动线程池// 执行任务threadpool.exec(1000, func0); // 执行 func0,不设置返回值threadpool.exec(func1, 10); // 执行 func1,传入参数 10threadpool.exec(func2, 20, "darren"); // 执行 func2,传入参数 20 和 "darren"threadpool.waitForAllDone(); // 等待所有任务完成threadpool.stop(); // 停止线程池
}int func1_future(int a)
{cout << "func1() a=" << a << endl;return a;
}string func2_future(int a, string b)
{cout << "func2() a=" << a << ", b=" << b << endl;return b;
}void test2() // 测试任务函数返回值
{ZERO_ThreadPool threadpool;threadpool.init(2);threadpool.start(); // 启动线程池// 执行任务并获取返回值std::future<decltype(func1_future(0))> result1 = threadpool.exec(func1_future, 10);std::future<string> result2 = threadpool.exec(func2_future, 20, "darren");// 输出任务的返回值std::cout << "result1: " << result1.get() << std::endl;std::cout << "result2: " << result2.get() << std::endl;threadpool.waitForAllDone(); // 等待所有任务完成threadpool.stop(); // 停止线程池
}int main()
{// 简单测试线程池test1(); // 测试任务函数返回值test2(); cout << "Hello World!" << endl; // 输出结束信息return 0;
}
参考
https://a-wimpy-boy.blog.csdn.net/article/details/86179781