当前位置: 首页 > news >正文

多线程案例---阻塞队列

1. 阻塞队列

阻塞队列是一种特殊的队列,也遵守 " 先进先出 " 的原则。

阻塞队列是一种线程安全的数据结构,并且具有以下特性:

1. 当队列为满时,继续进行入队列操作就会阻塞,直到有其他线程从队列中取走元素

2. 当队列为空时,继续进行出队列操作就会阻塞,直到有其他线程往队列中插入元素

阻塞队列的一个典型应用场景就是”生产者消费者模型“ 。

1.1 生产者消费者模型 

在多线程编程中,生产者消费者模型是一种典型的编码技巧,生产者消费者模型通过阻塞队列来解决生产者和消费者强耦合的问题。

在生产者消费者模型中,生产者不与消费者直接进行通讯,而是通过阻塞队列来间接的进行通讯。当生产者产出一个数据后,它不会等待消费者来处理,而是将产出的数据扔到阻塞队列中,等到消费者去处理数据时,消费者也不会直接跟生产者去要数据,而是从阻塞队列中获取需要处理的数据。

如下图

在生产者消费者模型中引入阻塞队列有什么影响呢?

1 .解耦合

      我们假设有两个服务器,分别为A服务器和B服务器,当A服务器和B服务器直接进行交流时,编写A服务器的代码时,多多少少也会涉及到一点B服务器的逻辑,编写B服务器的代码时多多少少也会涉及到一些A服务器的逻辑,这样就导致了两个服务器的代码就有了强耦合性。而当我们引入阻塞队列之后,就会变成A服务器与阻塞队列进行交涉,B服务器和阻塞队列进行交涉,从而间接完成A服务器和B服务器的间接交涉,这样A服务器和B服务器就不会直接进行交涉了。此时,A服务器中的代码中就看不到B服务器了,B服务器中的代码中就看不到A服务器了。这样就降低了代码大的耦合性。

2.削峰填谷

    服务器接收的数据量可以理解为一个波形图,当A服务器短时间内接收的数据量达到一个峰值时,很容易将A服务器搞挂,此时如果将这些大量的数据直接交给服务器来处理,B服务器也很可能挂掉。但是,如果有了一个阻塞队列,当数据的发送达到一个峰值时,A服务器可以将数据放到阻塞队列中,防止A服务器挂掉。当数据大的发送量达到一个波谷时,B服务器就可以利用数据的传送处于波谷的时间去处理阻塞队列中的数据,从而防止B服务器因为一股脑处理大量数据而挂掉。

1.2 标准库中的阻塞队列 

 在Java标准库中内置了一个阻塞队列,如果我们需要在一些程序中使用阻塞队列,使用标准库提供的即可

1. Java标准库提供的标准库是一个名为BlockingQueue的接口,真正实现的类是LinkedBlockingQueue 

2. put方法用于阻塞式的入队列,take方法用于阻塞式的出队列

3. BlockingQueue也有offer,poll,peek等方法,但是这些方法没有阻塞性

1.3 用标准库的阻塞队列实现一个生产者消费者模型

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo26 {public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> blockingQueue=new LinkedBlockingQueue<>(100);Thread producer=new Thread(()->{try{int id=0;for(int i=0;i<100;i++){Thread.sleep(1000);blockingQueue.put(id);System.out.println("producer生产数据:"+id);id++;}}catch (InterruptedException e){e.printStackTrace();}},"producer");Thread consumer=new Thread(()->{try{for (int i = 0; i < 100; i++) {Thread.sleep(1000);int take=blockingQueue.take();System.out.println("consumer消费数据:"+take);}}catch (InterruptedException e){e.printStackTrace();}},"consumer");producer.start();consumer.start();producer.join();consumer.join();}
}

 

2.模拟实现一个阻塞队列

实现思路:

1.由于阻塞队列要不断有数据入队列和出队列,我们呢可以使用循环队列来实现

2.组要模拟实现put方法和take方法

3.在put过程中,发现队列满了或者在take过程中,发现队列为空,我们就要wait

