【Linux系统编程】生产消费者模型
目录
生产消费者模型简介
基于阻塞队列的生产消费模型
多线程类的实现
阻塞队列
生产消费模型的调用代码
基于环形队列的生产消费模型
环形队列
编辑
单生产单消费模型的环形队列
多生产多消费模型的环形队列
生产消费模型的并发意义
生产消费者模型简介
日常生活中的生产消费者模型
在日常生活中也广泛存在着生产消费者模型,以一个顾客在超市购物为例:
- 在购买的过程中主要存在着三种角色:
- 厂商:提供商品的角色,通常对应的生产消费模型中的生产者
- 顾客:购买商品的角色,通常对应的生产消费模型中的消费者
- 超市:临时保存商品的地方,供生产者和消费者使用的交易场所
一家超市,可以由多个厂商供货,也可以有多个顾客来购物,这也就是所谓的多生产者和多消费者
计算机中的生产消费者模型
日常生活中有生产消费者模型,那么计算机世界当中的生产消费者模型是什么样的呢?
- 商品:计算机当中的"商品"实际上是数据
- 生产者:负责生产数据的线程
- 消费者:负责获取数据并处理工作的线程
- 交易场所:实际上就是临时保存数据的"内存空间",交易场所通常需要好管理,所以一般交易场所都是特定数据结构对象
一个数据结构对象,可以有多个线程生产数据,也可以有多个线程获取对象中的数据,这就是计算机当中的多生产者和多消费者
生产消费者模型的优势
并发处理,提高效率
- 生产消费者模型允许多个线程并发地处理数据。生产者线程可以在消费者线程处理数据的同时继续生产数据,从而提高系统的整体效率。
生产和消费数据解耦
- 一个线程原本即可以处理数据,又可以获得数据的,当他们转化为生产者和消费者时,对于这个线程来说只需要考虑它作为生产者或消费者的一面,而不需要考虑即作为生产者,又作为消费者的情况,提高了代码的可维护性
支持忙闲不均
- 当生产者和消费者某一方过快时,会停下来等待对方,不必担心交易场所中数据过多或没有数据的情况
生产消费者模型的并发问题
生产者和生产者
- 若一个生产者正在写入数据到交易场所,不允许另一个生产者同时写入,不然会导致数据紊乱,所以生产者和生产者之间必须是互斥的
消费者和消费者
- 若一个消费者正在读取数据,不允许另一个消费者同时读取数据,这主要是保证数据一致性,所以消费者和消费者之间必须是互斥的
生产者和消费者
- 生产者正在生产数据时不允许消费者读取,这可能会导致数据不一致的问题,所以生产者和消费者之间必须是互斥的
- 一个数据的交易过程必须是生产者先生产数据之后,消费者才能消费数据。所以如果交易场所的没有数据,消费者需要等待生产者生产数据。若交易场所的数据超过上限,生产者必须等待消费者消费,所以生产者和消费者之间的关系必须是同步的
总结:
3种关系:
- 生产者 vs 生产者 互斥
- 消费者 vs 消费者 互斥
- 生产者 vs 消费者 互斥 + 同步
2种角色:
- 生产者
- 消费者
1个交易场所
基于阻塞队列的生产消费模型
结构分析
示例图:
- 基于上图,如果我们要实现生产者和消费者本质上就是对线程的封装,所以需要一个模拟实现的线程类
- 交易场所我们采用的是阻塞队列
多线程类的实现
框架
#ifndef __THREAD_HPP
#define __THREAD_HPP#include <iostream>
#include <string>
#include <functional>template<class T>
using func_t = std::function<void(T&)>;template<class T>
class Thread
{public://实现protected:pthread_t _tid;std::string _threadname;T &_data;func_t<T> _func;
};#endif
- _threadname 线程名
- _tid 线程标识符
- _func 线程执行的方法
- _data 线程参数,用于传递给_func
封装线程库代码
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#include <functional>
#include <assert.h>
template <class T>
using func_t = std::function<void(T &)>;template <class T>
class Thread
{
public:void Execute(){_func(_data);}public:// 构造函数Thread(func_t<T> func, T &data, const std::string &name = "thread-none"): _data(data), _func(func), _threadname(name){int n = pthread_create(&_tid, nullptr, threadroutine, this);assert(!n);}// 调用顺序:Start->threadroutine->Execute->_funcstatic void *threadroutine(void *args){Thread<T> *self = static_cast<Thread<T> *>(args);self->Execute();return nullptr;}// 线程分离void Detach(){int n = pthread_detach(_tid);assert(!n);}// 线程等待void Join(){int n = pthread_join(_tid, nullptr);assert(!n);}std::string name(){return _threadname;}~Thread(){}protected:pthread_t _tid;std::string _threadname;T &_data;func_t<T> _func;
};
- Thread类构造函数完成的工作是创建新线程+初始化数据+让线程执行特定任务
阻塞队列
生产者和消费者的交易场所是特定的数据结构,我们采用的是阻塞队列
- 阻塞队列本质是一个队列,阻塞队列是队列的再封装
- 阻塞队列要求的是若队列已满,那么生产者(图上Thread 1)不能再向队列中输入数据,需要等待消费者(图上Thread 2)消费数据
- 阻塞队列要求的是若队列为空,那么消费者(图上Thread 2)不能再从队列中拿取数据,需要等待生产者(图上Thread 1)生产数据
- 阻塞队列是临界资源,因为生产者和消费者看到的是同一个阻塞队列,这就要求我们要对这个阻塞队列进行保护
- 为了维护生产者和消费者之间必须的互斥关系,所以生产者和消费者共用一把互斥锁
阻塞队列的框架
template <class T>
class BlockQueue
{
private://类里面使用的接口bool IsFull();//判断阻塞队列是否为满bool IsEmpty();//判断阻塞队列是否为空
public:BlockQueue(int cap);//构造函数void Enqueue(T &in);//生产者用的接口void Pop(T *out); // 消费者用的接口~BlockQueue(int cap);//析构函数protected:std::queue<T> _block_queue; // 阻塞队列int _cap; // 阻塞队列的大小pthread_mutex_t _mutex; // 用于保护阻塞队列pthread_cond_t _productor; // 专门给生产者的条件变量pthread_cond_t _consumer; // 专门给消费者的条件变量
};
- _cap主要是用于限制生产者的生产速度,当阻塞队列中的数据超过_cap时,生产者会阻塞等待,等待消费者消费
- 专门给生产者/消费者提供一个条件变量的原因是不需要再区分条件变量下的线程到底是生产者还是消费者,减少了代码的复杂度
- 生产者和消费者是互斥关系,所以要求生产和消费共用一把互斥锁_mutex
阻塞队列的构造函数
BlockQueue(int cap) : _cap(cap){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_productor,nullptr);pthread_cond_init(&_consumer,nullptr);}
阻塞队列的析构函数
~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor);pthread_cond_destroy(&_consumer);}
单生产单消费阻塞队列中生产者接口
生产者负责向阻塞队列中输入数据,它的动作如下
- 生产者生产数据之前需要保证生产者和消费者之间的互斥关系,保护临界资源,所以生产者向阻塞队列插入数据时需要加锁,生产者生产完数据后需要解锁。
- 生产者生产数据之前要判定当前阻塞队列是否满了,满了需要去生产者条件变量下等待,没满允许生产数据
- 生产者生产完一个数据后需要唤醒消费者进行消费
bool IsFull(){return _block_queue.size() == _cap;}void Enqueue(T &in) // 生产者用的接口{pthread_mutex_lock(&_mutex);//加锁保证生产者和生产者,生产者和消费者之间的互斥关系//临界区if(IsFull())//BUG{//满了,就放到生产者条件变量指定的等待队列中等待//等待时记得释放锁pthread_cond_wait(&_productor,&_mutex);}//没满,插入数据后唤醒消费者消费_block_queue.push(in);pthread_cond_signal(&_consumer);pthread_mutex_unlock(&_mutex);}
- 上述代码仅适用于单生产者单消费者的情况,否则会出现BUG
单生产单消费阻塞队列中消费者接口
消费者负责从阻塞队列中读取数据,它的动作如下
- 消费者消费数据之前需要保证生产者和消费者之间的互斥关系,保护临界资源,所以消费者从阻塞队列中读取数据时需要加锁,消费者消费完数据后需要解锁。
- 消费者消费数据之前要判定当前阻塞队列是否有数据,没有则需要去消费者条件变量下等待,有允许消费数据
- 消费者消费完一个数据后需要唤醒生产者进行生产
bool IsEmpty(){return _block_queue.size() == 0;}void Pop(T *out) // 消费者用的接口{pthread_mutex_lock(&_mutex);//加锁保证消费者和消费者,生产者和消费者之间的互斥关系//临界区if(IsEmpty())//BUG{//没有数据可以消费,在消费者条件变量下进行等待pthread_cond_wait(&_consumer,&_mutex);}//有数据,消费数据后唤醒生产者生产数据*out = _block_queue.front();_block_queue.pop();pthread_cond_signal(&_productor);pthread_mutex_unlock(&_mutex);}
- 上述代码仅适用于单生产者单消费者的情况,否则会出现BUG
伪唤醒
上述代码只要出现多个生产者或者多个消费者时都会出现伪唤醒的情况
以5个消费者,1个生产者为例:
- 刚开始由于阻塞队列中没有数据,那么5个消费者都会在条件变量下进行等待
- 生产者生产了一个数据,并假设生产者调用的是broadcast唤醒了消费者条件变量下的所有线程,那么会有一个线程竞争到_mutex锁并执行后续代码,此时其他线程也被broadcast唤醒了所以没在条件变量中等待,而是继续竞争锁
- 当第一个竞争到锁的线程消费结束,并解开了锁,那么其他四个线程中有一个竞争锁成功,继续执行后续代码,但生产者只生产了一个数据并且已经被消费了,此时阻塞队列为空,这个线程又去消费数据,进程就会崩溃
- 对于这种情况,我们称之为线程被伪唤醒了
实际上当生产者或消费者的唤醒频率不一致时都可能出现伪唤醒的情况,这个问题产生的本质原因是因为线程是在if代码块中等待的原因,所以只要我们把if语句换为while判断就可以解决这个问题
多生产多消费者阻塞队列的生产消费接口
void Enqueue(T &in) // 生产者用的接口{pthread_mutex_lock(&_mutex);//加锁保证生产者和生产者,生产者和消费者之间的互斥关系//临界区while(IsFull()){//满了,就放到生产者条件变量指定的等待队列中等待//等待时记得释放锁pthread_cond_wait(&_productor,&_mutex);}//没满,插入数据后唤醒消费者消费_block_queue.push(in);pthread_cond_signal(&_consumer);pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消费者用的接口{pthread_mutex_lock(&_mutex);//加锁保证消费者和消费者,生产者和消费者之间的互斥关系//临界区while(IsEmpty()){//没有数据可以消费,在消费者条件变量下进行等待pthread_cond_wait(&_consumer,&_mutex);}//有数据,消费数据后唤醒生产者生产数据*out = _block_queue.front();_block_queue.pop();pthread_cond_signal(&_productor);pthread_mutex_unlock(&_mutex);}
代码调优
阻塞队列中可以记录一下条件变量中的线程个数,根据个数来判断是否要唤醒线程,因为当生产者和消费者唤醒频率不一致时经常会出现唤醒一个空的条件变量,这实际上是无意义的
阻塞队列总结代码:
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{
private:// 类里面使用的接口bool IsFull(){return _block_queue.size() == _cap;}bool IsEmpty(){return _block_queue.size() == 0;}public:BlockQueue(int cap) : _cap(cap), _consumer_wait_num(0), _productor_wait_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_productor, nullptr);pthread_cond_init(&_consumer, nullptr);}void Enqueue(T &in) // 生产者用的接口{pthread_mutex_lock(&_mutex);// 加锁保证生产者和生产者,生产者和消费者之间的互斥关系// 临界区while (IsFull()){// 满了,就放到生产者条件变量指定的等待队列中等待// 等待时记得释放锁_productor_wait_num++;pthread_cond_wait(&_productor, &_mutex);_productor_wait_num--;}// 没满,插入数据后唤醒消费者消费_block_queue.push(in);if (_consumer_wait_num > 0)pthread_cond_signal(&_consumer);pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消费者用的接口{pthread_mutex_lock(&_mutex);// 加锁保证消费者和消费者,生产者和消费者之间的互斥关系// 临界区while (IsEmpty()){// 没有数据可以消费,在消费者条件变量下进行等待_consumer_wait_num++;pthread_cond_wait(&_consumer, &_mutex);_consumer_wait_num--;}// 有数据,消费数据后唤醒生产者生产数据*out = _block_queue.front();_block_queue.pop();if (_productor_wait_num > 0)pthread_cond_signal(&_productor);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor);pthread_cond_destroy(&_consumer);}
protected:std::queue<T> _block_queue; // 阻塞队列int _cap; // 阻塞队列的大小pthread_mutex_t _mutex; // 用于保护阻塞队列pthread_cond_t _productor; // 专门给生产者的条件变量pthread_cond_t _consumer; // 专门给消费者的条件变量int _productor_wait_num;int _consumer_wait_num;
};
生产消费模型的调用代码
主要完成的工作如下:
- 控制生产者、消费者的个数
- 管理所有的生产者和消费者
- 创建一个阻塞队列,并让所有的生产者和消费者看到同一个阻塞队列
- 启动生产者和消费者
- 生产者生产数据,消费者消费数据
- 等待生产者和消费者
#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
void productor(BlockQueue<int> &bq)//生产者执行的工作
{while (true){int data = rand()%10;bq.Enqueue(data);std::cout << "enqueue seccess! data : " << data << std::endl;}
}
void consumer(BlockQueue<int> &bq)//消费者执行的工作
{int data;while(true){bq.Pop(&data);std::cout << "Pop success! data : " << data << std::endl;sleep(1);}
}
void StartCommon(func_t<BlockQueue<int>> func, int num, std::vector<Thread<BlockQueue<int>>> &threads, BlockQueue<int> &bq)
{for (int i = 0; i < num; ++i){std::string name = "thread-" + std::to_string(i + 1);threads.emplace_back(func, bq, name);sleep(1);}
}
void StartConsumer(int num, std::vector<Thread<BlockQueue<int>>> &threads, BlockQueue<int> &bq)
{StartCommon(consumer, num, threads, bq);
}
void StartProductor(int num, std::vector<Thread<BlockQueue<int>>> &threads, BlockQueue<int> &bq)
{StartCommon(productor, num, threads, bq);
}
void WaitAll(std::vector<Thread<BlockQueue<int>>> &threads)
{for (auto &thread : threads){thread.Join();}
}int main()
{BlockQueue<int> *_block = new BlockQueue<int>(10);//后续生产者和消费者看到的同一个阻塞队列int productor_num = 3; // 生产者个数int consumer_num = 5; //消费者个数std::vector<Thread<BlockQueue<int>>> threads;//管理所有的生产者和消费者StartProductor(productor_num, threads, *_block);//启动所有的生产者StartConsumer(consumer_num, threads,*_block);//启动所有的消费者WaitAll(threads);//等待所有线程return 0;
}
基于环形队列的生产消费模型
环形队列
- 环形队列也是一个可以供生产者和消费者交易的场所
- 环形队列本质是一个数组,通过控制下标逻辑抽象成环形队列
- 环形队列需要配合信号量进行使用,信号量描述的资源主要是两个,对于生产者来说关心的是空间资源,对于消费者来说关心的是数据资源。所以需要两个信号量
当生产者和消费者在环形队列中进行交易时,会出现两种情况:
- 生产者和消费者在同一位置
- 生产者和消费者不在同一个位置
首先是生产者和消费者在同一位置
- 若生产者和消费者在同一个位置,那么第一种可能就是空间信号量为0
- 若生产者和消费者在同一个位置,那么第二种可能就是数据信号量为0
- 当空间信号量为0时,不允许生产者继续生产数据,需要阻塞等待消费者消费数据,消费者消费完一个数据以后需要对空间信号量进行v操作,用于唤醒生产者继续生产数据
- 当数据信号量为0时,不允许消费者继续消费数据,需要阻塞等待生产者生产数据,生产者生产完一个数据以后需要对数据信号量进行v操作,用于唤醒消费者继续消费数据、
- 总得来说,生产者和消费者在同一位置要保证它们的同步关系,这个同步关系是由信号量来维护的
生产者和消费者不在同一位置
- 若生产者和消费者不在同一位置,则空间信号量和数据信号量都一定不为0,也就是环形队列既不为空,也不为满。
- 生产者在生产数据的同时消费者可以并发消费数据,因为它们指向的位置不是同一个,互相之间互不影响
单生产单消费模型的环形队列
单生产单消费模型的环形队列只需要维护生产者和消费者之间的互斥和同步关系,这一点信号量已经帮我们做到了
框架
#include <vector>
#include <semaphore.h>
template <class T>
class RingQueue
{private:void P(sem_t* sem);//sem信号量++void V(sem_t* sem)//sem信号量--
public:RingQueue(int cap);//构造函数void Enqueue(T &in); // 生产者生产接口void Pop(T& out);//消费者消费接口~RingQueue();//析构函数protected:// 数据结构std::vector<T> _ring_queue; // 环形队列int _cap; // 环形队列的总大小// 生产消费所处的下标int _productor_step;int _consumer_step;// 信号量sem_t _space_sem; // 空间信号量sem_t _data_sem; // 数据信号量
};
构造函数
RingQueue(int cap) : _cap(cap), _ring_queue(cap), _productor_step(0), _consumer_step(0){sem_init(&_space_sem, 0, cap); // 一开始空间资源是满的sem_init(&_data_sem, 0, 0); // 一开始没有数据}
析构函数
~RingQueue(){sem_destroy(&_space_sem);sem_destroy(&_data_sem);}
信号量pv操作封装接口
void P(sem_t *sem){sem_wait(sem);}void V(sem_t *sem){sem_post(sem);}
生产者接口
生产者生产数据时,需要完成的动作顺序:
- 预定空间资源,没有就阻塞,有才能向下执行
- 生产数据到环形队列中
- 生产成功后,环形队列中多了一个数据,需要对数据信号量++
void Enqueue(T &in) // 生产者生产接口{// 先预定一个空间资源,空间信号量p操作P(&_space_sem);//P操作成功一定有空间可以生产且生产者和消费者的下标不相同_ring_queue[_productor_step++] = in;//生产成功后,保证环形队列的特性且对数据信号量V操作_productor_step %= _cap;V(&_data_sem);}
消费者接口
消费者消费数据时,需要完成的动作顺序:
- 预定数据资源,没有就阻塞,有才能向下执行
- 消费者从环形队列中拿取数据消费
- 消费成功后,环形队列中多了一个空间,需要对空间信号量++
void Pop(T& out){//先预定一个数据资源,数据信号量p操作P(&_data_sem);//P操作成功一定有数据可以消费且生产者和消费者的下标不相同out = _ring_queue[_consumer_step++];//消费成功后,保证环形队列的特性且对空间信号量V操作_consumer_step %= _cap;V(&_space_sem);}
多生产多消费模型的环形队列
单生产单消费模型中仅仅维护了生产者和消费者之间的互斥和同步关系,并没有维护生产者和生产者之间的互斥关系、消费者与消费者之间的互斥关系
- 建议使用两把锁来维护。因为生产者和消费者不指向环形队列中的同一个位置时,生产者可以与消费者并发执行工作,如果只使用一把锁会导致生产者和消费者无法并发执行
- 建议加锁顺序是先申请信号量,再申请锁。 因为如果是先申请锁,那么只有1个线程会获取锁,它获得了锁还要申请信号量,其他线程处于闲置状态。如果是先申请信号量再申请锁,那么信号量可以由多个线程获取,哪怕线程没有竞争成功锁,它之后也不需要申请信号量。
- 加锁顺序是先加锁后申请信号量就好比电影院中先排队再买票一样,效率不足,但也没错
框架
#include <vector>
#include <semaphore.h>
#include <pthread.h>
template <class T>
class RingQueue
{
private:void P(sem_t *sem)//信号量P操作{sem_wait(sem);}void V(sem_t *sem)//信号量V操作{sem_post(sem);}public:RingQueue(int cap);//构造函数void Enqueue(T &in) // 生产者生产接口void Pop(T &out);// 消费者消费接口~RingQueue();//析构函数protected:// 数据结构std::vector<T> _ring_queue; // 环形队列int _cap; // 环形队列的总大小// 生产消费所处的下标int _productor_step;int _consumer_step;// 信号量sem_t _space_sem; // 空间信号量sem_t _data_sem; // 数据信号量// 互斥锁pthread_mutex_t _productor_mutex;//多生产者使用的互斥锁pthread_mutex_t _consumer_mutex;//多消费者使用的互斥锁
};
构造函数
RingQueue(int cap) : _cap(cap), _ring_queue(cap), _productor_step(0), _consumer_step(0){sem_init(&_space_sem, 0, cap); // 一开始空间资源是满的sem_init(&_data_sem, 0, 0); // 一开始没有数据pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);}
析构函数
~RingQueue(){sem_destroy(&_space_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);}
生产者生产接口
生产者生产数据时,需要完成的动作顺序:
- 预定空间资源,没有就阻塞,有才能向下执行
- 申请锁,维护多生产者之间的互斥关系
- 生产数据到环形队列中
- 生产成功后,环形队列中多了一个数据,需要对数据信号量++
- 解锁
void Enqueue(T &in) // 生产者生产接口{// 先预定一个空间资源,空间信号量p操作P(&_space_sem);pthread_mutex_lock(&_productor_mutex);// P操作成功一定有空间可以消费且生产者和消费者的下标不相同_ring_queue[_productor_step++] = in;// 生产成功后,保证环形队列的特性且对数据信号量V操作_productor_step %= _cap;pthread_mutex_unlock(&_productor_mutex);V(&_data_sem);}
消费者消费接口
消费者消费数据时,需要完成的动作顺序:
- 预定数据资源,没有就阻塞,有才能向下执行
- 申请锁,维护多消费者之间的互斥关系
- 消费者从环形队列中拿取数据消费
- 消费成功后,环形队列中多了一个空间,需要对空间信号量++
- 解锁
void Pop(T &out){// 先预定一个数据资源,数据信号量p操作P(&_data_sem);pthread_mutex_lock(&_consumer_mutex);// P操作成功一定有数据可以消费且生产者和消费者的下标不相同out = _ring_queue[_consumer_step++];// 消费成功后,保证环形队列的特性且对空间信号量V操作_consumer_step %= _cap;pthread_mutex_unlock(&_consumer_mutex);V(&_space_sem);}
生产消费模型的并发意义
以基于阻塞队列的生产消费模型为例:
- 首先要知道的一点是虽然我们上面实现的阻塞队列中存储的是整形数据,但阻塞队列不仅仅可以放入整形数据,也能放入一个一个的任务
- 生产者和消费者在交易场所进行交易时没有提供并发,因为同一时间只允许一个线程访问这个交易场所
- 生产消费模型的并发在于消费者不是只能等待交易场所中的任务的,消费者可能也在处理任务,若有5个消费者,当3个消费者在处理任务,两个消费者在等待任务时,此时的这3个消费者就实现了并发处理,这也就是生产消费模型的意义,它提供了高并发,提高了效率!