C++新特性||线程协程(代码解析1)
原文https://blog.csdn.net/ke_wu/article/details/144807820?sharetype=blogdetail&sharerId=144807820&sharerefer=PC&sharesource=ke_wu&spm=1011.2480.3001.8118
#ifndef ZERO_THREADPOOL_H
#define ZERO_THREADPOOL_H#include <future> // 用于future相关操作
#include <functional> // 用于std::bind和function
#include <iostream> // 用于输入输出
#include <queue> // 用于任务队列
#include <mutex> // 用于锁
#include <memory> // 用于智能指针
#ifdef WIN32
#include <windows.h> // Windows平台的头文件
#else
#include <sys/time.h> // Linux平台的时间相关头文件
#endifusing namespace std;// 获取当前时间,填充到timeval结构体中
void getNow(timeval *tv);// 获取当前时间的毫秒数
int64_t getNowMs();// 宏定义用于当前时间
#define TNOW
#define TNOWMS // 获取当前时间
getNow()
// 获取当前时间的毫秒数
getNowMs()////*** @file zero_thread_pool.h* @brief 线程池类, 采用C++11实现** 使用说明:* ZERO_ThreadPool tpool;* tpool.init(5); // 初始化线程池线程数* tpool.start(); // 启动线程池* tpool.exec(testFunction, 10); // 将任务丢到线程池中* tpool.waitForAllDone(1000); // 等待线程池结束,超时1秒* tpool.stop(); // 停止线程池* * 返回值示例:* auto f = tpool.exec(testInt, 5);* cout << f.get() << endl; // 当testInt在线程池中执行后, f.get()会返回数值5*/
class ZERO_ThreadPool
{
protected:struct TaskFunc{TaskFunc(uint64_t expireTime) : _expireTime(expireTime) {}std::function<void()> _func; // 任务的实际函数int64_t _expireTime = 0; // 任务超时时间};typedef shared_ptr<TaskFunc> TaskFuncPtr;public:/*** @brief 构造函数*/ZERO_ThreadPool();/*** @brief 析构函数,停止所有线程*/virtual ~ZERO_ThreadPool();/*** @brief 初始化线程池* @param num 工作线程个数* @return 是否初始化成功*/bool init(size_t num);/*** @brief 获取当前线程池的线程个数* @return 线程个数*/size_t getThreadNum(){std::unique_lock<std::mutex> lock(_mutex);return _threads.size();}/*** @brief 获取当前线程池的任务数* @return 任务数*/size_t getJobNum(){std::unique_lock<std::mutex> lock(_mutex);return _tasks.size();}/*** @brief 停止所有线程,并等待线程结束*/void stop();/*** @brief 启动所有线程* @return 是否成功启动*/bool start();/*** @brief 用线程池执行任务* @param f 任务函数* @param args 任务函数参数* @return 返回任务的future对象*/template <class F, class... Args>auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>{return exec(0, f, args...);}/*** @brief 用线程池执行带有超时的任务* @param timeoutMs 超时时间(毫秒)* @param f 任务函数* @param args 任务函数参数* @return 返回任务的future对象*/template <class F, class... Args>auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>{int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取当前时间// 推导返回值类型using RetType = decltype(f(args...));// 封装任务auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 创建任务指针并设置超时时间TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);fPtr->_func = [task]() { (*task)(); }; // 定义任务执行时的具体行为// 加锁并将任务加入任务队列std::unique_lock<std::mutex> lock(_mutex);_tasks.push(fPtr);_condition.notify_one(); // 唤醒等待线程return task->get_future();}/*** @brief 等待所有任务完成* @param millsecond 等待的时间(毫秒),-1表示无限等待* @return 是否所有工作都处理完毕*/bool waitForAllDone(int millsecond = -1);protected:/*** @brief 获取任务* @param task 任务指针* @return 是否成功获取任务*/bool get(TaskFuncPtr& task);/*** @brief 线程池是否退出* @return 是否退出*/bool isTerminate() { return _bTerminate; }/*** @brief 线程运行逻辑*/void run();protected:queue<TaskFuncPtr> _tasks; // 任务队列std::vector<std::thread*> _threads; // 工作线程std::mutex _mutex; // 互斥锁std::condition_variable _condition; // 条件变量size_t _threadNum; // 线程池线程数std::atomic<int> _bTerminate; // 终止标志
};#endif // ZERO_THREADPOOL_H
这个头文件定义了一个多线程线程池类 ZERO_ThreadPool,用于在多个线程之间调度并执行任务。它采用了 C++11 的标准,主要功能包括任务的执行、线程管理、任务队列等,支持任务超时管理。
文件中的主要部分解释
- 头文件和宏定义
#include <future> // 用于future相关操作
#include <functional> // 用于std::bind和function
#include <iostream> // 用于输入输出
#include <queue> // 用于任务队列
#include <mutex> // 用于锁
#include <memory> // 用于智能指针
#ifdef WIN32
#include <windows.h> // Windows平台的头文件
#else
#include <sys/time.h> // Linux平台的时间相关头文件
#endif
这些是本文件中所需的标准库头文件,用于任务管理、线程同步、内存管理和系统相关功能。根据不同平台,windows.h 或 sys/time.h 被包含进来。
- 时间相关函数
// 获取当前时间,填充到timeval结构体中
void getNow(timeval *tv);// 获取当前时间的毫秒数
int64_t getNowMs();
这些函数用于获取当前系统时间,并将其以不同格式返回。getNow 获取精确到微秒的时间,getNowMs 获取精确到毫秒的时间。
- 宏定义
#define TNOW
#define TNOWMS
这些宏本应提供对当前时间的直接调用,但在当前代码中,它们只是占位符。TNOW 和 TNOWMS 可能应该用于返回当前时间(getNow() 和 getNowMs())。
- TaskFunc 结构体
struct TaskFunc
{TaskFunc(uint64_t expireTime) : _expireTime(expireTime) {}std::function<void()> _func; // 任务的实际函数int64_t _expireTime = 0; // 任务超时时间
};
TaskFunc 用于封装任务及其超时时间。_func 是实际的任务函数,_expireTime 是任务的超时时间(如果有)。
- ZERO_ThreadPool 类
这是线程池类的定义,包含了初始化、任务执行、线程管理等功能。
- 构造函数和析构函数
ZERO_ThreadPool();
virtual ~ZERO_ThreadPool();
构造函数初始化线程池,析构函数清理线程池并停止所有线程。
- 初始化函数
bool init(size_t num);
初始化线程池,num 为线程数,成功返回 true,失败返回 false。
- 获取线程数和任务数
size_t getThreadNum();
size_t getJobNum();
获取当前线程池中的线程数和任务数。都使用了互斥锁 std::mutex 来保证线程安全。
- 停止线程池
void stop();
停止线程池中的所有线程。
- 启动线程池
bool start();
启动线程池中的所有线程。
- 任务执行
template <class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;
提交任务到线程池,f 是任务函数,args 是任务的参数。返回一个 std::future 对象,用于获取任务的结果。
还有一个重载版本,允许指定超时时间:
template <class F, class... Args>
auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>;
timeoutMs 指定任务的超时毫秒数。
- 等待所有任务完成
bool waitForAllDone(int millsecond = -1);
等待所有任务完成。millsecond 指定超时时间,默认为 -1,表示无限等待。
- 获取任务
bool get(TaskFuncPtr& task);
从任务队列中获取任务。
- 线程运行逻辑
void run();
线程池的工作线程会调用此函数,它的主要职责是从任务队列中取任务并执行。
- 保护成员变量
- _tasks: 任务队列,存储待执行的任务。
- _threads: 工作线程的列表。
- _mutex: 保护任务队列的互斥锁。
- _condition: 条件变量,用于通知工作线程有新的任务到来。
- _threadNum: 线程池中的线程数。
- _bTerminate: 线程池是否终止的标志。
总结
这个线程池类通过以下方式实现多线程任务管理:
- 支持动态提交任务,任务可以有超时设置。
- 通过互斥锁和条件变量保证线程安全和任务调度。
- 提供了等待任务完成的功能,防止主线程提前退出。
- 支持跨平台,Windows 和 Linux 系统可以使用。
这种设计非常适合需要高并发和任务调度的场景,能有效地利用多个线程来执行任务,提高系统效率。