当前位置: 首页 > news >正文

【阻塞队列】- 生产者和消费者模式

文章目录

  • 1. 前言
  • 2. 生产者和消费者
  • 3. 实现方式
  • 4. wait / notifyAll
  • 5. await / signalAll
  • 6. acquire / release
  • 7. 阻塞队列
  • 8. 小结


1. 前言

前面定时任务的文章已经基本到尾声,从这篇文章开始,就要解析 JDK 里面的阻塞队列了。同样的,不当当是 JDK 里面的阻塞队列,Netty 中的 Mpsc 几种模型的队列,我都会去尝试解析,尽可能做到一点一点剖析源码。

阻塞队列其实也是属于高并发组件的一种,那说到阻塞队列,这就不得不说下生产者和消费者模式了

2. 生产者和消费者

那么首先第一个问题:什么是生产者消费者模式?

概念:生产者-消费者模式是一种经典的设计模式,广泛应用于并发编程和多线程环境中。它主要用于解决生产者线程和消费者线程之间的协作问题。生产者负责生成数据或任务,消费者负责处理这些数据或任务。

  • 生产者(Producer): 生产者就是负责生成数据或者任务的线程(或者进程),生产出来的数据会放到共享缓冲区或者阻塞队列中,等待消费者消费。
  • 消费者(Consumer): 消费者负责消费生产者生产出来的数据,就是从缓冲区或者阻塞队列中去获取。
  • 缓冲区(Buffer)或队列(Queue): 生产者和消费者共享的资源,用于存储生产者生成的数据或任务,直到消费者处理它们。

第二个问题:生产者-消费者模式的核心问题是如何确保生产者和消费者之间的协作是正确和高效的?

  • 缓冲区/队列满: 当缓冲区/队列满了的时侯,生产者需要等待,直到消费者取出数据。
  • 缓冲区/队列空: 当缓冲区/队列空了的时候,消费者需要等待,直到生产者放入数据。
  • 确保线程安全: 多个生产者和多个消费者同时访问缓冲区时,需要确保线程安全,避免数据竞争和冲突。

3. 实现方式

生产者-消费者模式 的实现方式有下面几种:

  1. wait / notifyAll: 通过 Object 的 wait / nofityAll 来进行线程间的通信,从而实现生产者和消费者的通信
  2. await / signalAll: 通过 ReentrantLock 和 Condition 来实现进程间的通信,通过 Condition 的 await 和 signalAll 方法来实现生产者和消费者的通信
  3. acquire / release: 通过 Semaphore 信号量实现进程间的通信,也就是通过 acquire / release 方法。
  4. 阻塞队列: 通过阻塞队列来管理生产者生产出来的 产品,其实阻塞队列更像是对前面的方法的包装,只不过我们用了阻塞队列就不用关注如何进行线程通信了。

下面就来看下这几种方法是如何实现生产者和消费者的。


4. wait / notifyAll

按照上面的逻辑,首先我们需要一个缓冲区,这个缓冲区是给所有生产者和消费者操作的,这里我们用队列来实现:

class ShareBuffer {private final int capacity;private final Queue<String> queue;public ShareBuffer(int capacity) {this.capacity = capacity;this.queue = new LinkedList<>();}public synchronized void produce(String value) throws InterruptedException {while(queue.size() == capacity){// 阻塞等待wait();}System.out.println("生产者: " + Thread.currentThread().getName() + "生产了一个数字:" + value);queue.add(value);// 唤醒消费者线程去竞争notifyAll();}public synchronized String consume() throws InterruptedException {while(queue.size() == 0){// 阻塞等待wait();}String element = queue.poll();System.out.println("消费者: " + Thread.currentThread().getName() + "消费了:" + element);// 唤醒生产者线程去添加notifyAll();return element;}
}

其实里面的逻辑很简单,就是两个方法:

