Linux操作系统7- 线程同步与互斥4(基于POSIX条件变量的生产者消费者模型)
上篇文章:Linux操作系统7- 线程同步与互斥3(POSIX条件变量的使用,线程循环打印数字)-CSDN博客
本篇代码仓库:myLerningCode/l31 · 橘子真甜/Linux操作系统与网络编程学习 - 码云 - 开源中国 (gitee.com)
目录
一. 生产者消费者模型分析
1.1 三种角色
1.2 生产者消费者模型的特点
二. BlockQueue生产者消费者模型
编辑
2.1 成员变量 ⭐
2.2 构造与析构函数
2.3 辅助函数
2.4 push 与 pop ⭐
三. 测试代码
3.1 不控制生产者生产,消费者消费
3.2 只控制生产者生产
3.2 只控制消费者消费
四. 代码细节与总结
4.1 pthread_cond_wait
一. 生产者消费者模型分析
1.1 三种角色
生产者消费者模型中有三个角色生产者,消费者,以及生产者消费者的交易场所。 这个模型非常类似于生活中的超市,我们作为消费者从超市购买物品,而厂商作为生产者生产物品。
生产者生产数据然后传递给交易场所,如果发现交易场所满了,则阻塞等待
消费者从交易场所拿取数据消费,如果发现交易场所为空,则阻塞等待
交易场所作为临时保存数据的场所,是生产消费的缓冲区
1.2 生产者消费者模型的特点
根据上面的分析,可以总结出三个角色之间的关系。生产者有1个或者多个,消费者有1个或者多个。但是所有的数据都是不同的。
1 消费者与消费者之间:互斥竞争关系,我不能消费你正在消费的数据
2 生产者与生产者之间:互斥竞争关系,我不能生产你正在生产的数据
3 消费者与生产者之间:互斥(一个时间点只有一个消费者或者生产者访问交易场所,其他人只能等待)与同步(消费者消费数据后通知生产者生产,生产者生产数据后通知消费者消费)关系
生产者消费者模型可以将生产者和消费者解耦,让二者不会受到对方的影响,关注于自己的事情。
还能通过交易场所缓冲数据,以便支持消费者生产者之间忙闲不均的问题(有时候生产者生产数据过快,有时候消费者消费数据过快)从而提高整体的效率
二. BlockQueue生产者消费者模型
这里使用BlockQueue(阻塞队列)来实现生产者消费者模型。其中阻塞队列是生产者消费者交易的场所。
作为交易场所BlockQueue需要提供下面的基础功能:
1 通过互斥锁保证交易场所数据的安全
2 为生产者提供放入数据的能力,为消费者提供消费的能力
3 通过条件变量保证生产者消费者之间的同步
2.1 成员变量 ⭐
通过上面的分析,成员变量至少需要 互斥锁 两个条件变量 一个队列。
代码如下:
#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的默认容量template <class T>
class BlockQueue
{
private:std::queue<T> _queue; // 阻塞队列size_t _maxnum; // 队列最大容量pthread_mutex_t _mtx; // 互斥锁pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};
2.2 构造与析构函数
需要在构造函数中完成锁与条件变量的初始化。析构中销毁锁与条件变量
BlockQueue(int maxnum = gnum) : _maxnum(maxnum){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}
2.3 辅助函数
由于每一次生产消费数据都需要判断阻塞队列是否为满或者空。所以需要两个函数来帮助我们快速获取阻塞队列的信息。
//不想让外部调用这些代码,设置为私有
private:bool is_empty(){return _queue.empty();}bool is_full(){return _queue.size() == _maxnum;}
2.4 push 与 pop ⭐
这两个函数是阻塞队列生产者消费模型的核心函数。生产生产数据之后通过push向阻塞队列中放入数据,消费者从阻塞队列中拿取数据然后消费。
// 生产者生产数据void push(const T &in){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否满足生产while (is_full()){// 数据满了生产者等待消费者消费pthread_cond_wait(&_pcond, &_mtx);}// 生产数据_queue.push(in);// 队列不为空,通知消费者消费pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_mtx);}// 消费这消费数据,通过输入输出型参数获取数据void pop(T *out){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否可以消费数据while (is_empty()){// 等待生产者生产数据pthread_cond_wait(&_ccond, &_mtx);}// 开始消费数据*out = _queue.front();_queue.pop();// 队列不满,通知生产者生产数据pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_mtx);}
2.5 BlockQueue.hpp
阻塞队列的全部代码如下:
#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的最大容量template <class T>
class BlockQueue
{
public:BlockQueue(int maxnum = gnum) : _maxnum(maxnum){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}// 生产者生产数据void push(const T &in){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否满足生产while (is_full()){// 数据满了生产者等待消费者消费pthread_cond_wait(&_pcond, &_mtx);}// 生产数据_queue.push(in);// 队列不为空,通知消费者消费pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_mtx);}// 消费这消费数据,通过输入输出型参数获取数据void pop(T *out){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否可以消费数据while (is_empty()){// 等待生产者生产数据pthread_cond_wait(&_ccond, &_mtx);}// 开始消费数据*out = _queue.front();_queue.pop();// 队列不满,通知生产者生产数据pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_mtx);}private:bool is_empty(){return _queue.empty();}bool is_full(){return _queue.size() == _maxnum;}private:std::queue<T> _queue; // 阻塞队列size_t _maxnum; // 队列最大容量pthread_mutex_t _mtx; // 互斥锁pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};
三. 测试代码
创建一个线程用于生产,一个线程用于消费。生产线程向队列生产一个随机数,而消费线程从队列中拿取数据并打印输出。
3.1 不控制生产者生产,消费者消费
测试代码如下:
#include <iostream>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){int tmp = rand() % 10000 + 1;bq->push(tmp);std::cout << "生产者生产数据:" << tmp << std::endl;}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){int tmp = 0;bq->pop(&tmp);std::cout << "消费者获取数据:" << tmp << std::endl;}return nullptr;
}int main()
{srand(time(0) ^ getpid() ^ rand());// 定义生产消费线程与阻塞队列pthread_t p;pthread_t c;BlockQueue<int> *bq = new BlockQueue<int>();pthread_create(&p, nullptr, producer, (void *)bq);pthread_create(&c, nullptr, consumer, (void *)bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;bq = nullptr;return 0;
}
测试结果如下:
可以看到,生产者疯狂生产数据,消费者疯狂消费数据
3.2 只控制生产者生产
让生产者线程生产一次就调用sleep(1);
void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){sleep(1);int tmp = rand() % 10000 + 1;bq->push(tmp);std::cout << "生产者生产数据:" << tmp << std::endl;}return nullptr;
}
可以看到,生产者生产数据之后消费者才去消费数据。当生产者sleep的时候,消费者发现阻塞队列为空,此时就会阻塞自己,等待生产者生产数据
3.2 只控制消费者消费
消费者每隔一秒才去消费数据。
void *consumer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){sleep(1);int tmp = 0;bq->pop(&tmp);std::cout << "消费者获取数据:" << tmp << std::endl;}return nullptr;
}
运行结果如下:
可以看到:由于生产者不受控制,一开始就生产很多数据。由于消费者每隔一秒才消费数据,所以会先消费生产者之前生产的数据
四. 代码细节与总结
通过上面的测试:可以直到,通过生产者消费者模型可以实现线程之间的同步与互斥。还能通过控制生产者消费者生产消费的速度来调整双方的同步。
4.1 pthread_cond_wait
生产者生产数据的时候,消费者消费数据的时候都是处于临界区。这时候生产者或消费者都是带着锁的。
为什么生产者带上锁阻塞,消费者还能消费?消费者上锁阻塞,生产者还能生存?
这是因为:pthread_cond_wait有第二个参数,被调用的时候。会以原子性将我们的锁释放,然后阻塞自己。
当wait被唤醒之后,同样以原子性再次加锁。
如果多个生产者或者消费者被同时唤醒,此时pthread_condd_wait会不会调用失败?
假设10个生产者都满足if_full 进入wait。然后同时唤醒,如果我们只使用if进行判断的话,就会导致10个生存者被唤醒后就都去插入数据了
所以需要使用while循环去判断条件是否满足,即便唤醒了,也要再次判断条件满足。因为有可能你刚唤醒,就有其他生产者生产了数据!