1. 概述
本文将深入探讨java.util.concurrent
包中解决生产者-消费者并发问题的核心工具——BlockingQueue
。我们将分析BlockingQueue
接口的API设计,并展示如何利用其方法简化并发编程。
后续将通过一个多生产者-多消费者的实战案例,演示BlockingQueue
的实际应用。
2. BlockingQueue类型
BlockingQueue
可分为两种实现类型:
- 无界队列:容量几乎可无限增长
- 有界队列:具有固定最大容量
2.1 无界队列
创建无界队列非常简单:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
此时队列容量被设为Integer.MAX_VALUE
。所有添加操作永远不会阻塞,可能导致队列无限膨胀。
⚠️ 踩坑提醒:使用无界队列时,必须确保消费者处理速度能跟上生产者速度,否则极易引发OutOfMemoryError
。
2.2 有界队列
有界队列通过构造函数指定容量:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
这个队列容量为10。当队列已满时:
- 使用
put()
方法会阻塞直到有空位 - 使用
add()
/offer()
会立即失败
✅ 最佳实践:有界队列天然具备流量控制能力,能自动平衡生产消费速度,是并发编程的首选方案。
3. BlockingQueue API详解
BlockingQueue
接口包含两类核心方法:元素添加和元素获取。每类方法在队列满/空时行为各异。
3.1 元素添加方法
方法 | 行为 |
---|---|
add() |
成功返回true,队列满时抛出IllegalStateException |
put() |
队列满时阻塞直到有空位 |
offer() |
成功返回true,队列满时返回false |
offer(E e, long timeout, TimeUnit unit) |
在指定超时时间内尝试插入,超时返回false |
3.2 元素获取方法
方法 | 行为 |
---|---|
take() |
队列空时阻塞直到有元素 |
poll(long timeout, TimeUnit unit) |
指定超时内等待元素,超时返回null |
这些方法是构建生产者-消费者程序的基础组件。
4. 多线程生产者-消费者实战
我们构建一个包含生产者和消费者的程序:
- 生产者:生成0-100的随机数存入队列
- 消费者:从队列取出数据并处理
使用4个生产者线程,消费者数量等于CPU核心数。关键点在于如何优雅终止消费者线程——这里采用"毒丸对象"(Poison Pill)模式。
4.1 生产者实现
public class NumbersProducer implements Runnable {
private BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;
public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
}
}
}
生产者完成数据生成后,会向队列投放指定数量的"毒丸"对象。
4.2 消费者实现
public class NumbersConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private final int poisonPill;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " result: " + number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
消费者通过take()
方法阻塞获取元素,遇到毒丸对象时优雅退出。
4.3 程序启动代码
int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
关键设计点:
- 队列容量设为10(有界队列)
- 毒丸对象使用
Integer.MAX_VALUE
(正常数据不会出现) - 毒丸数量精确计算:
N_CONSUMERS / N_PRODUCERS
,余数分配给最后一个生产者
运行时,4个生产者线程持续向队列投递随机数,消费者线程并行处理数据,各线程输出包含线程名和处理结果。
5. 总结
本文系统介绍了BlockingQueue
的核心API及其在并发编程中的应用。通过生产者-消费者案例,展示了如何利用BlockingQueue
实现线程间高效协作:
- ✅ 有界队列提供天然流量控制
- ✅ 阻塞方法简化线程同步逻辑
- ✅ 毒丸模式实现优雅终止
完整示例代码可在GitHub项目获取(Maven项目,可直接导入运行)。