Java多线程与高并发专题——什么是阻塞队列?
引入
阻塞队列(Blocking Queue)是一种线程安全的队列数据结构,它的主要特点是:
- 线程安全:多个线程可以安全地同时访问队列。
- 阻塞操作:当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有元素可用;当队列为满时,向队列中插入元素的操作会被阻塞,直到队列有空间可用。
BlockingQueue
BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。下面我们看下它的源码注释:
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false, depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up. These methods are summarized in the following table:
Summary of BlockingQueue methods
Throws exception Special valueBlocksTimes out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable
A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.
A BlockingQueue may be capacity bounded. At any given time it may have a remainingCapacity beyond which no additional elements can be put without blocking. A BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer. MAX_VALUE.
BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the Collection interface. So, for example, it is possible to remove an arbitrary element from a queue using remove(x). However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation. So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c.
A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
Usage example, based on a typical producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.
class Producer implements Runnable {private final BlockingQueue queue;Producer(BlockingQueue q) { queue = q; }public void run() {try {while (true) {queue.put(produce());}} catch (InterruptedException ex) { ...handle ...}}Object produce() { ...} } class Consumer implements Runnable {private final BlockingQueue queue;Consumer(BlockingQueue q) {queue = q;}public void run() {try {while (true) {consume(queue.take());}} catch (InterruptedException ex) { ...handle ...}}void consume(object x) { ...} } class Setup {void main() {BlockingQueue q = new SomeQueueImplementation();Producer p = new Producer(q);Consumer c1 = new Consumer(q);Consumer c2 = new Consumer(q);new Thread(p).start();new Thread(c1).start();new Thread(c2).start();} }
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
翻译:
一种队列,另外支持在检索元素时等待队列变为非空的操作,以及在存储元素时等待队列中有空间可用的操作。
阻塞队列的方法有四种形式,以不同的方式处理无法立即满足但将来可能满足的操作:一种抛出异常,第二种返回一个特殊值(根据操作不同,可以是 null 或 false),第三种无限期地阻塞当前线程,直到操作可以成功,第四种在放弃前仅阻塞给定的最大时间限制。这些方法在下表中进行了总结:
阻塞队列方法摘要抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不适用 不适用
阻塞队列不接受 null 元素。实现方式在尝试添加、放入或提供 null 时会抛出 NullPointerException。null 用作哨兵值,用于指示轮询操作失败。
阻塞队列可能是容量受限的。在任何时候,它可能有一个剩余容量,超过该容量后,无法在不阻塞的情况下添加更多元素。没有内在容量限制的阻塞队列始终报告剩余容量为 Integer.MAX_VALUE。
阻塞队列实现旨在主要用于生产者-消费者队列,但也支持集合接口。因此,例如,可以使用 remove(x) 从队列中移除任意元素。然而,此类操作通常执行效率不高,仅用于偶尔使用,例如当队列中的消息被取消时。
阻塞队列实现是线程安全的。所有排队方法都通过内部锁或其他形式的并发控制原子地实现其效果。然而,除非实现中另有说明,否则批量集合操作 addAll、containsAll、retainAll 和 removeAll 不一定原子执行。因此,例如,addAll(c) 可能在添加 c 中的某些元素后失败(抛出异常)。
阻塞队列本身不支持任何“close”或“shutdown”操作来表示不再添加更多项。此类功能的需求和使用通常是实现依赖的。例如,一个常见的策略是生产者插入特殊的结束流或毒药对象,这些对象在被消费者取出时会相应地被解释。
基于典型的生产者-消费者场景的使用示例。注意,阻塞队列可以安全地与多个生产者和多个消费者一起使用。/*** 生产者类,实现了 Runnable 接口,用于向阻塞队列中生产元素。*/ class Producer implements Runnable {// 声明一个私有的、不可变的阻塞队列,用于存储生产的元素private final BlockingQueue queue;/*** 构造函数,初始化生产者的阻塞队列。* @param q 用于存储生产元素的阻塞队列*/Producer(BlockingQueue q) { queue = q; }/*** 实现 Runnable 接口的 run 方法,该方法在新线程启动时执行。* 生产者将持续生产元素并将其放入队列中,直到线程被中断。*/public void run() {try {// 无限循环,持续生产元素while (true) {// 调用 produce 方法生产元素,并将其放入队列中queue.put(produce());}} catch (InterruptedException ex) {// 处理线程中断异常...handle ...}}/*** 生产元素的方法,具体实现由子类完成。* @return 生产的元素*/Object produce() { ...} }/*** 消费者类,实现了 Runnable 接口,用于从阻塞队列中消费元素。*/ class Consumer implements Runnable {// 声明一个私有的、不可变的阻塞队列,用于存储待消费的元素private final BlockingQueue queue;/*** 构造函数,初始化消费者的阻塞队列。* @param q 用于存储待消费元素的阻塞队列*/Consumer(BlockingQueue q) {queue = q;}/*** 实现 Runnable 接口的 run 方法,该方法在新线程启动时执行。* 消费者将持续从队列中取出元素并消费,直到线程被中断。*/public void run() {try {// 无限循环,持续消费元素while (true) {// 从队列中取出元素,并调用 consume 方法进行消费consume(queue.take());}} catch (InterruptedException ex) {// 处理线程中断异常...handle ...}}/*** 消费元素的方法,具体实现由子类完成。* @param x 待消费的元素*/void consume(Object x) { ...} }/*** 设置类,用于启动生产者和消费者线程。*/ class Setup {/*** 主方法,用于启动生产者和消费者线程。*/void main() {// 创建一个阻塞队列的实例BlockingQueue q = new SomeQueueImplementation();// 创建一个生产者实例,并将队列传递给它Producer p = new Producer(q);// 创建第一个消费者实例,并将队列传递给它Consumer c1 = new Consumer(q);// 创建第二个消费者实例,并将队列传递给它Consumer c2 = new Consumer(q);// 启动生产者线程new Thread(p).start();// 启动第一个消费者线程new Thread(c1).start();// 启动第二个消费者线程new Thread(c2).start();} }
内存一致性效果:与其他并发集合一样,在线程中将对象放入阻塞队列之前的操作,会在另一个线程访问或移除该元素之后发生。
BlockingQueue 是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。比如说,使用生产者/消费者模式的时候,我们生产者只需要往队列里添加元素,而消费者只需要从队列里取出它们就可以了。
因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题。而且既然队列本身是线程安全的,队列可以安全地从一个线程向另外一个线程传递数据,所以我们的生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开发的难度和工作量。
同时,队列它还能起到一个隔离的作用。比如说我们开发一个银行转账的程序,那么生产者线程不需要关心具体的转账逻辑,只需要把转账任务,如账户和金额等信息放到队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。而作为银行这个类来讲,它会去从队列里取出来将要执行的具体的任务,再去通过自己的各种方法来完成本次转账。
这样就实现了具体任务与执行任务类之间的解耦,任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们银行具体实现转账操作的对象的,实现了隔离,提高了安全性。
主要并发队列关系图
上图展示了 Queue 最主要的实现类,可以看出 Java 提供的线程安全的队列(也称为并发队列)分为阻塞队列和非阻塞队列两大类。
阻塞队列的典型例子就是 BlockingQueue 接口的实现类,BlockingQueue 下面有 6 种最主要的实现,分别是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、
PriorityBlockingQueue 和 LinkedTransferQueue。
非阻塞并发队列的典型例子是 ConcurrentLinkedQueue,这个类不会让线程阻塞,利用 CAS 保证了线程安全。
我们可以根据需要自由选取阻塞队列或者非阻塞队列来满足业务需求。
还有一个和 Queue 关系紧密的 Deque 接口,它继承了 Queue,其源码注释如下:
A linear collection that supports element insertion and removal at both ends. The name deque is short for "double ended queue" and is usually pronounced "deck". Most Deque implementations place no fixed limits on the number of elements they may contain, but this interface supports capacity-restricted deques as well as those with no fixed size limit.
This interface defines methods to access the elements at both ends of the deque. Methods are provided to insert, remove, and examine the element. Each of these methods exists in two forms: one throws an exception if the operation fails, the other returns a special value (either null or false, depending on the operation). The latter form of the insert operation is designed specifically for use with capacity-restricted Deque implementations; in most implementations, insert operations cannot fail.
The twelve methods described above are summarized in the following table:
Summary of Deque methods
First Element (Head) Last Element (Tail)
Throws exception Special value Throws exception Special value
Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e)
Remove removeFirst() pollFirst() removeLast() pollLast()
Examine getFirst() peekFirst() getLast() peekLast()
This interface extends the Queue interface. When a deque is used as a queue, FIFO (First-In-First-Out) behavior results. Elements are added at the end of the deque and removed from the beginning. The methods inherited from the Queue interface are precisely equivalent to Deque methods as indicated in the following table:
Comparison of Queue and Deque methods
Queue Method Equivalent Deque Method
add(e) addLast(e)
offer(e) offerLast(e)
remove() removeFirst()
poll() pollFirst()
element() getFirst()
peek() peekFirst()
Deques can also be used as LIFO (Last-In-First-Out) stacks. This interface should be used in preference to the legacy Stack class. When a deque is used as a stack, elements are pushed and popped from the beginning of the deque. Stack methods are precisely equivalent to Deque methods as indicated in the table below:
Comparison of Stack and Deque methods
Stack Method Equivalent Deque Method
push(e) addFirst(e)
pop() removeFirst()
peek() peekFirst()
Note that the peek method works equally well when a deque is used as a queue or a stack; in either case, elements are drawn from the beginning of the deque.
This interface provides two methods to remove interior elements, removeFirstOccurrence and removeLastOccurrence.
Unlike the List interface, this interface does not provide support for indexed access to elements.
While Deque implementations are not strictly required to prohibit the insertion of null elements, they are strongly encouraged to do so. Users of any Deque implementations that do allow null elements are strongly encouraged not to take advantage of the ability to insert nulls. This is so because null is used as a special return value by various methods to indicated that the deque is empty.
Deque implementations generally do not define element-based versions of the equals and hashCode methods, but instead inherit the identity-based versions from class Object.
This interface is a member of the Java Collections Framework.
翻译:
一种线性集合,支持在两端插入和删除元素。名称 deque 是 "double ended queue" 的缩写,通常发音为 "deck"。大多数 Deque 实现对它们可能包含的元素数量没有固定限制,但此接口既支持容量受限的 deque,也支持没有固定大小限制的 deque。
此接口定义了访问 deque 两端元素的方法。提供了插入、删除和检查元素的方法。这些方法每种都存在两种形式:一种在操作失败时抛出异常,另一种返回特殊值(根据操作不同,可以是 null 或 false)。后者的插入操作是专门为容量受限的 Deque 实现设计的;在大多数实现中,插入操作不会失败。
上述十二种方法在下表中进行了总结:
Deque 方法摘要
首元素(表头) 末元素(表尾)
抛出异常 特殊值 抛出异常 特殊值
插入 addFirst(e) offerFirst(e) addLast(e) offerLast(e)
移除 removeFirst() pollFirst() removeLast() pollLast()
检查 getFirst() peekFirst() getLast() peekLast()
此接口扩展了 Queue 接口。当将 deque 用作队列时,会产生 FIFO(先进先出)行为。元素被添加到 deque 的末尾,并从开头移除。从 Queue 接口继承的方法与 Deque 方法完全等价,如下表所示:
Queue 和 Deque 方法的比较
Queue 方法 等价的 Deque 方法
add(e) addLast(e)
offer(e) offerLast(e)
remove() removeFirst()
poll() pollFirst()
element() getFirst()
peek() peekFirst()
Deque 也可用作 LIFO(后进先出)栈。应优先使用此接口而非过时的 Stack 类。当将 deque 用作栈时,元素从 deque 的开头压入和弹出。Stack 方法与 Deque 方法完全等价,如下表所示:
Stack 和 Deque 方法的比较
Stack 方法 等价的 Deque 方法
push(e) addFirst(e)
pop() removeFirst()
peek() peekFirst()
请注意,peek 方法在 deque 用作队列或栈时都能很好地工作;在这两种情况下,元素都从 deque 的开头提取。
此接口提供了两种方法来移除内部元素:removeFirstOccurrence 和 removeLastOccurrence。
与 List 接口不同,此接口不提供对元素的索引访问支持。
虽然 Deque 实现不要求严格禁止插入 null 元素,但强烈建议这样做。如果某些 Deque 实现允许插入 null 元素,用户应尽量避免利用此功能。这是因为各种方法将 null 用作特殊返回值来表示 deque 为空。
Deque 实现通常不定义基于元素的 equals 和 hashCode 方法,而是从 Object 类继承基于身份的版本。
此接口是 Java 集合框架的成员。
Deque 的意思是双端队列,音标是 [dek],是 double-ended-queue 的缩写,它从头和尾都能添加和删除元素;而普通的 Queue 只能从一端进入,另一端出去。这是 Deque 和 Queue 的不同之处,Deque其他方面的性质都和 Queue 类似。
阻塞队列的特点
阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,所以下面重点介绍阻塞功能:阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。
take 方法
take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。
put 方法
put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。
是否有界(容量有多大)
此外,阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。
无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是Integer.MAX_VALUE,约为 2 的 31 次方,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。
但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。