当前位置: 首页 > news >正文

C++线程池

C++线程池

1. 环境

C++17 Linux、 windows均可。

代码

#define THREAD_POOL_H_
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <memory>
#include <atomic>
#include <mutex>
#include <chrono>
#include <functional>
#include <future>
#include <condition_variable>
#include <unordered_map>
const int TASK_MAX_THRESHOLD = INT32_MAX;
const int THREAD_MAX_THRESHOLD = 100;
const int MAX_FREE_TIME = 4;
enum class Thread_Mode {Thread_Fixed,  // 固定线程数量Thread_Cached, // 可调整的线程数量
};
// 任务抽象基类
// 用户可以自定义任意任务类型,从Task继承,重写run方
class Thread {
public:using thread_func = std::function<void(int)>;void start() {// 执行一个线程函数// 创建一个线程std::thread t(func_, thread_id_);t.detach(); // 设置分离线程 线程启动后,出作用域后,线程仍然存在}// 线程函数对象构造Thread(thread_func func): func_(func), thread_id_(generate_id_++) {}// 线程析构~Thread() = default;int get_id() const {return thread_id_;}private:thread_func func_;static int generate_id_;int thread_id_; // 保存线程id
};
int Thread::generate_id_ = 0;
class Thread_Pool
{
public:// Thread_Pool();Thread_Pool(): init_thread_size_(4), task_size_(0), task_maxsize_threshold_(TASK_MAX_THRESHOLD), mode_(Thread_Mode::Thread_Fixed), pool_running_(false), idle_thread_size_(0), threads_size_hold_(THREAD_MAX_THRESHOLD), current_thread_size_(0) {}~Thread_Pool() {pool_running_ = false;// 等待线程池所有线程结束// 状态:阻塞 & 正在执行任务中// 线程通信// 唤醒线程,释放锁std::unique_lock<std::mutex> lock(task_queue_mux);not_empty_.notify_all();exit_cond_.wait(lock, [&]() -> bool{ return threads_.size() == 0; });}void set_mode(Thread_Mode mode) {if (check_running()){return;}mode_ = mode;}// void set_maxtask_queue_threshold(int threashold);//设置最大线程池数量void set_maxtask_queue_threshold(int threashold) {if (check_running()) {return;}task_maxsize_threshold_ = threashold;} // 设置最大线程池数量void set_thread_hold_size(int size) {if (check_running()) {return;}if (mode_ == Thread_Mode::Thread_Cached) {threads_size_hold_ = size;}}// void start(int size = std::thread::hardware_concurrency());//开启线程池 std::thread::hardware_concurrency 当前cpu的核心数量//  开启线程池void start(int size) {// 记录初始线程数量个数pool_running_ = true;init_thread_size_ = size;// set_init_threads_size(size);current_thread_size_ = size;// 创建线程对象for (int i = 0; i < size; ++i) {// 创建thread线程对象的时候,把线程函数给到Thread 对象std::unique_ptr<Thread> ptr = std::make_unique<Thread>(std::bind(&Thread_Pool::thread_func, this, std::placeholders::_1));// threads_.emplace_back(std::move(ptr)); //std::move是因为unique_ptr 是不支持拷贝跟赋值的int threads_id = ptr->get_id();threads_.emplace(threads_id, std::move(ptr));// current_thread_size_ ++;}// 启动所有线程for (int i = 0; i < size; ++i) {threads_[i]->start();++idle_thread_size_;}}//线程池提交任务,使用可变参数模板编程,让submit可以接受任意参数的函数// Result submit_task(std::shared_ptr<Task> sp) {// }//Func&& func 引用折叠,future函数返回值,decltype类型推导//std::forward 保持其参数类型,左值就是左值template<typename Func,typename... Args>auto submit_task(Func&& func,Args&& ... args) -> std::future<decltype (func(args...))>{//打包任务,放到任务队列里面using return_type = decltype(func(args...));//std::packaged_task 将任何可调用对象(比如函数、lambda 表达式等等)封装成一个 task,可以异步执行。执行结果可以使用 std::future 获取auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<Func>(func),std::forward<Args>(args)...));//获取与该 std::packaged_task 关联的 std::future 对象,用于在未来的某个时刻获取任务的执行结果。std::future<return_type> result = task->get_future();// 1. 获取锁std::unique_lock<std::mutex> lock(task_queue_mux);// 2. 线程的通信,等待任务队列有空余if (!not_full_.wait_for(lock, std::chrono::seconds(1), [&]() -> bool{ return task_queue_.size() < task_maxsize_threshold_; })) {std::cerr << "submit task failed ,the thread_pool is full" << std::endl;auto task = std::make_shared<std::packaged_task<return_type()>>([]() ->return_type{return return_type();} );(*task)();return task->get_future();}// 3.如果有空余的话,把任务队列放置到任务队中// task_queue_.emplace(sp);//using Task = std::function<void()>;//去执行这个任务task_queue_.emplace([task]() {(*task)();});task_size_++;// 3. 释放锁,通知消费者线程去处理 ,分配线程执行这个任务not_empty_.notify_all();if (mode_ == Thread_Mode::Thread_Cached && task_size_ > idle_thread_size_ && current_thread_size_ < threads_size_hold_) {// 创建新线程 std::placeholders::_1 参数占位符std::unique_ptr<Thread> ptr = std::make_unique<Thread>(std::bind(&Thread_Pool::thread_func, this, std::placeholders::_1));// threads_.emplace_back(std::move(ptr)); //std::move是因为uniqstd::cout << "create new thread" << std::endl;int threads_id = ptr->get_id();threads_.emplace(threads_id, std::move(ptr));threads_[threads_id]->start();current_thread_size_++;idle_thread_size_++;}std::cout << "create success" << std::endl;//std::cout << "threads_ size =" << threads_.size() << std::endl;return result;}Thread_Pool(const Thread_Pool &) = delete;Thread_Pool &operator=(const Thread_Pool &) = delete;// void set_thread_hold_size(int size);//设置cache模式下的线程阈值
private:void thread_func(int thread_id)  {auto last = std::chrono::high_resolution_clock().now();for (;;) {Task task;{// 1.先获取锁std::unique_lock<std::mutex> lock(task_queue_mux);std::cout << "tid:" << std::this_thread::get_id() << "尝试获取任务" << std::endl;// cache 模式下,创建出来的线程,若其等待时间超过60s的话,则需要将其回收// 超过初始化线程数量的线程要回收// 每一秒钟返回一次:怎么区分超时返回还是有任务返回// std::cout << __LINE__ << " \t size =" << (task_size_ == 0) << std::endl;// 双重判断锁while (task_size_ == 0) {// 线程要结束,回收线程资源std::cout << "pool_running_ =" << pool_running_ << "\t thread_id =" << thread_id << "\t threads_ find it ? " << (threads_.find(thread_id) == threads_.end()) << std::endl;std::cout << "threads size =" << threads_.size() << std::endl;if (!pool_running_) {threads_.erase(thread_id);// current_thread_size_--;// idle_thread_size_--;exit_cond_.notify_all();std::cout << "thread_id =" << std::this_thread::get_id() << "exit" << std::endl;return;}// 条件变量,超时返回if (mode_ == Thread_Mode::Thread_Cached) {if (std::cv_status::timeout ==not_empty_.wait_for(lock, std::chrono::seconds(1))) {auto now = std::chrono::high_resolution_clock().now();auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - last);std::cout << "current_thread_size_ =" << current_thread_size_ << "\t MAX_FREE_TIME= " << MAX_FREE_TIME << "\t \init_thread_size_ ="<< init_thread_size_ << "\t dur.count() =" << dur.count() << std::endl;if (dur.count() >= MAX_FREE_TIME &&current_thread_size_ > init_thread_size_) {// 开始回收当前线程//  记录线程数量的相关变量值的修改//  把线程对象从线程列表中删除 无办法去匹配哪个thread对象threads_.erase(thread_id);current_thread_size_--;idle_thread_size_--;std::cout << "thread_id =" << std::this_thread::get_id() << "exit" << std::endl;return;}}} else {// 2. 等待not_empty条件not_empty_.wait(lock);}// 线程池要结束,回收线程资源}--idle_thread_size_;std::cout << "tid:" << std::this_thread::get_id() << "获取任务成功" << std::endl;// 3. 取任务task = task_queue_.front();task_queue_.pop();task_size_--;// 如果依然有剩余任务,则需要继续通知其他线程执行任务if (task_queue_.size() > 0)  {not_empty_.notify_all();}// 取除任务完之后,得进行通知not_full_.notify_all();}// 4. 执行任务if (task != nullptr){// task->run();//task->exec(); // 把任务返回值给set_valuetask(); //执行绑定的函数对象}++idle_thread_size_;last = std::chrono::high_resolution_clock().now();}// threads_.erase(thread_id);// exit_cond_.notify_all();// std::cout << "thread_id =" << std::this_thread::get_id() << "exit}bool check_running() const {return pool_running_;}private:/// std::vector<std::unique_ptr<Thread>> threads_; //线程列表std::unordered_map<int, std::unique_ptr<Thread>> threads_;size_t init_thread_size_; // 初始线程数量int threads_size_hold_;   // 线程数量上线阈值// std::queue<Task*> task_queue_;//存储的是任务队列 ,要考虑任务是否释放//所谓任务实际上是函数对象using Task = std::function<void()>;std::queue<Task> task_queue_; // 存储的是任务队列 ,要考虑任务是否释放std::atomic_int task_size_;                    // 任务个数,原子是为了保证线程安全int task_maxsize_threshold_;                   // 任务队列的上线阈值std::mutex task_queue_mux;                     // 保证任务队列的线程安全std::condition_variable not_full_;;                                     // 任务队列不满std::condition_variable not_empty_;   // 任务队列不空Thread_Mode mode_;                    // 当前线程池的modestd::condition_variable exit_cond_;   // 等待线程资源全部回收std::atomic_bool pool_running_;       // 表示当前线程池的启动状态std::atomic_int idle_thread_size_;    // 空闲数量线程std::atomic_int current_thread_size_; // 记录当前线程池中总的线程数量
};
#endif

使用

// thread_pool_last.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//// #include <iostream>
// #include<functional>
// #include<future>
// #include<thread>
// #include "threadpool.h"#include "thread_pool.h"
// int sum1(int a, int b) {
//     return a + b;
// }
// int sum2(int a, int b,int c) {
//     return a + b + c;
// }
// int main()
// {
//     // //std::packaged_task 支持get_future直接拿到返回值
//     // std::packaged_task<int(int, int)> task(sum1);
//     // std::future<int> res = task.get_future();
//     // std::thread t(std::move(task), 10, 20);
//     // t.detach();
//     // std::cout << res.get() << std::endl;//若任务相对耗时,则会阻塞
//     // Thread_Pool pool;
//     // pool.set_mode(Thread_Mode::Thread_Cached);
//     // pool.start(2);
//     // std::future<int>r1 = pool.submit_task(sum1,10,10);
//     // std::future<int>r2 = pool.submit_task(sum1,10,10);
//     // std::future<int>r3 = pool.submit_task(sum1,10,10);
//     // std::future<int>r4 = pool.submit_task(sum1,10,10);
//     // std::cout << "r1 =" << r1.get() << std::endl;
//     // std::cout << "r2 =" << r2.get() << std::endl;
//     // std::cout << "r3 =" << r1.get() << std::endl;
//     // std::cout << "r4 =" << r2.get() << std::endl;
//     // ThreadPool pool;
//     // pool.setMode(PoolMode::MODE_CACHED);
//     // pool.start(2);
//     // std::future<int>r1 = pool.submitTask(sum1,10,10);
//     // std::future<int>r2 = pool.submitTask(sum1,10,10);
//     // std::future<int>r3 = pool.submitTask(sum1,10,10);
//     // std::future<int>r4 = pool.submitTask(sum1,10,10);
//     // std::cout << "r1 =" << r1.get() << std::endl;
//     // std::cout << "r2 =" << r2.get() << std::endl;
//     // std::cout << "r3 =" << r1.get() << std::endl;
//     // std::cout << "r4 =" << r2.get() << std::endl;
//     //pool.start(4);
// }
// 线程池项目-最终版.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//#include <iostream>
#include <functional>
#include <thread>
#include <future>
#include <chrono>
using namespace std;// #include "threadpool.h"/*
如何能让线程池提交任务更加方便
1. pool.submitTask(sum1, 10, 20);pool.submitTask(sum2, 1 ,2, 3);submitTask:可变参模板编程2. C++11 线程库   thread   packaged_task(function函数对象)  async 使用future来代替Result节省线程池代码
*/int sum1(int a, int b)
{//this_thread::sleep_for(chrono::seconds(2));// 比较耗时return a + b;
}
int sum2(int a, int b, int c)
{//this_thread::sleep_for(chrono::seconds(2));return a + b + c;
}
// io线程 
void io_thread(int listenfd)
{}
// worker线程
void worker_thread(int clientfd)
{}
int main()
{Thread_Pool pool;pool.set_mode(Thread_Mode::Thread_Cached);//pool.start(2);future<int> r1 = pool.submit_task(sum1, 1, 2);future<int> r2 = pool.submit_task(sum2, 1, 2, 3);future<int> r3 = pool.submit_task([](int b, int e)->int {int sum = 0;for (int i = b; i <= e; i++)sum += i;return sum;}, 1, 100);future<int> r4 = pool.submit_task([](int b, int e)->int {int sum = 0;for (int i = b; i <= e; i++)sum += i;return sum;}, 1, 100);future<int> r5 = pool.submit_task([](int b, int e)->int {int sum = 0;for (int i = b; i <= e; i++)sum += i;return sum;}, 1, 100);//future<int> r4 = pool.submitTask(sum1, 1, 2);//在get之前这个任务是有效的,在get之后这个任务失效,get只有第一次获取的时候是有效的r1.valid()cout << r1.get() << endl;cout << r2.get() << endl;cout << r3.get() << endl;cout << r4.get() << endl;cout << r5.get() << endl;//packaged_task<int(int, int)> task(sum1);future <=> Result//future<int> res = task.get_future();task(10, 20);//thread t(std::move(task), 10, 20);//t.detach();//cout << res.get() << endl;/*thread t1(sum1, 10, 20);thread t2(sum2, 1, 2, 3);t1.join();t2.join();*/
}

http://www.mrgr.cn/news/65610.html

相关文章:

  • Rust的enum枚举的强大用法
  • 一键安装python3
  • 解决Redis缓存穿透(缓存空对象、布隆过滤器)
  • UOS 安装usb wifi 网卡驱动
  • 【数据结构】树-二叉树(链式)
  • 如何在 Vue.js 中使用 Mock 数据:教程与技巧【前端mock】
  • sklearn红酒数据集分类器的构建和评估
  • 图说复变函数论重大错误:将无穷多各异平面误为同一面
  • 智慧医疗——提出了一种基于敌对领域适应症预测候选抗癌药物的方法
  • 江协科技STM32学习- P35 硬件I2C读写MPU6050
  • 信息安全工程师(74)网络安全风险评估技术方法与工具
  • 633. 平方数之和 中等
  • 总结拓展十五:SAP物料分割评估
  • MATLAB绘图基础10:MATLAB极坐标相关图形
  • NRF52832学习笔记(41)——添加串口库libuarte
  • 【ACM出版,EI稳定检索】2024年人工智能、数字媒体技术与交互设计国际学术会议(ICADI 2024,11月29-12月1日)
  • clickhouse配置用户角色与权限
  • VScode插件:前端每日一题
  • 西门子1200PLC输入/输出的源漏型解释
  • gozero--环境安装和api语法
  • Dify 中的 Bearer Token 与 API-Key 鉴权方式
  • flutter 专题七 Flutter面试之渲染流程
  • 易灵思fpga pwm生成报错
  • 004-Kotlin界面开发快速入水之TicTacToe
  • 数论——约数
  • 时间序列预测(十七)——滑动窗口的使用