当前位置: 首页 > news >正文

Linux:多线程中的生产消费模型

多线程

  • 生产消费模型
  • 三种关系
  • 两个角色
  • 一个交易场所
  • 交易场所的实现(阻塞队列)
    • pthread_cond_wait 接口
    • 判断阻塞队列的空或满时,需要使用while
    • 测试一:单消费单生产案例
    • 测试二:多生产多消费案例

生产消费模型

消费者与生产者这两个词是不是很熟悉?

生产者、消费者、分解者 … 这些词在中学生物课本中经常看到。但是,接下来所介绍的内容中没有分解者,只有生产者与消费者。

生产消费模型在日常生活中很常见,这里来举个比较贴切的案例:超市

在超市内部会存有大量的商品,这些商品提供给用户消费;超市并不是生产者,超市只是将多种商品汇聚到指定的柜台方便用户们选取。超市中的商品来自与各个供货商,供货商生产产品。

用户、超市、供货商中,用户是消费者;供货商是生产者;

那么,超市在这里担任一种什么角色呢?

先来想这样的一个问题,用户为什么去超市买商品,而不是直接去供货商那边去买呢?

首先,工厂在生产一种产品时,是批量生产的。并且在工厂中,不是单单只是生产一种产品。可能今天生产某个类型的产品,当数量达到甲方需求后就会停止生产,换成另一类的商品继续生产。如果,一个用户当天正好需要这个工厂的某个商品时,这个工厂正好生产着这个商品,这样是再好不过。如果,当天这个工厂恰好就没有生产这个商品呢?用户一直等待工厂来生产吗?这样的体验是很差的。

由此,超市的作用就是将各个供应商的商品汇总起来。什么商品我都收纳一些,不管当下商品卖不卖得出去,时间到了自然有用户需求。当用户需要某个商品时,不用说每次都要去供应商去询问等待,直接去超市购买即可。超市就起到了商品供应的缓冲作用。

用户、供应商、超市组成的的生产消费模型,允许生产与消费的工作步调不一致。供货商每时每刻都可以生产大批的商品汇总到超市,消费者在不同的时间点,都可以从超市中获取不同的商品需求,不需要时间的等待。

总的来说,超市的生产消费模式是高效的,允许忙闲不均。

铺垫了这么多,将上面提到的角色转换一下:

生产者与消费者 就是所谓的 多线程
超市是交易场所,相当于一种特定的缓冲区(这个缓冲区可以是:队列、链表、哈希表… 等数据结构);
商品就是每个线程所需处理的数据

再将上面的角色关联一下,缓冲区、数据、多线程的之间的主要是为了完成一种功能:完成多线程之间的通信

所以,生产消费模型主要用于:不同线程之间完成通信工作

为了让多线程进行通信,就要让超市(交易场所)被所有的线程看到。注定了,交易场所是一个被多线程并发访问的公共场所。所以,交易场所就需要被保护起来;还要进行维护对应的互斥与同步的关系!(避免饥饿的发生)

三种关系

在生产消费模型中存在三种关系:

  1. 生产者与生产者之间处于:互斥关系

好比 可口可乐 与 百事可乐 两家厂商,都是生产者。超市中的货架摆放商品的容量是有限的。容量很大时还好,当容量很紧张时,可口可乐供货说来摆我的商品,百事可乐的供应商也说摆我的摆我的,不同的供应商之间就会出现竞争关系。所以,生产者与生产者之间是属于 互斥关系

  1. 消费者与消费者之间处于:互斥关系

超市中的商品是有限的。当超市中的产品很紧缺,不同消费者之间又是同时需要这个产品时,就会出现竞争现象,也就是互斥。由此,消费者与消费者之间的关系是属于 互斥关系

  1. 生产者与消费者之间处于:互斥且同步的关系

交易场所(超市)作为生产者与消费者之间公共场所,是需要发挥一定的作用的!下面来举些例子:

示例一:
假设某天超市中的一个产品没货了,恰好此时的消费者又需求这个商品时,如果超市没有及时补货,而且又没有对消费者进行提示的话,消费者会每时每刻都来超市寻求这个商品。消费者的每次的寻求,都会耗时、耗费精力。遇到好说的消费者还好,遇到不好说话的,可能就会引起不好的矛盾。由此,超市就应该做好对应的决策,当超市缺货时,如果有消费者来寻求商品时,应该对这个消费者进行沟通,让消费者等待,并且保留信息,待有货再来通知消费者来获取商品。
同样的,超市货架上的货物数量、货物价格等等,是生产者(供应商)需要关心的。超市需要对供应商做好对应的联系通知准备,没货了我就通知生产者上货,货满了就让供货商停止供货。也不要让生产者花费不必要的时间去往返询问等等。

