当前位置: 首页 > news >正文

Linux操作系统7- 线程同步与互斥4(基于POSIX条件变量的生产者消费者模型)

上篇文章:Linux操作系统7- 线程同步与互斥3(POSIX条件变量的使用,线程循环打印数字)-CSDN博客

本篇代码仓库:myLerningCode/l31 · 橘子真甜/Linux操作系统与网络编程学习 - 码云 - 开源中国 (gitee.com)

目录

一. 生产者消费者模型分析

1.1 三种角色

1.2 生产者消费者模型的特点

二. BlockQueue生产者消费者模型

​编辑

2.1 成员变量 ⭐

2.2 构造与析构函数 

2.3 辅助函数 

2.4 push 与 pop ⭐

三. 测试代码 

3.1 不控制生产者生产,消费者消费

3.2 只控制生产者生产

3.2 只控制消费者消费

四. 代码细节与总结

4.1 pthread_cond_wait


一. 生产者消费者模型分析

1.1 三种角色

        生产者消费者模型中有三个角色生产者,消费者,以及生产者消费者的交易场所。                          这个模型非常类似于生活中的超市,我们作为消费者从超市购买物品,而厂商作为生产者生产物品。

生产者生产数据然后传递给交易场所,如果发现交易场所满了,则阻塞等待

消费者从交易场所拿取数据消费,如果发现交易场所为空,则阻塞等待

交易场所作为临时保存数据的场所,是生产消费的缓冲区

1.2 生产者消费者模型的特点

        根据上面的分析,可以总结出三个角色之间的关系。生产者有1个或者多个,消费者有1个或者多个。但是所有的数据都是不同的。

1 消费者与消费者之间:互斥竞争关系,我不能消费你正在消费的数据

2 生产者与生产者之间:互斥竞争关系,我不能生产你正在生产的数据

3 消费者与生产者之间:互斥(一个时间点只有一个消费者或者生产者访问交易场所,其他人只能等待)与同步(消费者消费数据后通知生产者生产,生产者生产数据后通知消费者消费)关系

        生产者消费者模型可以将生产者和消费者解耦,让二者不会受到对方的影响,关注于自己的事情。 

        还能通过交易场所缓冲数据,以便支持消费者生产者之间忙闲不均的问题(有时候生产者生产数据过快,有时候消费者消费数据过快)从而提高整体的效率

二. BlockQueue生产者消费者模型

        这里使用BlockQueue(阻塞队列)来实现生产者消费者模型。其中阻塞队列是生产者消费者交易的场所。

        作为交易场所BlockQueue需要提供下面的基础功能:

1 通过互斥锁保证交易场所数据的安全

2 为生产者提供放入数据的能力,为消费者提供消费的能力

3 通过条件变量保证生产者消费者之间的同步

2.1 成员变量 ⭐

        通过上面的分析,成员变量至少需要 互斥锁 两个条件变量 一个队列。

代码如下:

#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的默认容量template <class T>
class BlockQueue
{
private:std::queue<T> _queue;  // 阻塞队列size_t _maxnum;        // 队列最大容量pthread_mutex_t _mtx;  // 互斥锁pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};

2.2 构造与析构函数 

        需要在构造函数中完成锁与条件变量的初始化。析构中销毁锁与条件变量

    BlockQueue(int maxnum = gnum) : _maxnum(maxnum){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}

2.3 辅助函数 

        由于每一次生产消费数据都需要判断阻塞队列是否为满或者空。所以需要两个函数来帮助我们快速获取阻塞队列的信息。

//不想让外部调用这些代码,设置为私有
private:bool is_empty(){return _queue.empty();}bool is_full(){return _queue.size() == _maxnum;}

2.4 push 与 pop ⭐

        这两个函数是阻塞队列生产者消费模型的核心函数。生产生产数据之后通过push向阻塞队列中放入数据,消费者从阻塞队列中拿取数据然后消费。

    // 生产者生产数据void push(const T &in){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否满足生产while (is_full()){// 数据满了生产者等待消费者消费pthread_cond_wait(&_pcond, &_mtx);}// 生产数据_queue.push(in);// 队列不为空,通知消费者消费pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_mtx);}// 消费这消费数据,通过输入输出型参数获取数据void pop(T *out){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否可以消费数据while (is_empty()){// 等待生产者生产数据pthread_cond_wait(&_ccond, &_mtx);}// 开始消费数据*out = _queue.front();_queue.pop();// 队列不满,通知生产者生产数据pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_mtx);}

2.5 BlockQueue.hpp

