当前位置: 首页 > news >正文

Linux 生产者消费者模型

目录

生产者消费者模型

生产者消费者模型的概念

生产者消费者模型的特点

生产者消费者模型优点

基于BlockingQueue的生产者消费者模型

BlockingQueue

模拟实现基于阻塞队列的生产消费模型

POSIX 信号量

信号量的原理

信号量的概念

信号量函数

二元信号量模拟实现互斥功能

基于环形队列的生产者消费者模型

空间资源和数据资源

生产者和消费者申请和释放资源

必须遵守的两个规则

代码实现

信号量保护环形队列的原理


生产者消费者模型

生产者消费者模型的概念

生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中,消费者也不用找生产者要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器实际上就是用来给生产者和消费者解耦的。

生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

三种关系(并发问题): 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
两种角色: 生产者和消费者。(通常由进程或线程承担)。
一个交易场所: 通常指特定结构的内存空间。

我们编写代码时,需要注意三种关系引发的并发问题

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

生产者和生产者:当容器装满时,可能会触发并发问题,所以需要加锁

消费者和消费者:当容器中的数据只有一个时,并发问题会很明显,所以需要加锁

生产者和消费者:消费者向容器拿数据时,可能容器中有数据也可能没有数据,全看生产者有没有生产,给消费行为增加了不确定性,所以生产者消费者需要加锁

生产者和消费者之间为什么会存在同步关系?

当生产者将容器装满之后,生产者继续生产,则生产失败

当消费者将容器中的数据消费完之后,消费者继续消费,则消费失败

不管是哪一方继续生产或继续消费,都会导致另一方长时间得不到容器资源而引发饥饿问题,是非常低效的

所以我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。

生产者消费者模型优点

  • 解耦 (生产者生产时不需要等待消费者消费,消费者消费时也不需要等等待生产者生产)
  • 高效 (高效并不是出现在临界区,而是非临界区生产者生产数据之前需要获取数据,而消费者得到数据之后需要对数据进行加工) 
  • 支持忙闲不均 (由于有容器的原因,生产者生产数据,支持消费者不消费,消费者生产数据,支持生产者不生产数据)

多生产多消费的高效的意义在于:非临界区

多生产者:一个生产者在申请锁向仓库生产资源,其他生产者并发的去获取数据

多消费者:一个消费者在申请锁去仓库消费资源,其他消费者并发的去加工处理数据

消费者生产者并发执行:生产者在临界区生产数据时,消费者可能在并发的加工处理数据,消费者在临界区消费数据时,生产者可能在并发的在获取数据,生产者消费者也可能都没有访问临界区,生产者获取数据,消费者加工处理数据,非临界区代码并发执行

基于BlockingQueue的生产者消费者模型

BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程 操作时会被阻塞)

注意:为了方便理解,下面我们以单生产者、单消费者为例进行实现。

其与普通的队列的区别在于:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
  • 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。

知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。

模拟实现基于阻塞队列的生产消费模型

其中的BlockQueue就是生产者消费者模型当中的交易场所

BlockQueue.h

#ifndef __BLOCK_QUEUE__
#define __BLOCK_QUEUE__#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <queue>#define DefaultMaxima 5template<class T>
class BlockQueue
{
public:BlockQueue(int maxima = DefaultMaxima): _maxima(maxima){pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);low = maxima / 3;high = (maxima * 2) / 3;}T pop(){pthread_mutex_lock(&_lock);//当内存数据结构中没有数据时,就去消费者条件变量中等待while(_q.size() == 0){pthread_cond_wait(&_c_cond, &_lock);}T front = _q.front();_q.pop();//优化if(high > _q.size()) pthread_cond_signal(&_c_cond);//执行以上语句,一定会消费到生产资源,这时我们就唤醒生产者pthread_mutex_unlock(&_lock);//pthread_cond_boradcast(&_c_cond);return front;}void push(const T& data){pthread_mutex_lock(&_lock);//生产者生产到一定数量,就去生产者条件变量中等待while(_q.size() == _maxima){pthread_cond_wait(&_p_cond, &_lock);}_q.push(data);//优化if(low < _q.size()) pthread_cond_signal(&_p_cond);//当生产者执行完以上生产语句之后,特定的内存结构中一定有数据//这时我们就唤醒消费者//pthread_cond_boradcast(&_c_cond);pthread_mutex_unlock(&_lock);}~BlockQueue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}
private:std::queue<T> _q;  //队列int _maxima; //生产者最大生产个数pthread_mutex_t _lock;pthread_cond_t _c_cond; pthread_cond_t _p_cond; //优化int low;int high;
};#endif
  • 阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护起来。
  • 在这里我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。当阻塞队列满了的时候,要进行生产的生产者线程就应该在_c_cond条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在_p_cond条件变量下进行等待。
  • 不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁。
  • 当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在_p_cond条件变量下进行等待,因此当生产者生产完数据后需要唤醒在_p_cond条件变量下等待的消费者线程。
  • 同样的,当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在_c_cond条件变量下进行等待,因此当消费者消费完数据后需要唤醒在_c_cond条件变量下等待的生产者线程。