4.注意wait要搭配一个循环来使用,因为wait也有可能被Interrupt这样的方法唤醒

5.使用syncronized进行加锁控制

代码实现:

class MySubBLockingQueue{private volatile int[] data=null;private volatile int head=0;private volatile int tail=0;private volatile int size=0;public MySubBLockingQueue(int capacity){data=new int[capacity];}public void put(int number) throws InterruptedException {synchronized (this){//判断队列是否满了while (size==data.length){this.wait();}data[tail]=number;tail++;size++;if(tail>=data.length){tail=0;}this.notify();//tail=(tail+1)%data.length;}}public int take() throws InterruptedException {int back=0;synchronized (this){//判断队列是否为空while (size==0){this.wait();}back=data[head];head++;size--;if(head>= data.length){head=0;}//head=(head+1)%data.length;this.notify();}return back;}
}

 

3.用模拟实现的阻塞队列实现生产者消费者模型

class MySubBLockingQueue{private volatile int[] data=null;private volatile int head=0;private volatile int tail=0;private volatile int size=0;public MySubBLockingQueue(int capacity){data=new int[capacity];}public void put(int number) throws InterruptedException {synchronized (this){//判断队列是否满了while (size==data.length){this.wait();}data[tail]=number;tail++;size++;if(tail>=data.length){tail=0;}this.notify();//tail=(tail+1)%data.length;}}public int take() throws InterruptedException {int back=0;synchronized (this){//判断队列是否为空while (size==0){this.wait();}back=data[head];head++;size--;if(head>= data.length){head=0;}//head=(head+1)%data.length;this.notify();}return back;}
}
public class Demo27 {public static void main(String[] args) throws InterruptedException {MySubBLockingQueue bLockingQueue=new MySubBLockingQueue(100);Thread producer=new Thread(()->{try{int id=0;for (int i = 0; i < 100; i++) {Thread.sleep(1000);bLockingQueue.put(id);System.out.println("producer生产数据:"+id);id++;}}catch (InterruptedException e){e.printStackTrace();}},"producer");Thread consumer=new Thread(()->{try{for (int i = 0; i < 100; i++) {Thread.sleep(1000);int take=bLockingQueue.take();System.out.println("consumer消费数据:"+take);}}catch (InterruptedException e){e.printStackTrace();}},"consumer");producer.start();consumer.start();producer.join();consumer.join();}
}

http://www.mrgr.cn/news/68765.html

相关文章:

  • Leetcode刷题Python之3235.判断矩形的两个角落是否可达
  • 联网环境kubeadm安装k8s
  • 检索增强和知识冲突学习笔记
  • 安装docker-compose
  • C++中sizeof运算符的案例分析
  • K8s使用nfs
  • 国内 ChatGPT中文版镜像网站整理合集(2024/11/08)
  • idea 基础简单应用(java)
  • Android Glide动态apply centerCropTransform(),transition withCrossFade动画,Kotlin
  • ubuntu中apt-get的默认安装路径。安装、卸载以及查看的方法总结
  • 【linux学习指南】磁盘分区挂载到目录,形成文件系统挂载点
  • 基于地铁刷卡数据分析与可视化——以杭州市为例(二)
  • 2.索引:深入解析 B+ 树:原理、MySQL 应用及与其他数据结构的对比
  • 在实际的网络通信中,客户端发起请求的常见流程
  • Java多线程(锁的操作)
  • IO作业day4
  • 发布一个npm组件库包
  • 哈哈,这可是“加长版”吐槽,我先声明,绝对有趣但绝对善意的深度吐槽!你要是真的看完
  • 算法训练(leetcode)二刷第二十天 | 93. 复原 IP 地址、78. 子集、90. 子集 II
  • 标准遗传算法-c++源程序
  • 从0开始学习机器学习--Day19--学习曲线
  • Moment.js、Day.js、Miment,日期时间库怎么选?
  • leetcode hot100【LeetCode 17.电话号码的字母组合】java实现
  • 快速开发工具 Vite
  • 大模型微调技术 --> IA3
  • LeetCode 每日一题 长度为 K 的子数组的能量值