BlockingQueue
是 Java 中的一个接口,属于 java.util.concurrent
包。它为队列提供了线程安全的阻塞操作,特别适合用于生产者-消费者模式。在 BlockingQueue
中,当队列为空时,消费者线程会被阻塞,直到有新元素进入队列;当队列已满时,生产者线程会被阻塞,直到队列有空间可以插入新元素。
1. 关键特性
线程安全:
BlockingQueue
自带线程安全机制,适合多线程环境。阻塞操作:提供了阻塞插入和阻塞删除操作。
可选的容量限制:可以设置容量上限,避免队列无限增长。
不允许 null 元素:插入 null 值会抛出
NullPointerException
。
2. 主要实现类
ArrayBlockingQueue:基于数组的有界队列,初始化时需指定大小。
LinkedBlockingQueue:基于链表的阻塞队列,支持有界或无界,常用于高吞吐量场景。
PriorityBlockingQueue:基于优先级排序的无界阻塞队列,内部元素按优先级排序。
DelayQueue:延迟队列,元素带有延迟时间,适合实现定时任务。
SynchronousQueue:一种没有缓冲的队列,插入操作必须等到有线程接收。
3. 核心方法
添加元素:
put(E e)
:如果队列满了,则等待队列可用空间。offer(E e, long timeout, TimeUnit unit)
:指定超时时间,尝试插入元素。移除元素:
take()
:如果队列为空,则等待元素入队。poll(long timeout, TimeUnit unit)
:指定超时时间,尝试取出元素。其他操作:
remainingCapacity()
:返回队列的剩余可用空间。drainTo(Collection<? super E> c)
:批量从队列中移出所有元素并添加到指定集合中。
4. 使用示例
java复制代码import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class BlockingQueueExample { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // Producer Thread Thread producer = new Thread(() -> { try { for (int i = 0; i < 10; i++) { queue.put(i); System.out.println("Produced: " + i); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // Consumer Thread Thread consumer = new Thread(() -> { try { while (true) { Integer item = queue.take(); System.out.println("Consumed: " + item); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); producer.start(); consumer.start(); } }
5. 典型应用场景
生产者-消费者模式:生产者将任务放入队列,消费者从队列中获取任务,适用于高并发环境。
任务调度:例如
DelayQueue
用于延迟任务处理。负载均衡:通过
BlockingQueue
在多个线程间分发任务,实现均衡处理。
6. 注意事项
阻塞机制:
put
和take
方法会在必要时阻塞线程,避免频繁的锁和等待。选择合适的实现类:根据需要选择有界队列或无界队列、优先级队列、延迟队列等。
BlockingQueue
为多线程并发提供了高效的队列管理机制,适合在高并发、任务队列、限流等场景中使用。