        阻塞队列的全部代码如下:

#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的最大容量template <class T>
class BlockQueue
{
public:BlockQueue(int maxnum = gnum) : _maxnum(maxnum){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}// 生产者生产数据void push(const T &in){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否满足生产while (is_full()){// 数据满了生产者等待消费者消费pthread_cond_wait(&_pcond, &_mtx);}// 生产数据_queue.push(in);// 队列不为空,通知消费者消费pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_mtx);}// 消费这消费数据,通过输入输出型参数获取数据void pop(T *out){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否可以消费数据while (is_empty()){// 等待生产者生产数据pthread_cond_wait(&_ccond, &_mtx);}// 开始消费数据*out = _queue.front();_queue.pop();// 队列不满,通知生产者生产数据pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_mtx);}private:bool is_empty(){return _queue.empty();}bool is_full(){return _queue.size() == _maxnum;}private:std::queue<T> _queue;  // 阻塞队列size_t _maxnum;        // 队列最大容量pthread_mutex_t _mtx;  // 互斥锁pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};

三. 测试代码 

        创建一个线程用于生产,一个线程用于消费。生产线程向队列生产一个随机数,而消费线程从队列中拿取数据并打印输出。

3.1 不控制生产者生产,消费者消费

测试代码如下:

#include <iostream>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){int tmp = rand() % 10000 + 1;bq->push(tmp);std::cout << "生产者生产数据:" << tmp << std::endl;}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){int tmp = 0;bq->pop(&tmp);std::cout << "消费者获取数据:" << tmp << std::endl;}return nullptr;
}int main()
{srand(time(0) ^ getpid() ^ rand());// 定义生产消费线程与阻塞队列pthread_t p;pthread_t c;BlockQueue<int> *bq = new BlockQueue<int>();pthread_create(&p, nullptr, producer, (void *)bq);pthread_create(&c, nullptr, consumer, (void *)bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;bq = nullptr;return 0;
}

测试结果如下:

可以看到,生产者疯狂生产数据,消费者疯狂消费数据

3.2 只控制生产者生产

        让生产者线程生产一次就调用sleep(1);

void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){sleep(1);int tmp = rand() % 10000 + 1;bq->push(tmp);std::cout << "生产者生产数据:" << tmp << std::endl;}return nullptr;
}

        可以看到,生产者生产数据之后消费者才去消费数据。当生产者sleep的时候,消费者发现阻塞队列为空,此时就会阻塞自己,等待生产者生产数据

3.2 只控制消费者消费

        消费者每隔一秒才去消费数据。

void *consumer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){sleep(1);int tmp = 0;bq->pop(&tmp);std::cout << "消费者获取数据:" << tmp << std::endl;}return nullptr;
}

    运行结果如下:

        可以看到:由于生产者不受控制,一开始就生产很多数据。由于消费者每隔一秒才消费数据,所以会先消费生产者之前生产的数据

四. 代码细节与总结

        通过上面的测试:可以直到,通过生产者消费者模型可以实现线程之间的同步与互斥。还能通过控制生产者消费者生产消费的速度来调整双方的同步。

4.1 pthread_cond_wait

        生产者生产数据的时候,消费者消费数据的时候都是处于临界区。这时候生产者或消费者都是带着锁的。

        为什么生产者带上锁阻塞,消费者还能消费?消费者上锁阻塞,生产者还能生存?

         这是因为:pthread_cond_wait有第二个参数,被调用的时候。会以原子性将我们的锁释放,然后阻塞自己。

        当wait被唤醒之后,同样以原子性再次加锁。

        如果多个生产者或者消费者被同时唤醒,此时pthread_condd_wait会不会调用失败?

        假设10个生产者都满足if_full 进入wait。然后同时唤醒,如果我们只使用if进行判断的话,就会导致10个生存者被唤醒后就都去插入数据了

        所以需要使用while循环去判断条件是否满足,即便唤醒了,也要再次判断条件满足。因为有可能你刚唤醒,就有其他生产者生产了数据!


http://www.mrgr.cn/news/95433.html

相关文章:

  • 第二天 开始Unity Shader的学习之旅之熟悉顶点着色器和片元着色器
  • 基于大模型的甲状舌管囊肿全流程预测与临床方案研究报告
  • 2025海外华文新媒体高级人才研修班在广西南宁举办
  • 代码随想录算法训练营第十四天(2)|151.翻转字符串里的单词
  • WSL 导入完整系统包教程
  • 使用brower use AI 代理自动控制浏览器完成任务
  • 【Java篇】静动交融,内外有别:从静态方法到内部类的深度解析
  • 网络HTTPS协议
  • SOFABoot-09-模块隔离
  • 2025知识图谱峰会(脱敏)PPT合集(18份).zip
  • LintCode第1712题 - 和相同的二元子数组
  • 3月22日星期六今日早报简报微语报早读
  • 回调方法传参汇总
  • 一文详解响应式编程springwebflux
  • C++函数与STL
  • c#知识点补充4
  • SSH密钥认证 + 文件系统权限控制 + Git仓库配置+封存与解封GIT仓库
  • 【Git流程最佳实践】 开发较大功能时应使用project branch
  • 基于CNN的FashionMNIST数据集识别5——GoogleNet模型
  • Linux_进程概念(B)-环境变量进程地址空间【Linux】