示例二:
假设有生产者刚好正在为当前超市商家商品到货架上时,此时有消费者来了,寻求正在上货的商品。那么问题来了,是先上货呢?还是先让消费者拿货呢?货架属于公共资源,生产者与消费者共同访问货架,就会出现并发问题。

示例一中,生产者与消费者之间的关系,需要公共场所来维护。没货了就让消费者处于等待状态,让生产者来生产货物;货满了就要让生产者停止生产,让消费者来消费产品;由此,生产者与消费者要维护 同步的关系

示例二中,生产者与消费者之间会存在并发访问的问题,由此,是 互斥的关系

两个角色

  • 生产消费模型中存在两个角色:生产者、消费者

一个交易场所

  • 生产消费模型中的交易场所通常是:某种数据结构的缓冲区

总的来说,生产消费模型可以简单记忆概括为 “三二一原则”(这个原则主要用于方便记忆,不是官方名词)

存在三种关系、有两个角色、维护一个交易场所;手写CP问题,就可以转换为用代码去维护实现 “三二一原则”

下面来简单实现一个交易场所:

交易场所的实现(阻塞队列)

创建一个 blockQueue.hpp 文件,其内部主要实现 BlockQueue 类(阻塞队列)。如何设计这个交易产所呢?

  • 成员变量的设计

阻塞队列也是队列,成员变量中应该包含有一个队列 以及 队列容量的大小

先前提到过,消费者与生产者之间的关系是需要交易场所来维护的(消费者与生产者所处的关系有:互斥关系同步关系)。由此,BlockQueue 类中的成员需要包含:一把锁条件变量

但是,要考虑一点:当队列为满时,生产者要处于阻塞状态,停止生产;队列为空时,消费者也要处于阻塞状态,停止消费。条件变量只设置一个的话,是不能满足上面要求的。对此,我们要在类中设计两个条件变量,分别约束消费者与生产者。

先实现一部分代码,后续继续补充内容:

#pragma once#include <iostream>
#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <queue>const int gcap = 5; // 阻塞队列的最大容量template <class T>
class BlockQueue
{
public://构造	BlockQueue(const int cap = gcap): _cap(cap){// 初始化互斥锁pthread_mutex_init(&_mutex, nullptr);// 初始化消费者、生产者的条件变量pthread_cond_init(&_consumerCond, nullptr);pthread_cond_init(&_productorCond, nullptr);}//析构~BlockQueue(){// 释放锁pthread_mutex_destroy(&_mutex);// 释放条件变量pthread_cond_destroy(&_consumerCond);pthread_cond_destroy(&_productorCond);}private:std::queue<T> _q;size_t _cap;            // 队列容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _consumerCond;  // 消费者的条件变量pthread_cond_t _productorCond; // 生产者的条件变量
};

如果对互斥锁条件变量 接口不熟的老铁可以参考小编的这篇文章:多线程的互斥与同步,有较详细的介绍

  • 成员函数的设计

来分析一下,关于生产者生产的商品 以及 消费者消费的商品 在交易场所中如何处理:

生产者生产的商品,会提供给超市,摆在货架上。换回来说就是存储到阻塞队列中。由此,我们需要在 BlockQueue 类中实现一个 push 成员函数,专门用于接收生产者生产的数据。同样的,消费者需要从超市货架上获取商品。所以,我们也需要实现一个 pop 成员函数,专门用于消费者拿取数据。实现如下:

