当前位置: 首页 > news >正文

【Linux跬步积累】—— 线程池详解(有源代码)

文章目录

  • 一、如何实现一个线程
    • 1、基本结构
    • 2、实现成员函数
    • 3、演示
    • 4、代码总汇
      • `Thread.hpp`
      • `Main.cc`
  • 二、如何封装线程池
    • 1、设计成员变量
    • 2、构造函数与析构函数
    • 3、初始化
    • 4、启动与回收
    • 5、主线程放入任务
    • 6、子线程读取任务
    • 7、终止线程池
  • 三、测试
  • 四、线程池总代码
    • 1、`ThreadPool.hpp`
    • 2、`Main.cc`

一、如何实现一个线程

1、基本结构

Thread类是用来描述一个线程的,所以成员变量中需要有tid线程名。所以第一个成员变量就是==_tid==;

一个线程创建出来,肯定是用来执行对于的任务的,那么我们还需要一个成员变量来接收传递的函数,所以第三个成员变量就是==_func==,是一个void(T&)类型的参数,所以第四个成员变量就是传递过来的参数_data

template<class T>
using func_t = std::function<void(T&)>;//模版方法template<class T>
class Thread
{
public:Thread(){}static void* ThreadRoutinue(){}void Start(){}void Join(){}void Detach(){}~Thread(){}
private:pthread_t _tid;          //线程tidstd::string _threadname; //线程名func_t<T> _func;         //线程执行的函数T _data;                 //需要处理的数据
};

2、实现成员函数

成员函数中我们需要注意的就是pthread_create()函数,它的第三个参数是一个参数为void *,返回值为void *的一个函数。

但是我们将这个函数ThreadRoutinue()定义在这个类里,他就是一个成员函数,成员函数的参数,会隐藏this指针,所以如果我们正常写,就会报错,因为这个函数不满足pthread_create()函数的条件。

但是只要让ThreadRoutinue()的参数中没有this指针就可以了,那么该如何实现呢?

在前面加上static就可以,变成静态成员变量,就不会有this指针了。那么问题又来了,没有了this指针,我们又该如何访问到成员变量呢?

别忘了这个函数可以传递一个返回值为void*的参数,我们只需要将this指针传递过去,在函数内部强转一下就可以了。

template<class T>
using func_t = std::function<void(T&)>;//模版方法template<class T>
class Thread
{
public://thread(func,5,"thread-1");Thread(func_t<T> func,const T &data,const std::string &name = "none-name"):_func(func),_data(data),_threadname(name){}//需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了static void* ThreadRoutinue(void* args){//将传过来的this指针强转一下,然后就可以访问到_func和_data了Thread<T>* self = static_cast<Thread<T>*>(args);self->_func(self->_data);return nullptr;}void Start(){//创建线程int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);return ret==0;}void Join(){pthread_join(_tid,nullptr);}void Detach(){pthread_detach(_tid);}~Thread(){}
private:pthread_t _tid;          //线程tidstd::string _threadname; //线程名func_t<T> _func;         //线程执行的函数T _data;                 //需要处理的数据
};

3、演示

让我们写一段测试代码,来看一下效果:

#include"Thread.hpp"void test(int x)
{while(true){std::cout<<x<<std::endl;sleep(1);}
}int main()
{MyThread::Thread<int> mt(test,2025,"thread-1");if(mt.Start() == true){std::cout<<"MyThread start success!\n";}else{std::cout<<"MyThread start failed!\n";}mt.Join();return 0;
}

运行结果:

在这里插入图片描述

让我们使用ps -aL指令来查看一下是否真的创建了线程:

在这里插入图片描述

可以看到,程序运行之后,真的创建出了两个名为mythread的线程。

4、代码总汇

Thread.hpp

#pragma once
#include<string>
#include<pthread.h>
#include<unistd.h>
#include<iostream>
#include<functional>namespace MyThread
{template<class T>using func_t = std::function<void(T&)>;//模版方法template<class T>class Thread{public://thread(func,5,"thread-1");Thread(func_t<T> func,const T &data,const std::string &name = "none-name"):_func(func),_data(data),_threadname(name){}//需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了static void* ThreadRoutinue(void* args){//将传过来的this指针强转一下,然后就可以访问到_func和_data了Thread<T>* self = static_cast<Thread<T>*>(args);self->_func(self->_data);return nullptr;}bool Start(){//创建线程int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);return ret==0;}void Join(){pthread_join(_tid,nullptr);}void Detach(){pthread_detach(_tid);}~Thread(){}private:pthread_t _tid;          //线程tidstd::string _threadname; //线程名func_t<T> _func;         //线程执行的函数T _data;                 //需要处理的数据};
}