判断是否满足生产消费条件时不能用if,而应该用while:

防止伪唤醒:

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
  • 其次,在多消费者的情况下,当生产者将队列装载满了之后,消费者如果使用pthread_cond_broadcast函数唤醒生产者,就会一次性唤醒多个生产者,多个生产者线程竞争锁资源,生产者1申请到锁资源之后继续执行后续的代码,执行完之后,由于我们唤醒了多个线程,并且它们都不在等待队列中,于是它们继续竞争锁资源,进行生产,但是这时队列已经满了,没有空间让你去生产了,这种情况就被称为:线程被伪唤醒
  • 为了避免出现上述情况,我们就要让线程被唤醒后不要立马执行后续代码,而是再次进行判断,确认是否真的满足生产消费条件,所以这里必须要用while进行判断。

在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据。

Task.hpp

#pragma once
#include "BlockQueue.h"enum
{EXITCODE = 0,DIVZERO,MODZERO
};class Task
{
public:Task(int x, int y, char oper, int exitcode_ = EXITCODE) : _data1(x), _data2(y), _oper(oper), exitcode(exitcode_){}void run(){switch (_oper){case '+':result = _data1 + _data2;break;case '-':result = _data1 - _data2;break;case '*':result = _data1 * _data2;break;case '/':if(_data1 == 0 | _data2 == 0){exitcode = DIVZERO;}else{result = _data1 / _data2;}break;case '%':if(_data1 == 0 | _data2 == 0){exitcode = MODZERO;}else{result = _data1 % _data2;}break;default:std::cout << "Symbol mismatch!" << std::endl;break;}std::cout << _To_String() << " = " << result << std::endl;// std::cout << "a + b =  " << _a + _b << std::endl;}std::string _To_String(){std::string str;str += "[exitcode: ";str += std::to_string(exitcode);str += "]";str += " ";str += std::to_string(_data1);str += " ";str += _oper;str += " ";str += std::to_string(_data2);return str;}void GetTask(){std::cout << _data1 << " " << _oper << " " << _data2 << " = ?" << std::endl;}void operator()(){run();}~Task(){}private:int _data1;int _data2;char _oper;int exitcode;int result;
};
#include "BlockQueue.h"
#include <ctime>
#include "Task.hpp"std::string symbol = "+-*/%";//消费者
void *Consumer(void *args)
{BlockQueue<Task> *cp = static_cast<BlockQueue<Task>*>(args);while(true){sleep(1);//消费数据Task t = cp->pop();t();}
} //生产者
void *Productor(void *args)
{BlockQueue<Task> *cp = static_cast<BlockQueue<Task>*>(args);int a, b;int len = symbol.size();while(true){sleep(1);//生产数据a = rand() % 10 + 1;b = rand() % 10;Task t(a, b, symbol[rand() % len]);cp->push(t);t.GetTask();}
}int main()
{//种下一颗随机数种子srand((unsigned)time(nullptr) ^ getpid());pthread_t C, P;//还可以跑对象和任务BlockQueue<Task> *bq = new BlockQueue<Task>;pthread_create(&C, nullptr, Consumer, bq);pthread_create(&P, nullptr, Productor, bq);pthread_join(C, nullptr);pthread_join(P, nullptr);delete bq;return 0;
}

我们让生产者去生产任务,然后让消费者对这些任务进行加工处理

也就是说,我们想让生产者消费者模型处理某一种任务时,就只需要提供对应的Task类,然后让该Task类提供一个对应的run成员函数告诉我们应该如何处理这个任务即可。

POSIX 信号量

信号量的原理

  • 我们将可能会被多个执行流同时访问的资源叫做临界资源,临界资源需要进行保护否则会出现数据不一致等问题。
  • 当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
  • 但实际我们可以将这块临界资源再分割为多个区域(资源分块),当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。

信号量的概念

信号量(信号灯)本质是一个计数器,描述临界资源中资源数目的多少,信号量能够更细粒度的对临界资源进行管理。

每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特定的临界资源的权限,当操作完毕后就应该释放信号量。

信号量的PV操作:

P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。

V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。

PV操作必须是原子操作

多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。
但信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。
注意: 内存当中变量的++、--操作并不是原子操作,因此信号量不可能只是简单的对一个全局变量进行++、--操作。

申请信号量失败被挂起等待

当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。

注意: 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。

信号量函数

初始化信号量

初始化信号量的函数原型如下:

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数说明:

sem:需要初始化的信号量。

pshared:传入0值表示线程间共享,传入非零值表示进程间共享。

value:信号量的初始值(计数器的初始值)。

返回值说明:初始化信号量成功返回0,失败返回-1。

注意: POSIX信号量和System V信号量(进程)作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。

销毁信号量

销毁信号量sem_destroy函数原型如下:

int sem_destroy(sem_t *sem);

参数说明:

  • sem:需要销毁的信号量。

返回值说明:

  • 销毁信号量成功返回0,失败返回-1。

等待信号量

等待信号量sem_wait函数原型如下:

int sem_wait(sem_t *sem); //P() 

参数说明:

  • sem:需要等待的信号量。

返回值说明:

  • 等待信号量成功返回0,信号量的值减一。
  • 等待信号量失败返回-1,信号量的值保持不变。

发布信号量

发布信号量sem_post函数原型如下:

int sem_post(sem_t *sem);//V() 

参数说明:

  • sem:需要发布的信号量。

返回值说明:

  • 发布信号量成功返回0,信号量的值加一。
  • 发布信号量失败返回-1,信号量的值保持不变。

二元信号量模拟实现互斥功能

信号量本质是一个计数器,如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量。

信号量的初始值为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。

例如,下面我们实现一个多线程抢票系统,其中我们用二元信号量模拟实现多线程互斥。我们在主线程当中创建五个新线程,让这五个新线程执行抢票逻辑,并且每次抢完票后打印输出此时剩余的票数,其中我们用全局变量tickets记录当前剩余的票数,此时tickets是会被多个执行流同时访问的临界资源,在下面的代码中我们并没有对tickets进行任何保护操作。

#include <iostream>
#include <pthread.h>
#include <unistd.h>using namespace std;int tickets = 1000;void *Routine(void *args)
{while(true){// usleep(1000);if(tickets > 0){usleep(1000); //等待线程时间片到期,加大线程切换概率cout << "get a tickets: " << tickets << endl;tickets--;}  else{break;}}pthread_exit((void*)6666);
}int main()
{pthread_t tid1, tid2, tid3, tid4, tid5;pthread_create(&tid1, nullptr, Routine, nullptr);pthread_create(&tid2, nullptr, Routine, nullptr);pthread_create(&tid3, nullptr, Routine, nullptr);pthread_create(&tid4, nullptr, Routine, nullptr);pthread_create(&tid5, nullptr, Routine, nullptr);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);pthread_join(tid5, nullptr);return 0;
}

运行代码后可以看到,线程打印输出剩余票数时出现了票数剩余为负数的情况,这是不符合我们预期的。

下面我们在抢票逻辑当中加入二元信号量,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时二元信号量就达到了互斥的效果。

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>using namespace std;int tickets = 1000;sem_t sem;void *Routine(void *args)
{while(true){// usleep(1000);sem_wait(&sem);if(tickets > 0){usleep(1000); //等待线程时间片到期,加大线程切换概率cout << "get a tickets: " << tickets << endl;tickets--;sem_post(&sem); }  else{sem_post(&sem);break;}}pthread_exit((void*)6666);
}int main()
{sem_init(&sem, 0, 1);pthread_t tid1, tid2, tid3, tid4, tid5;pthread_create(&tid1, nullptr, Routine, nullptr);pthread_create(&tid2, nullptr, Routine, nullptr);pthread_create(&tid3, nullptr, Routine, nullptr);pthread_create(&tid4, nullptr, Routine, nullptr);pthread_create(&tid5, nullptr, Routine, nullptr);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);pthread_join(tid5, nullptr);sem_destroy(&sem);return 0;
}

基于环形队列的生产者消费者模型

空间资源和数据资源

生产者关注的是空间资源,消费者关注的是数据资源

对于生产者和消费者来说,它们关注的资源是不同的:

  • 生产者关注的是环形队列当中是否有空间(space),只要有空间生产者就可以进行生产。
  • 消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费。

p_space和c_data的初始值设置

现在我们用信号量来描述环形队列当中的空间资源(SpaceSem)和数据资源(DataSem),在我们初始信号量时给它们设置的初始值是不同的:

  • SpaceSem的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
  • DataSem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。

生产者和消费者申请和释放资源

生产者申请空间资源,释放数据资源

对于生产者来说,生产者每次生产数据前都需要先申请SpaceSem:

  • 如果SpaceSem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
  • 如果SpaceSem的值为0,则信号量申请失败,此时生产者需要在SpaceSem的等待队列下进行阻塞等待,直到环形队列当中有新的空间后再被唤醒。

当生产者生产完数据后,应该释放DataSem:

虽然生产者在进行生产前是对SpaceSem进行的P操作,但是当生产者生产完数据,应该对DataSem进行V操作而不是SpaceSem。
进行生产操作时,SpaceSem资源少一个(P操作),DataSem资源多一个(V操作)

消费者申请数据资源,释放空间资源

对于消费者来说,消费者每次消费数据前都需要先申请DataSem:

  • 如果DataSem的值不为0,则信号量申请成功,此时消费者可以进行消费操作。
  • 如果DataSem的值为0,则信号量申请失败,此时消费者需要在DataSem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒。

当消费者消费完数据后,应该释放SpaceSem:

虽然消费者在进行消费前是对DataSem进行的P操作,但是当消费者消费完数据,应该对SpaceSem进行V操作而不是DataSem。
进行消费操作时,DataSem资源少一个(P操作),SpaceSem资源多一个(V操作)

必须遵守的两个规则

在基于环形队列的生产者和消费者模型当中,生产者和消费者必须遵守如下二个规则。

第一个规则:生产者和消费者不能对同一个位置进行访问。

生产者和消费者在访问环形队列时:

  • 生产者和消费者在访问环形队列时:如果生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的。
  • 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产行为和消费行为是可以并发进行的,此时不会出现数据不一致等问题

第二个规则:无论是生产者还是消费者,都不应该将对方套一个圈以上。

生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。

同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。

代码实现

#ifndef __RingQueue_H__
#define __RingQueue_H__
#include <pthread.h>
#include <string>
#include <semaphore.h>
#include <vector>static const int defaultcap = 5;
//多生产多消费//使用信号量
template<class T>
class RingQueue
{//申请信号量void P(sem_t &sem){sem_wait(&sem);}//释放信号量void V(sem_t &sem){sem_post(&sem);}void lock(pthread_mutex_t &lock){pthread_mutex_lock(&lock);}void unlock(pthread_mutex_t &lock){pthread_mutex_unlock(&lock);}
public:RingQueue(int cap = defaultcap):_cap(cap), _RQ(_cap), _pse(0), _cda(0){sem_init(&_pSpace, 0, _cap);sem_init(&_cData, 0, 0);pthread_mutex_init(&_clock, nullptr);pthread_mutex_init(&_plock, nullptr);}//生产void push(T& in){P(_pSpace);lock(_plock);_RQ[_pse] = in;_pse++;//一直循环_pse %= _cap;unlock(_plock);V(_cData);}//消费void pop(T *out){P(_cData);lock(_clock);*out = _RQ[_cda];_cda++;//一直循环_cda %= _cap;unlock(_clock);V(_pSpace);}~RingQueue(){sem_destroy(&_pSpace);sem_destroy(&_cData);pthread_mutex_destroy(&_clock);pthread_mutex_destroy(&_plock);}private:int _cap;std::vector<T> _RQ;int _pse; //生产者下标int _cda; //消费者下标sem_t _pSpace; //生产者空间资源sem_t _cData; //消费者数据资源pthread_mutex_t _plock;pthread_mutex_t _clock;
};#endif  

为了方便理解,我们这里实现单生产者、单消费者的生产者消费者模型。于是在主函数我们就只需要创建一个生产者线程和一个消费者线程,生产者线程不断生产数据放入环形队列,消费者线程不断从环形队列里取出数据进行消费。

Task.hpp

#pragma once
#include <iostream>
#include <string>enum
{EXITCODE = 0,DIVZERO,MODZERO
};class Task
{
public:Task(){}Task(int x, int y, char oper, int exitcode_ = EXITCODE) : _data1(x), _data2(y), _oper(oper), exitcode(exitcode_){}void run(){switch (_oper){case '+':result = _data1 + _data2;break;case '-':result = _data1 - _data2;break;case '*':result = _data1 * _data2;break;case '/':if(_data1 == 0 | _data2 == 0){exitcode = DIVZERO;}else{result = _data1 / _data2;}break;case '%':if(_data1 == 0 | _data2 == 0){exitcode = MODZERO;}else{result = _data1 % _data2;}break;default:std::cout << "Symbol mismatch!" << std::endl;break;}std::cout << _To_String() << " = " << result << std::endl;}std::string _To_String(){std::string str;str += "[exitcode: ";str += std::to_string(exitcode);str += "]";str += " ";str += std::to_string(_data1);str += " ";str += _oper;str += " ";str += std::to_string(_data2);return str;}void GetTask(){std::cout << _data1 << " " << _oper << " " << _data2 << " = ?" << std::endl;}void operator()(){run();}~Task(){}private:int _data1;int _data2;char _oper;int exitcode;int result;
};
#include "Task.hpp"
#include "RingQueue.h"
#include <unistd.h>std::string oper = "+-*/%";  void *Consumer(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task>*>(args);while(true){Task t;rq->pop(&t);t();}}void *Productor(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task>*>(args);int len = oper.size();while(true){sleep(1);int _data1 = rand() % 10;int _data2 = rand() % 10 + 1;char ch = oper[rand() % oper.size()];Task t(_data1, _data2, ch);rq->push(t);t.GetTask();}}int main()
{//种下一颗随机数种子srand(time(nullptr) ^ getpid());pthread_t c, p;RingQueue<Task>* rq = new RingQueue<Task>(10);pthread_create(&p, nullptr, Productor, rq);pthread_create(&c, nullptr, Consumer, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}

允许结果:

信号量保护环形队列的原理

在blank_sem和data_sem两个信号量的保护后,该环形队列中不可能会出现数据不一致的问题。

因为只有当生产者和消费者指向同一个位置并访问时,才会导致数据不一致的问题,而此时生产者和消费者在对环形队列进行写入或读取数据时,只有两种情况会指向同一个位置:

  • 环形队列为空时。
  • 环形队列为满时。


但是在这两种情况下,生产者和消费者不会同时对环形队列进行访问:

  • 当环形队列为空的时,消费者一定不能进行消费,因为此时数据资源为0。
  • 当环形队列为满的时,生产者一定不能进行生产,因为此时空间资源为0。

也就是说,当环形队列为空和满时,我们已经通过信号量保证了生产者和消费者的串行化过程。而除了这两种情况之外,生产者和消费者指向的都不是同一个位置,因此该环形队列当中不可能会出现数据不一致的问题。并且大部分情况下生产者和消费者指向并不是同一个位置,因此大部分情况下该环形队列可以让生产者和消费者并发的执行


http://www.mrgr.cn/news/99108.html

相关文章:

  • (done) 吴恩达版提示词工程 1. 引言 (Base LLM 和 Instruction Tuned LLM)
  • C++:详解命名空间
  • 【Rust 精进之路之第14篇-结构体 Struct】定义、实例化与方法:封装数据与行为
  • 【TeamFlow】4 团队人员管理系统的实现
  • 【Rust 精进之路之第6篇-流程之舞】控制流:`if/else`, `loop`, `while`, `for` 与模式匹配初窥
  • 【Rust 精进之路之第15篇-枚举 Enum】定义、变体与数据关联:表达多种可能性
  • 【Rust 精进之路之第4篇-数据基石·上】标量类型:整数、浮点数、布尔与字符的精妙之处
  • 【Rust 精进之路之第10篇-借用·规则】引用 (``, `mut`):安全、高效地访问数据
  • 【Rust 精进之路之第2篇-初体验】安装、配置与 Hello Cargo:踏出 Rust 开发第一步
  • uniapp-商城-29-vuex 关于系统状态的管理
  • VSCode 扩展离线下载方法
  • 【Python图像处理入门】Python读取图像的5种方式指南(从入门到入土)
  • 【更新完毕】2025泰迪杯数据挖掘竞赛A题数学建模思路代码文章教学:竞赛论文初步筛选系统
  • uniapp-商城-27-vuex 使用流程
  • 6.QT-常用控件-QWidget|windowTitle|windowIcon|qrc机制|windowOpacity|cursor(C++)
  • C++ AVL树
  • MySQL+Redis实战教程:从Docker安装部署到自动化备份与数据恢复20250418
  • Linux笔记---动静态库(原理篇)
  • QML Label 组件
  • QT6(24)4.1界面组件概述:基础类QWidget 的属性 sizePolicy(组件默认的布局属性)。4.2布局管理:把容器组件与布局组件结合在一起使用,举例设置组件伸缩因子 stretch