template <class T>
class BlockQueue
{
public://构造//析构bool isFull() //判断阻塞队列是否满{return _q.size() == _cap;}bool isEmpty() //判断阻塞队列是否为空{return _q.empty();}//将生产者生产的数据放入阻塞队列中void push(const T &in){// 加锁pthread_mutex_lock(&_mutex);while(isFull() == true){//阻塞队列为满时,生产者停止生产,阻塞生产者pthread_cond_wait(&_productorCond, &_mutex);}//推送数据到队列中_q.push(in);//当产品大于阻塞队列的容量一半时,不管消费者是否阻塞,都唤醒消费者消费if (_q.size() >= _cap / 2) pthread_cond_signal(&_consumerCond);// 解锁pthread_mutex_unlock(&_mutex);}// 消费产品void pop(T *out){// 加锁pthread_mutex_lock(&_mutex);while(isEmpty()){//阻塞队列为空时,消费者停止消费,阻塞消费者pthread_cond_wait(&_consumerCond, &_mutex);}//消费产品*out = _q.front();_q.pop();//到这里,队列的最后一个产品可能被消费完了,不管生产者是否阻塞都唤醒生产者,进行生产工作pthread_cond_signal(&_productorCond);// 解锁pthread_mutex_unlock(&_mutex);}
private://成员变量...
}

上面实现的阻塞队列,有两个细节需要介绍:

pthread_cond_wait 接口

在调用 pthread_cond_wait 接口时,为什么要传入 互斥锁 变量?

先来回顾一下, 调用pthread_cond_wait 接口的缘由:为了实现线程的同步关系,防止某个线程重复申请互斥锁资源,导致其他线程处于饥饿状态

就拿刚刚实现阻塞队列的 push 接口来说,生产者可不止只有一位,可能有很多个。为了防止多个线程同时访问同一块资源,导致并发问题。所以,要对当前 push 接口临界资源进行上锁保护

生产者会源源不断的生产产品,当阻塞队列满时,需要对生产者要求停止生产。使用 pthread_cond_wait 接口,将生产者阻塞等待。判断队列满不满是在临界资源中进行的;由此,线程是申请了锁资源后,再调用 pthread_cond_wait 接口。

为了避免其他线程不能使用锁资源,就得先解锁。这也是为什么在调用 pthread_cond_wait 接口时,要传入互斥锁的缘由。

pthread_cond_wait 接口内部实现:将当前线程申请的锁资源先归还(解锁),然后再将当前线程处于阻塞状态

到这里就完了吗?其实不然:

在阻塞当前线程后,OS会将当前的线程所有数据,以及当前线程执行的代码位置先保存。然后,调度其他线程执行其他任务。假设某一时刻,队列不为满,生产者又可以继续生产产品时,OS 又会将当前线程调度回来(之前怎么转移当前线程运行的资源,现在就怎么原封不动的转移回来),这个线程又可以再临界区执行后续的代码了。

那么问题来了,当前线程自己是申请锁了的。但是锁,被 pthread_cond_wait 接口内部实现的代码释放了。
怎么办呢?实现 pthread_cond_wait接口的设计者早就考虑到了。

pthread_cond_wait 内部并不是简简单单就将当前线程阻塞后就即刻返回。将当前线程处于阻塞状态后,线程被 OS 调度开。 之后当线程再被调度回来后,会继续执行pthread_cond_wait内部的后续代码

pthread_cond_wait 其内部会为当前线程申请互斥锁资源,没有申请成功会一直申请,直到申请成功,才会彻底的退出

这也说明了,为什么要给 pthread_cond_wait 接口传入 锁对象。

判断阻塞队列的空或满时,需要使用while

在判断队列是否为 / 时,为什么使用 while 循环?使用 if 可以吗?

因为消费者 和 生产者不仅只有一位,举个简单的例子:

当消费者只有一位,生产者有多个时,阻塞队列毋庸置疑很快会被生产者生产的数据占满。就拿上面的代码来说,当队列在处于满的状态下,所有的生产者都会被处于阻塞状态,只能等待消费者消费。由于消费者在获取数据时,会有对应的执行策略,唤醒生产者进行生产数据。上面在使用 pthread_cond_signal 接口还好,只是唤醒一个生产者。但是,在使用pthread_cond_broadcast 接口时就会出现问题。

前面讲到,pthread_cond_wait 内部实现有两部分执行功能:

  1. 先将当前线程锁资源归还,再将当前线程阻塞
  2. 当线程被唤醒,会为当前线程申请锁资源,申请成功继续向后执行对应的代码;否则会一直申请,直到成功

pthread_cond_broadcast 作用是唤醒所有被 pthread_cond_wait 阻塞的线程