Main.cc

#include"Thread.hpp"void test(int x)
{while(true){std::cout<<x<<std::endl;sleep(1);}
}int main()
{MyThread::Thread<int> mt(test,2025,"thread-1");if(mt.Start() == true){std::cout<<"MyThread start success!\n";}else{std::cout<<"MyThread start failed!\n";}mt.Join();return 0;
}

二、如何封装线程池

1、设计成员变量

线程池内部维护多个线程和一个任务队列,主线程将任务放入任务队列当中,然后子线程就从任务队列中拿取任务进行处理。

所以需要一个数组来管理多个线程:_threads

以及一个任务队列:_taskQueue

此外我们还需要知道一共有多少个线程:_threadNum

然后还可以设置一个变量来查看真在等待任务的线程数目:_waitNum

最后我们再设置一个变量来判断当前线程池是否运行,如果已经退出了,我们需要将任务队列中的任务处理完再退出:_isRunning

定义这些变量就够了吗?

我们忽略了一个多线程编程中最重要的问题:线程之间的互斥和同步

我们的任务是:主线程往任务队列中放入任务,子线程从任务队列中拿取任务。那么我们思考一下下面几个问题:

  1. 多个线程之间可以同时从任务队列中拿任务吗?

    答:不能,任务队列是临界资源,线程和线程之间要互斥,否则会出现不同的线程拿取同一个任务的情况。

  2. 主线程放入任务时,子线程可以同时拿取任务吗?

    答:不能,主线程和子线程之间也需要互斥。

因为他们都是竞争任务队列这一个资源,所以我们只要定一个一把锁就可以了。这样互斥的问题就解决了。

那么同步呢?

是不是只有任务队列中有任务时,子线程才能获取任务,所以需要主线程先放任务,子线程才能拿任务,这就需要一个条件变量来维护。

综上:

我们还需要两个成员变量:_mutex_cond

#include"Thread.hpp"
#include<vector>template<class T>
class ThreadPool
{   
private:std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程std::queue<T> _taskQueue;//任务队列int _threadNum;//线程数int _waitNum;//等待的线程数bool _isRunning;//线程池是否在运行pthread_mutex_t _mutex;//互斥锁pthread_cond_t _cond;//条件变量
};

2、构造函数与析构函数

构造和析构的主要作用就是对_mutex_cond的初始化和销毁。

同时我们还需要知道这个线程池需要创建多少个线程,所以需要外部传递参数来告诉我们。

然后就是构造函数对其他成员变量进行初始化。

template<class T>
class ThreadPool
{   
public:ThreadPool(const int num = 5):_threadNum(num),_waitNum(0),_isRunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
}

3、初始化

上面的构造函数只是创建了锁和条件变量,以及部分变量的初始化,并没有创建出线程对象。

我们可以定义一个ThreadInit()函数来创建线程。

让我们先回顾一下Thread的构造函数需要哪些变量:

Thread(func_t<T> func,const T &data,const std::string &name = "none-name")

func参数是需要调用的函数,data是这个函数需要处理的数据,name是线程名。

在此,我们让线程去执行一个叫做handerTask的函数,这个函数内部实现线程到任务队列中获取任务的过程。

handerTask的第一个参数也是线程的名字,以便在handerTask内部查看是哪个线程执行了任务。

这里我们使用bind函数来将HanderTask函数与this参数绑定在一起,并且将这个参数绑定到 HandleTask 的第一个参数位置。

void HanderTask(std::string)
{//执行任务队列的任务
}void InitThread()
{for(int i=0;i<_threadNum;i++){auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);std::string name = "Thread-"+std::to_string(i);//_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员_threads.emplace_back(func,name,name);}_isRunning = true;
}

4、启动与回收

我们已经创建出来了一批线程,接下来还需要启动这一批线程,并且回收。

因此还需要定义成员函数StartAllJoinAll来启动和等待这批线程。