  • public synchronized void produce(String value): 生产者生产的方法
  • public synchronized String consume():消费者的消费方法

这两个方法是同步方法,也就是线程安全的。然后在这两个方法里面去判断容量从而判断当前线程是否应该阻塞,里面的 while 可以换成 if,因为是线程安全的。

然后我们需要提供生产者和消费者类,来处理这个队列,里面的逻辑很简单,我就不多解释了。

class Consumer implements Runnable{private ShareBuffer shareBuffer;public Consumer(ShareBuffer shareBuffer) {this.shareBuffer = shareBuffer;}@Overridepublic void run() {while(true){try {String element = shareBuffer.consume();System.out.println("消费者: " + Thread.currentThread().getName() + "消费了:" + element);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}class Producer implements Runnable{private ShareBuffer shareBuffer;private String prefix;private int start;public Producer(ShareBuffer shareBuffer, String prefix, int start) {this.shareBuffer = shareBuffer;this.prefix = prefix;this.start = start;}@Overridepublic void run() {for(int i = start * 5; i < start * 5 + 5; i++){try {shareBuffer.produce(prefix + ":" + i);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}

最后我们需要一个 Main 方法,来创建生产者和消费者去对缓冲区操作。

public class Main {public static void main(String[] args) throws InterruptedException {ShareBuffer shareBuffer = new ShareBuffer(10);for(int i = 0; i < 5; i++){Producer producer = new Producer(shareBuffer, "producer-" + i, i);Consumer consumer = new Consumer(shareBuffer);new Thread(producer, "producer-name-" + i).start();new Thread(consumer, "consumer-name-" + i).start();}}
}

最后我们来看下输出:

生产者: producer-name-0生产了一个数字:producer-0:0
生产者: producer-name-0生产了一个数字:producer-0:1
生产者: producer-name-0生产了一个数字:producer-0:2
生产者: producer-name-0生产了一个数字:producer-0:3
生产者: producer-name-0生产了一个数字:producer-0:4
消费者: consumer-name-0消费了:producer-0:0
消费者: consumer-name-0消费了:producer-0:0
消费者: consumer-name-0消费了:producer-0:1
消费者: consumer-name-0消费了:producer-0:1
消费者: consumer-name-0消费了:producer-0:2
消费者: consumer-name-0消费了:producer-0:2
消费者: consumer-name-0消费了:producer-0:3
消费者: consumer-name-0消费了:producer-0:3
生产者: producer-name-2生产了一个数字:producer-2:10
生产者: producer-name-2生产了一个数字:producer-2:11
生产者: producer-name-4生产了一个数字:producer-4:20
消费者: consumer-name-0消费了:producer-0:4
消费者: consumer-name-0消费了:producer-0:4
消费者: consumer-name-0消费了:producer-2:10
消费者: consumer-name-0消费了:producer-2:10
消费者: consumer-name-0消费了:producer-2:11
消费者: consumer-name-0消费了:producer-2:11
消费者: consumer-name-0消费了:producer-4:20
消费者: consumer-name-0消费了:producer-4:20
生产者: producer-name-4生产了一个数字:producer-4:21
生产者: producer-name-4生产了一个数字:producer-4:22
生产者: producer-name-3生产了一个数字:producer-3:15
生产者: producer-name-3生产了一个数字:producer-3:16
生产者: producer-name-3生产了一个数字:producer-3:17
生产者: producer-name-3生产了一个数字:producer-3:18
生产者: producer-name-3生产了一个数字:producer-3:19
消费者: consumer-name-2消费了:producer-4:21
消费者: consumer-name-2消费了:producer-4:21
消费者: consumer-name-2消费了:producer-4:22
消费者: consumer-name-2消费了:producer-4:22
生产者: producer-name-2生产了一个数字:producer-2:12
消费者: consumer-name-2消费了:producer-3:15
消费者: consumer-name-2消费了:producer-3:15
消费者: consumer-name-3消费了:producer-3:16
消费者: consumer-name-3消费了:producer-3:16
消费者: consumer-name-3消费了:producer-3:17
消费者: consumer-name-3消费了:producer-3:17
消费者: consumer-name-3消费了:producer-3:18
消费者: consumer-name-3消费了:producer-3:18
消费者: consumer-name-3消费了:producer-3:19
消费者: consumer-name-3消费了:producer-3:19
生产者: producer-name-4生产了一个数字:producer-4:23
生产者: producer-name-4生产了一个数字:producer-4:24
消费者: consumer-name-4消费了:producer-2:12
消费者: consumer-name-4消费了:producer-2:12
消费者: consumer-name-4消费了:producer-4:23
消费者: consumer-name-4消费了:producer-4:23
消费者: consumer-name-4消费了:producer-4:24
消费者: consumer-name-4消费了:producer-4:24
生产者: producer-name-1生产了一个数字:producer-1:5
消费者: consumer-name-1消费了:producer-1:5
消费者: consumer-name-1消费了:producer-1:5
生产者: producer-name-2生产了一个数字:producer-2:13
生产者: producer-name-2生产了一个数字:producer-2:14
消费者: consumer-name-2消费了:producer-2:13
消费者: consumer-name-2消费了:producer-2:13
消费者: consumer-name-2消费了:producer-2:14
消费者: consumer-name-2消费了:producer-2:14
生产者: producer-name-1生产了一个数字:producer-1:6
消费者: consumer-name-0消费了:producer-1:6
消费者: consumer-name-0消费了:producer-1:6
生产者: producer-name-1生产了一个数字:producer-1:7
消费者: consumer-name-2消费了:producer-1:7
消费者: consumer-name-2消费了:producer-1:7
生产者: producer-name-1生产了一个数字:producer-1:8
消费者: consumer-name-0消费了:producer-1:8
消费者: consumer-name-0消费了:producer-1:8
生产者: producer-name-1生产了一个数字:producer-1:9
消费者: consumer-name-2消费了:producer-1:9
消费者: consumer-name-2消费了:producer-1:9Process finished with exit code -1

5. await / signalAll

相比于 wait / notifyAll,这种方法需要提供一个 ReentrantLock,然后通过这个 lock 创建出 Condition 实现更细粒度的等待唤醒。

Condition 是 Java 并发编程中的一个接口,它提供了一种方法,使得线程可以在某些条件满足时得到通知或者被唤醒,从而能够继续执行。

Condition 对象通常与 Lock 对象一起使用,用于替代传统的 Object 类的 wait 和 notify/notifyAll 方法,以实现更灵活的线程间协调。

使用这种方法只需要改 ShareBuffer

class ShareBuffer {private final int capacity;private final Queue<String> queue;private final Lock lock = new ReentrantLock();private final Condition waitProducer = lock.newCondition();private final Condition waitConsumer = lock.newCondition();public ShareBuffer(int capacity) {this.capacity = capacity;this.queue = new LinkedList<>();}public void produce(String value) throws InterruptedException {lock.lock();try {while(queue.size() == capacity) {// 阻塞等待waitProducer.await();}System.out.println("生产者: " + Thread.currentThread().getName() + "生产了一个数字:" + value);queue.add(value);// 唤醒消费者线程去竞争waitConsumer.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}public String consume() throws InterruptedException {lock.lock();try {while(queue.size() == 0){// 阻塞等待waitConsumer.await();}String element = queue.poll();// 唤醒生产者线程去添加waitProducer.signalAll();return element;} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}
}

其实就是使用 waitProducerwaitConsumer 来进行线程通信。全部的逻辑如下所示。

public class Main {public static void main(String[] args) throws InterruptedException {ShareBuffer shareBuffer = new ShareBuffer(10);for(int i = 0; i < 5; i++){Producer producer = new Producer(shareBuffer, "producer-" + i, i);Consumer consumer = new Consumer(shareBuffer);new Thread(producer, "producer-name-" + i).start();new Thread(consumer, "consumer-name-" + i).start();}}
}class ShareBuffer {private final int capacity;private final Queue<String> queue;private final Lock lock = new ReentrantLock();private final Condition waitProducer = lock.newCondition();private final Condition waitConsumer = lock.newCondition();public ShareBuffer(int capacity) {this.capacity = capacity;this.queue = new LinkedList<>();}public void produce(String value) throws InterruptedException {lock.lock();try {while(queue.size() == capacity) {// 阻塞等待waitProducer.await();}System.out.println("生产者: " + Thread.currentThread().getName() + "生产了一个数字:" + value);queue.add(value);// 唤醒消费者线程去竞争waitConsumer.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}public String consume() throws InterruptedException {lock.lock();try {while(queue.size() == 0){// 阻塞等待waitConsumer.await();}String element = queue.poll();// 唤醒生产者线程去添加waitProducer.signalAll();return element;} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}
}class Producer implements Runnable{private ShareBuffer shareBuffer;private String prefix;private int start;public Producer(ShareBuffer shareBuffer, String prefix, int start) {this.shareBuffer = shareBuffer;this.prefix = prefix;this.start = start;}@Overridepublic void run() {for(int i = start * 5; i < start * 5 + 5; i++){try {shareBuffer.produce(prefix + ":" + i);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}class Consumer implements Runnable{private ShareBuffer shareBuffer;public Consumer(ShareBuffer shareBuffer) {this.shareBuffer = shareBuffer;}@Overridepublic void run() {while(true){try {String element = shareBuffer.consume();System.out.println("消费者: " + Thread.currentThread().getName() + "消费了:" + element);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}

输出结果如下:

生产者: producer-name-1生产了一个数字:producer-1:5
生产者: producer-name-1生产了一个数字:producer-1:6
生产者: producer-name-1生产了一个数字:producer-1:7
生产者: producer-name-4生产了一个数字:producer-4:20
生产者: producer-name-4生产了一个数字:producer-4:21
生产者: producer-name-4生产了一个数字:producer-4:22
生产者: producer-name-4生产了一个数字:producer-4:23
生产者: producer-name-4生产了一个数字:producer-4:24
生产者: producer-name-0生产了一个数字:producer-0:0
生产者: producer-name-0生产了一个数字:producer-0:1
消费者: consumer-name-1消费了:producer-1:5
生产者: producer-name-2生产了一个数字:producer-2:10
消费者: consumer-name-2消费了:producer-1:6
生产者: producer-name-3生产了一个数字:producer-3:15
消费者: consumer-name-3消费了:producer-1:7
生产者: producer-name-1生产了一个数字:producer-1:8
消费者: consumer-name-0消费了:producer-4:20
生产者: producer-name-1生产了一个数字:producer-1:9
消费者: consumer-name-4消费了:producer-4:21
生产者: producer-name-0生产了一个数字:producer-0:2
消费者: consumer-name-1消费了:producer-4:22
生产者: producer-name-2生产了一个数字:producer-2:11
消费者: consumer-name-2消费了:producer-4:23
生产者: producer-name-3生产了一个数字:producer-3:16
消费者: consumer-name-3消费了:producer-4:24
生产者: producer-name-0生产了一个数字:producer-0:3
生产者: producer-name-0生产了一个数字:producer-0:4
消费者: consumer-name-4消费了:producer-0:1
消费者: consumer-name-0消费了:producer-0:0
消费者: consumer-name-1消费了:producer-2:10
生产者: producer-name-2生产了一个数字:producer-2:12
生产者: producer-name-2生产了一个数字:producer-2:13
消费者: consumer-name-2消费了:producer-3:15
生产者: producer-name-3生产了一个数字:producer-3:17
消费者: consumer-name-3消费了:producer-1:8
消费者: consumer-name-4消费了:producer-1:9
生产者: producer-name-2生产了一个数字:producer-2:14
消费者: consumer-name-0消费了:producer-2:11
消费者: consumer-name-1消费了:producer-3:16
消费者: consumer-name-3消费了:producer-0:2
生产者: producer-name-3生产了一个数字:producer-3:18
消费者: consumer-name-4消费了:producer-0:4
消费者: consumer-name-2消费了:producer-0:3
生产者: producer-name-3生产了一个数字:producer-3:19
消费者: consumer-name-0消费了:producer-2:12
消费者: consumer-name-3消费了:producer-3:17
消费者: consumer-name-1消费了:producer-2:13
消费者: consumer-name-2消费了:producer-3:19
消费者: consumer-name-4消费了:producer-2:14
消费者: consumer-name-0消费了:producer-3:18

6. acquire / release

这种方式就是使用信号量来处理。信号量(Semaphore) 是并发编程中的一种同步机制,用于控制同时访问某一资源的线程数量。它本质上是一个计数器,表示可用资源的数量。

信号量可以用来实现诸如限制同时访问某个资源的线程数、实现生产者-消费者问题等场景。下面是信号量中实现同步的两个方法:

  • 获取许可(acquire): 线程尝试获取一个许可。获取成功,那么计数器减一;如果许可不可用,那么线程就会阻塞,直到有许可可用或超时。
  • 释放许可(release): 线程释放一个许可,计数器加一,并可能唤醒一个等待的线程。

那么下面还是来看下缓冲区的代码,生产者和消费者还是不需要变化:

class ShareBuffer {private final Queue<String> queue;private final Semaphore semFull;private final Semaphore semEmpty;public ShareBuffer(int capacity) {this.queue = new LinkedList<>();this.semFull = new Semaphore(0);this.semEmpty = new Semaphore(capacity);}public void produce(String value) throws InterruptedException {// 获得许可semEmpty.acquire();try {synchronized (this) {System.out.println("生产者: " + Thread.currentThread().getName() + "生产了一个数字:" + value);queue.add(value);}} finally {// 释放一个许可semFull.release();}}public String consume() throws InterruptedException {semFull.acquire();synchronized (this) {try {String element = queue.poll();return element;}  finally {semEmpty.release();}}}
}

下面就是整体代码:

public class Main {public static void main(String[] args) throws InterruptedException {ShareBuffer shareBuffer = new ShareBuffer(10);for (int i = 0; i < 5; i++) {Producer producer = new Producer(shareBuffer, "producer-" + i, i);Consumer consumer = new Consumer(shareBuffer);new Thread(producer, "producer-name-" + i).start();new Thread(consumer, "consumer-name-" + i).start();}}
}class ShareBuffer {private final Queue<String> queue;private final Semaphore semFull;private final Semaphore semEmpty;public ShareBuffer(int capacity) {this.queue = new LinkedList<>();this.semFull = new Semaphore(0);this.semEmpty = new Semaphore(capacity);}public void produce(String value) throws InterruptedException {// 获得许可semEmpty.acquire();try {synchronized (this) {System.out.println("生产者: " + Thread.currentThread().getName() + "生产了一个数字:" + value);queue.add(value);}} finally {// 释放一个许可semFull.release();}}public String consume() throws InterruptedException {semFull.acquire();synchronized (this) {try {String element = queue.poll();return element;}  finally {semEmpty.release();}}}
}class Producer implements Runnable {private ShareBuffer shareBuffer;private String prefix;private int start;public Producer(ShareBuffer shareBuffer, String prefix, int start) {this.shareBuffer = shareBuffer;this.prefix = prefix;this.start = start;}@Overridepublic void run() {for (int i = start * 5; i < start * 5 + 5; i++) {try {shareBuffer.produce(prefix + ":" + i);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}class Consumer implements Runnable {private ShareBuffer shareBuffer;public Consumer(ShareBuffer shareBuffer) {this.shareBuffer = shareBuffer;}@Overridepublic void run() {while (true) {try {String element = shareBuffer.consume();System.out.println("消费者: " + Thread.currentThread().getName() + "消费了:" + element);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}

结果输出如下:

生产者: producer-name-0生产了一个数字:producer-0:0
生产者: producer-name-0生产了一个数字:producer-0:1
消费者: consumer-name-0消费了:producer-0:0
生产者: producer-name-0生产了一个数字:producer-0:2
生产者: producer-name-0生产了一个数字:producer-0:3
生产者: producer-name-0生产了一个数字:producer-0:4
消费者: consumer-name-0消费了:producer-0:1
生产者: producer-name-2生产了一个数字:producer-2:10
生产者: producer-name-2生产了一个数字:producer-2:11
生产者: producer-name-4生产了一个数字:producer-4:20
生产者: producer-name-4生产了一个数字:producer-4:21
生产者: producer-name-4生产了一个数字:producer-4:22
生产者: producer-name-4生产了一个数字:producer-4:23
消费者: consumer-name-0消费了:producer-0:2
生产者: producer-name-4生产了一个数字:producer-4:24
生产者: producer-name-1生产了一个数字:producer-1:5
消费者: consumer-name-2消费了:producer-0:3
消费者: consumer-name-2消费了:producer-2:10
消费者: consumer-name-0消费了:producer-0:4
生产者: producer-name-1生产了一个数字:producer-1:6
生产者: producer-name-1生产了一个数字:producer-1:7
消费者: consumer-name-4消费了:producer-2:11
生产者: producer-name-2生产了一个数字:producer-2:12
生产者: producer-name-2生产了一个数字:producer-2:13
消费者: consumer-name-4消费了:producer-4:20
生产者: producer-name-2生产了一个数字:producer-2:14
消费者: consumer-name-3消费了:producer-4:21
生产者: producer-name-1生产了一个数字:producer-1:8
消费者: consumer-name-0消费了:producer-4:22
生产者: producer-name-3生产了一个数字:producer-3:15
消费者: consumer-name-2消费了:producer-4:23
消费者: consumer-name-1消费了:producer-4:24
生产者: producer-name-3生产了一个数字:producer-3:16
生产者: producer-name-3生产了一个数字:producer-3:17
消费者: consumer-name-0消费了:producer-1:5
生产者: producer-name-1生产了一个数字:producer-1:9
生产者: producer-name-3生产了一个数字:producer-3:18
消费者: consumer-name-3消费了:producer-1:6
生产者: producer-name-3生产了一个数字:producer-3:19
消费者: consumer-name-0消费了:producer-2:12
消费者: consumer-name-1消费了:producer-2:13
消费者: consumer-name-4消费了:producer-1:7
消费者: consumer-name-1消费了:producer-3:16
消费者: consumer-name-1消费了:producer-1:9
消费者: consumer-name-2消费了:producer-1:8
消费者: consumer-name-3消费了:producer-3:15
消费者: consumer-name-0消费了:producer-2:14
消费者: consumer-name-2消费了:producer-3:19
消费者: consumer-name-1消费了:producer-3:18
消费者: consumer-name-4消费了:producer-3:17

7. 阻塞队列

最后一种方法就是阻塞队列,可以通过阻塞队列来进行线程之间的通信,把阻塞等待唤醒的工作交给队列,调用者就不需要关系内部的逻辑了。

class ShareBuffer {private final ArrayBlockingQueue<String> queue;public ShareBuffer(int capacity) {this.queue = new ArrayBlockingQueue<>(capacity);}public void produce(String value) throws InterruptedException {System.out.println("生产者: " + Thread.currentThread().getName() + "生产了一个数字:" + value);queue.add(value);}public String consume() throws InterruptedException {return queue.take();}
}

下面是所有的代码:

public class Main {public static void main(String[] args) throws InterruptedException {ShareBuffer shareBuffer = new ShareBuffer(10);for (int i = 0; i < 5; i++) {Producer producer = new Producer(shareBuffer, "producer-" + i, i);Consumer consumer = new Consumer(shareBuffer);new Thread(producer, "producer-name-" + i).start();new Thread(consumer, "consumer-name-" + i).start();}}
}class ShareBuffer {private final ArrayBlockingQueue<String> queue;public ShareBuffer(int capacity) {this.queue = new ArrayBlockingQueue<>(capacity);}public void produce(String value) throws InterruptedException {System.out.println("生产者: " + Thread.currentThread().getName() + "生产了一个数字:" + value);queue.add(value);}public String consume() throws InterruptedException {return queue.take();}
}class Producer implements Runnable {private ShareBuffer shareBuffer;private String prefix;private int start;public Producer(ShareBuffer shareBuffer, String prefix, int start) {this.shareBuffer = shareBuffer;this.prefix = prefix;this.start = start;}@Overridepublic void run() {for (int i = start * 5; i < start * 5 + 5; i++) {try {shareBuffer.produce(prefix + ":" + i);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}class Consumer implements Runnable {private ShareBuffer shareBuffer;public Consumer(ShareBuffer shareBuffer) {this.shareBuffer = shareBuffer;}@Overridepublic void run() {while (true) {try {String element = shareBuffer.consume();System.out.println("消费者: " + Thread.currentThread().getName() + "消费了:" + element);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}

下面就是输出结果:

生产者: producer-name-1生产了一个数字:producer-1:5
生产者: producer-name-0生产了一个数字:producer-0:0
生产者: producer-name-0生产了一个数字:producer-0:1
生产者: producer-name-1生产了一个数字:producer-1:6
消费者: consumer-name-1消费了:producer-1:5
消费者: consumer-name-1消费了:producer-0:1
生产者: producer-name-1生产了一个数字:producer-1:7
生产者: producer-name-1生产了一个数字:producer-1:8
消费者: consumer-name-2消费了:producer-1:7
消费者: consumer-name-2消费了:producer-1:8
消费者: consumer-name-0消费了:producer-0:0
生产者: producer-name-0生产了一个数字:producer-0:2
生产者: producer-name-0生产了一个数字:producer-0:3
消费者: consumer-name-2消费了:producer-0:2
生产者: producer-name-4生产了一个数字:producer-4:20
生产者: producer-name-1生产了一个数字:producer-1:9
生产者: producer-name-2生产了一个数字:producer-2:10
消费者: consumer-name-1消费了:producer-1:6
消费者: consumer-name-2消费了:producer-2:10
生产者: producer-name-2生产了一个数字:producer-2:11
消费者: consumer-name-4消费了:producer-1:9
消费者: consumer-name-0消费了:producer-4:20
生产者: producer-name-4生产了一个数字:producer-4:21
消费者: consumer-name-3消费了:producer-0:3
生产者: producer-name-0生产了一个数字:producer-0:4
生产者: producer-name-3生产了一个数字:producer-3:15
消费者: consumer-name-4消费了:producer-0:4
生产者: producer-name-3生产了一个数字:producer-3:16
生产者: producer-name-3生产了一个数字:producer-3:17
生产者: producer-name-3生产了一个数字:producer-3:18
生产者: producer-name-3生产了一个数字:producer-3:19
消费者: consumer-name-2消费了:producer-4:21
消费者: consumer-name-4消费了:producer-3:17
消费者: consumer-name-4消费了:producer-3:19
生产者: producer-name-4生产了一个数字:producer-4:22
生产者: producer-name-4生产了一个数字:producer-4:23
生产者: producer-name-4生产了一个数字:producer-4:24
消费者: consumer-name-1消费了:producer-2:11
消费者: consumer-name-1消费了:producer-4:23
消费者: consumer-name-1消费了:producer-4:24
生产者: producer-name-2生产了一个数字:producer-2:12
消费者: consumer-name-4消费了:producer-4:22
消费者: consumer-name-1消费了:producer-2:12
消费者: consumer-name-2消费了:producer-3:18
消费者: consumer-name-3消费了:producer-3:16
消费者: consumer-name-0消费了:producer-3:15
生产者: producer-name-2生产了一个数字:producer-2:13
生产者: producer-name-2生产了一个数字:producer-2:14
消费者: consumer-name-4消费了:producer-2:13
消费者: consumer-name-4消费了:producer-2:14Process finished with exit code -1

8. 小结

生产者和消费者模式就介绍完了,这也是阻塞队列的开端,下面将会介绍 JDK 中的阻塞队列的用法,源码。







如有错误,欢迎指出!!!


http://www.mrgr.cn/news/80791.html

相关文章:

  • 【Token】校验、会话技术、登录请求、拦截器【期末实训】实战项目学生和班级管理系统\Day15-后端Web实战(登录认证)\讲义
  • FFmpeg第一话:FFmpeg 简介与环境搭建
  • React 第十七节 useMemo用法详解
  • Qt中的异步相关类
  • 循环神经网络(RNN)在时序预测中的应用与优势
  • 音频开发中常见的知识体系
  • 深度学习0-前置知识
  • 关于Unity VFX 在Spawn状态的一些笔记
  • Pytorch | 从零构建ParNet/Non-Deep Networks对CIFAR10进行分类
  • 本地虚拟机 docker 中安装体验 qwen2.5 大模型
  • Reactor
  • shell加减乘除运算
  • 电感的基本概念
  • [创业之路-199]:《华为战略管理法-DSTE实战体系》- 3 - 价值转移理论与利润区理论
  • Nautilus源码编译傻瓜式教程二
  • 实操给桌面机器人加上超拟人音色
  • C++算法第十一天
  • Autosar入门_汽车电子控制器
  • [SAP ABAP] ALV报表练习1
  • 第六周作业
  • 社区版 IDEA 开发webapp 配置tomcat
  • 粘包由应用层协议解决
  • iClent3D for Cesium 实现无人机巡检飞行效果
  • 代码生成器
  • 基于 PyCharm 和 Navicat 的新闻管理系统
  • 51c视觉~合集36