回到刚刚的例子,消费者只有一个,在队列为满时,会去消费一个数据,阻塞队列腾就出一个空间。
如果此时消费者去调用 pthread_cond_broadcast就会出现这样的问题:所有的被阻塞的线程会被唤醒,都会去争着去申请锁资源
使用 if 条件判断进行判断的话只判断一次,并且生产者都是线程,就会出现并发的问题。第一个生产者被唤醒,在申请锁成功后,执行后续代码,生产数据。由于阻塞队列的容量只有一个,此时的生产就已经让阻塞队列满了。当第一个生产者释放锁后,后续又有其他生产者会申请锁资源,继续执行后续代码,生产对应的数据。这样直接导致队列容量的不足问题。

回看刚刚实现的代码:

//将生产者生产的数据放入阻塞队列中void push(const T &in){// 加锁pthread_mutex_lock(&_mutex);while(isFull() == true){//阻塞队列为满时,生产者停止生产,阻塞生产者pthread_cond_wait(&_productorCond, &_mutex);}//推送数据到队列中_q.push(in);//当产品大于阻塞队列的容量一半时,不管消费者是否阻塞,都唤醒消费者消费if (_q.size() >= _cap / 2) pthread_cond_signal(&_consumerCond);// 解锁pthread_mutex_unlock(&_mutex);}

使用 while 就可以很好的避免这样的问题发生。不管是唤醒单个线程,还是唤醒一批的线程。当阻塞队列的最后一个容量被占满后,其他被唤醒的线程还是会向后执行对应的代码,pthread_cond_wait会为其他生产者申请锁资源。

使用 while 进行循环判断后,不管被唤醒的线程是否申请锁资源成功,都会再次进入循环对阻塞队列的容量进行判断。容量满,再次调用pthread_cond_wait 让这个线程阻塞,归还锁资源。循环往复,直到消费者再次消费数据。从而避免阻塞队列容量问题。

测试一:单消费单生产案例

交易场所实现完成后,简单测试一下代码:创建一个生产者、一个消费者,完成对应的任务