void StartAll()
{for(auto& thread : _threads){thread.Start();}
}
void JoinAll()
{for(auto& thread : _threads){thread.Join();}
}

5、主线程放入任务

我们可以定义一个EnQueue,用来让主线程往任务队列中投放任务。

投放任务的要求:

  1. 访问队列时需要与其他线程互斥,即对_mutex加锁;
  2. 添加任务后,就可以唤醒在等待的线程了
void EnQueue(const T& task)
{pthread_mutex_lock(&_mutex);if(_isRunning){_taskQueue.push(task);if(_waitNum > 0){pthread_cond_signal(&_cond);}}pthread_mutex_unlock(&_mutex);
}

6、子线程读取任务

子线程读取任务的要求如下:

  1. 保持互斥,从任务队列获取数据前需要加锁,获取结束后解锁;
  2. 保持同步,如果任务队列中没有数据,就去_cond下等待,等待被唤醒。
void HanderTask(std::string name)
{//子线程需要一直处理,所以这里使用死循环while(true){pthread_mutex_lock(&_mutex);while(_taskQueue.empty())//这里是while循环,不是if判断,避免伪唤醒{_waitNum++;pthread_cond_wait(&_cond,&_mutex);_waitNum--;}T task = _taskQueue.front();_taskQueue.pop();std::cout<<name<<"get a task..."<<std::endl;pthread_mutex_unlock(&_mutex);task();}
}

这里需要注意一点,判断当前任务队列是否为空时,使用的是while循环,而不是if语句,因为当前线程被主线程唤醒之后,可能会发生伪唤醒,其实任务队列中根本没有任务。所以还要进入下一次while判断,确保访问任务队列时,一定是有任务的。

但是目前还有一个问题,如果线程访问任务队列时,线程池被终止了怎么办?

我们可以通过_isRunning来判定,在执行任务时判断一下_isRunning的值:

  1. 如果为true:正常运行
  2. 如果为false:
    • 如果任务队列中还有任务:把任务执行完
    • 如果没有任务:当前线程退出

于是我们的代码改进为:

void HanderTask(std::string name)
{//子线程需要一直处理,所以这里使用死循环while(true){pthread_mutex_lock(&_mutex);while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒{_waitNum++;pthread_cond_wait(&_cond,&_mutex);_waitNum--;}//线程池终止了,并且任务队列中没有任务 --> 线程退出if(_taskQueue.empty()&&!_isRunning){pthread_mutex_unlock(&_mutex);std::coud<<name<<" quit..."<<std::endl;break;}//走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出T task = _taskQueue.front();_taskQueue.pop();std::cout<<name<<" get a task..."<<std::endl;pthread_mutex_unlock(&_mutex);task();}
}

7、终止线程池

终止线程池不仅仅是将_isRunning设置为false这么简单,需要考虑以下问题:

  1. 如果在Stop的时候,有线程正在调用HanderTask函数怎么办?

    答:此时多个线程访问变量_isRunning,就有可能会造成线程安全问题,所以访问_isRunning时也要加锁,由于之前所有的访问_isRunning的操作,都在_mutex锁中,所以和之前共用同一把锁就行。

  2. 如果Stop之后,还有线程在_cond下面等待怎么办?

    答:如果线程一直在_cond下面等待,就会导致无法退出,此时在_isRunning = false之后,还要通过pthread_cond_broadcast唤醒所有等待的线程,让他们重新执行HanderTask的逻辑,从而正常退出。

void Stop()
{pthread_mutex_lock(&_mutex);_isRunning = false;//终止线程池pthread_cond_broadcast(&_cond);//唤醒所有等待的线程pthread_mutex_unlock(&_mutex);
}

三、测试

我们可以用以下代码进行测试:

#include <iostream>
#include <vector>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include"ThreadPool.hpp"int Add()
{int a = rand() % 100 + 1;int b = rand() % 100 + 1;std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;return a+b;
}int main()
{srand(static_cast<unsigned int>(time(nullptr)));ThreadPool<int(*)(void)> tp(3);tp.ThreadInit();tp.StartAll();for (int i = 0; i < 10; i++){tp.EnQueue(Add);sleep(1);}tp.Stop();tp.JoinAll();return 0;
}