#include "blockQueue.hpp"void* consumer(void* args)
{BlockQueue<int>* bp = static_cast<BlockQueue<int>*>(args);while(true){sleep(1); //消费者每次消费完,停顿一秒int data = 0;//将数据从blockqueue中获取 -- 获取到了数据bp->pop(&data);std::cout << "consumer get data:" << data << std::endl;}
}void* productor(void* args)
{BlockQueue<int>* bp = static_cast<BlockQueue<int>*>(args);while(true){//sleep(1); //生产者每次生产完数据,停顿一秒//生产随机数int data = rand() % 10 + 1; //将数据推送到blockqueue -- 完成生产过程bp->push(data);std::cout << "productor data:" << data << std::endl;}
}int main()
{//321原则:3种关系、2个模型、1个场所//创建阻塞队列BlockQueue<int>* bq = new BlockQueue<int>();//单消费者、单生产者模型pthread_t p,c;pthread_create(&c, nullptr, consumer,bq);pthread_create(&p, nullptr, productor,bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

先让消费者慢于生产者生产的数据,每次消费完休眠一秒。运行效果如下:
在这里插入图片描述
由于在BlockQueue类中,设置了阻塞队列的最大容量为 5 。从运行结果看,生产者一上来就产生了5个数据,之后就阻塞了;当消费者消费数据后,才会继续生产数据。

下面修改代码,让生产者每次生产完数据后,暂停一秒。再来看看运行效果:

在这里插入图片描述
由于在生产者中,加了策略,当产品数量大于阻塞队列的一半时,唤醒消费者。所以,每次生产两个产品后,就会被消费者消费完。

测试二:多生产多消费案例

接下来修改测试代码,不再以整形为例子。在阻塞队列中传入 Task 对象,模拟实现一个简单的业务流程。实现 Tack 类,其成员函数主要完成基本的 加减乘除 运算任务:

#include "blockQueue.hpp" //包含交易场所的头文件class Task
{
public:Task() {}Task(int x, int y, char op): _x(x), _y(y), _op(op), _result(0), _codeRet(0){}// 完成运算任务void operator()(){switch (_op){case '+':_result = _x + _y;break;case '-':_result = _x - _y;break;case '/':{if (_y == 0)_codeRet = -1;else_result = _x / _y;}break;case '*':_result = _x * _y;break;case '%':{if (_y == 0)_codeRet = -2;else_result = _x % _y;}break;default:std::cout << "没有这样的运算符!" << std::endl;break;}}//输出表达式字符串std::string formatArg(){return std::to_string(_x) + _op + std::to_string(_y) + "=";}//输出结果以及运算成功与否std::string formatRes(){return std::to_string(_result) + "(" + std::to_string(_codeRet) + ")";}~Task() {}private:int _x;int _y;char _op; // 运算符int _result;  // 运算结果int _codeRet; // 用于表示运算成功与否
};

Task 类实现后,在 main.cc 中创建多个生产者与消费者,分别生成数据与消费数据:

#include "task.hpp" //包含 Task 类头文件//将线程编号转16进制
std::string switchPthreadId(pthread_t id)
{char* str = new char[32];snprintf(str, 32, "0x%x", id);return str;
}//消费
void* consumer(void* args)
{BlockQueue<Task>* bp = static_cast<BlockQueue<Task>*>(args);while(true){Task t;// 1. 将数据从blockqueue中获取 -- 获取到了数据bp->pop(&t);// 2. 结合某种业务逻辑,处理数据!//执行运算任务t();std::cout << switchPthreadId(pthread_self()) << " | consumer data: " << t.formatArg() << t.formatRes() << std::endl;}
}//生产
void* productor(void* args)
{BlockQueue<Task>* bp = static_cast<BlockQueue<Task>*>(args); //类型转换std::string ops = "+-*/%"; while(true){sleep(1);// 1. 先通过某种渠道获取数据char op = ops[rand() % ops.size()]; //取随机运算符int x = rand() % 20 + 1;int y = rand() % 10 + 1;//获取数据Task t(x, y, op);// 2. 将数据推送到blockqueue -- 完成生产过程bp->push(t);std::cout << switchPthreadId(pthread_self()) << " | productor Task: " <<  t.formatArg() << "?" << std::endl;}
}int main()
{//生产随机数srand((uint64_t)time(nullptr) ^ getpid());//321原则:3种关系、2个角色、1个场所//创建阻塞队列BlockQueue<Task>* bq = new BlockQueue<Task>();//生产消费模型pthread_t p[2],c[3];//创建多个生产者与消费者pthread_create(p, nullptr, productor,bq);pthread_create(p + 1, nullptr, productor,bq);pthread_create(c, nullptr, consumer,bq);pthread_create(c + 1, nullptr, consumer,bq);pthread_create(c + 2, nullptr, consumer,bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
}

在这里我们先让生产者生成数据慢一点,每次停顿一秒后再生成数据;让消费者消费数据快一点,编译运行结果如下:
在这里插入图片描述

生产者消费者模型的 高效性 并不能只看生产者生成数据到交易场所中,再让消费者消费简单的线性流程。

要从整体的业务流程分析,生产消费模型高效性体现如下:

  1. 生产消费模型支持多个生产者同时向队列中提交数据,满足高并发的数据生成需求;
    同时支持多个消费者同时向队列中获取数据,满足高并发,提高系统的 处理能力吞吐量
  2. 生产者只需将消息放到队列中,就可以立即进行其他的操作,无需等待消费者处理完毕;
    消费者也只需从队列中取出消息进行后续的逻辑处理,实现了生产者和消费者之间的异步操作

生成消费模型就介绍到这里,有感兴趣的老铁可以点点关注。


http://www.mrgr.cn/news/46302.html

相关文章:

  • 决策树随机森林-笔记
  • 基于Android11简单分析audio_policy_configuration.xml
  • Linux网络编程 -- 网络套接字预备与udp
  • Lombok的@Builder注解
  • ES操作指南
  • Run the FPGA VI 选项的作用
  • AI改变一切,包括你的毕业论文!如何应对?
  • 十年网络安全工程师谈学习网络安全的正确顺序
  • 希亦超声波清洗机值得购买吗?清洁技术之王多维度测评大揭秘!
  • 基于邮箱的域名欺骗攻击:利用解析器绕过访问控制
  • 面对多种可燃气体,哪种传感器最适合你的应用场景?
  • vite+vue3实现动态路径导入
  • 电力电子技术03 AC-DC整流器(2)---单相半波整流器 二极管不控整流
  • 行盒的截断样式 box-decoration-break
  • 视频汇聚平台EasyCVR支持云端录像丨监控存储丨录像回看丨录像计划丨录像配置
  • vite搭建vue3项目
  • [稳定检索|投稿优惠]2024年材料科学、能源技术与智能制造国际会议(MSETIM 2024)
  • Linux入门3——vim的简单使用
  • Zotero插件指南:20个工具让你的学术生活更简单
  • 基于jmeter+perfmon的稳定性测试记录