通过ThreadPool<int(*)(void)> tp(3);创建有三个线程的线程池,执行的任务类型为int(void),但是要注意,此处要传入可调用对象,C++的可调用对象有:函数指针,仿函数,lambda表达式。此处我用了函数指针int(*)(void)

接着ThreadInit初始化线程池,此时线程对象Thread已经创建出来了,但是还有没创建线程。随后调用StartAll,此时才真正创建了线程。

然后进入一个for循环,给任务队列派发任务,总共派发十个任务,都是函数Add,其中生成两个随机数的加法。

最后调用Stop终止退出线程池,此时线程也会一个个退出,然后调用JoinAll回收所有线程。

运行结果:

在这里插入图片描述

四、线程池总代码

1、ThreadPool.hpp

#include"Thread.hpp"
#include<vector>
#include<queue>
#include<string>
#include <unistd.h>
#include <pthread.h>template<class T>
class ThreadPool
{   
public:ThreadPool(const int num = 5):_threadNum(num),_waitNum(0),_isRunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);}void HanderTask(std::string name){//子线程需要一直处理,所以这里使用死循环while(true){pthread_mutex_lock(&_mutex);while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒{_waitNum++;pthread_cond_wait(&_cond,&_mutex);_waitNum--;}//线程池终止了,并且任务队列中没有任务 --> 线程退出if(_taskQueue.empty()&&!_isRunning){pthread_mutex_unlock(&_mutex);std::cout<<name<<" quit..."<<std::endl;break;}//走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出T task = _taskQueue.front();_taskQueue.pop();std::cout<<name<<" get a task..."<<std::endl;pthread_mutex_unlock(&_mutex);task();}}void ThreadInit(){for(int i=0;i<_threadNum;i++){auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);std::string name = "Thread-"+std::to_string(i);//_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员_threads.emplace_back(func,name,name);}_isRunning = true;}void StartAll(){for(auto& thread : _threads){thread.Start();}}void JoinAll(){for(auto& thread : _threads){thread.Join();}}void EnQueue(const T& task){pthread_mutex_lock(&_mutex);if(_isRunning){_taskQueue.push(task);if(_waitNum > 0){pthread_cond_signal(&_cond);}}pthread_mutex_unlock(&_mutex);}void Stop(){pthread_mutex_lock(&_mutex);_isRunning = false;//终止线程池pthread_cond_broadcast(&_cond);//唤醒所有等待的线程pthread_mutex_unlock(&_mutex);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
private:std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程std::queue<T> _taskQueue;//任务队列int _threadNum;//线程数int _waitNum;//等待的线程数bool _isRunning;//线程池是否在运行pthread_mutex_t _mutex;//互斥锁pthread_cond_t _cond;//条件变量
};

2、Main.cc

#include <iostream>
#include <vector>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include"ThreadPool.hpp"int Add()
{int a = rand() % 100 + 1;int b = rand() % 100 + 1;std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;return a+b;
}int main()
{srand(static_cast<unsigned int>(time(nullptr)));ThreadPool<int(*)(void)> tp(3);tp.ThreadInit();tp.StartAll();for (int i = 0; i < 10; i++){tp.EnQueue(Add);sleep(1);}tp.Stop();tp.JoinAll();return 0;
}

http://www.mrgr.cn/news/93233.html

相关文章:

  • 选择排序算法
  • 离线地图显示
  • Redis数据结构——list
  • Redis数据结构——set
  • Java多线程与高并发专题——ConcurrentHashMap 在 Java7 和 8 有何不同?
  • C++————引用
  • 递归入手三维动态规划
  • Idea配置注释模板
  • 用CMake编译glfw进行OpenGL配置,在Visual Studio上运行
  • 图解MOE大模型的7个核心问题并探讨DeepSeekMoE的专家机制创新
  • 5年前问题的答案,如何造统计信息
  • Mybatis中的设计模式
  • 安装微软最新原版系统,配置好系统驱动并保留OOBE全新体验
  • JAVA入门——反射
  • 《Operating System Concepts》阅读笔记:p188-p199
  • 蓝桥杯C组真题——巧克力
  • Linux软件包管理
  • HTTP 黑科技
  • uniapp:小程序将base64图片字符串保存到手机相册
  • 免费分享一个软件SKUA-GOCAD-2022版本