1. 概述

本文将深入探讨java.util.concurrent包中解决生产者-消费者并发问题的核心工具——BlockingQueue。我们将分析BlockingQueue接口的API设计,并展示如何利用其方法简化并发编程。

后续将通过一个多生产者-多消费者的实战案例,演示BlockingQueue的实际应用。

2. BlockingQueue类型

BlockingQueue可分为两种实现类型:

  • 无界队列:容量几乎可无限增长
  • 有界队列:具有固定最大容量

2.1 无界队列

创建无界队列非常简单:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

此时队列容量被设为Integer.MAX_VALUE。所有添加操作永远不会阻塞,可能导致队列无限膨胀。

⚠️ 踩坑提醒:使用无界队列时,必须确保消费者处理速度能跟上生产者速度,否则极易引发OutOfMemoryError

2.2 有界队列

有界队列通过构造函数指定容量:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

这个队列容量为10。当队列已满时:

  • 使用put()方法会阻塞直到有空位
  • 使用add()/offer()会立即失败

最佳实践:有界队列天然具备流量控制能力,能自动平衡生产消费速度,是并发编程的首选方案。

3. BlockingQueue API详解

BlockingQueue接口包含两类核心方法:元素添加和元素获取。每类方法在队列满/空时行为各异。

3.1 元素添加方法

方法 行为
add() 成功返回true,队列满时抛出IllegalStateException
put() 队列满时阻塞直到有空位
offer() 成功返回true,队列满时返回false
offer(E e, long timeout, TimeUnit unit) 在指定超时时间内尝试插入,超时返回false

3.2 元素获取方法

方法 行为
take() 队列空时阻塞直到有元素
poll(long timeout, TimeUnit unit) 指定超时内等待元素,超时返回null

这些方法是构建生产者-消费者程序的基础组件。

4. 多线程生产者-消费者实战

我们构建一个包含生产者和消费者的程序:

  • 生产者:生成0-100的随机数存入队列
  • 消费者:从队列取出数据并处理

使用4个生产者线程,消费者数量等于CPU核心数。关键点在于如何优雅终止消费者线程——这里采用"毒丸对象"(Poison Pill)模式。

4.1 生产者实现

public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;
    
    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
        }
     }
}

生产者完成数据生成后,会向队列投放指定数量的"毒丸"对象。

4.2 消费者实现

public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;
    
    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }
    
    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

消费者通过take()方法阻塞获取元素,遇到毒丸对象时优雅退出。

4.3 程序启动代码

int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 1; i < N_PRODUCERS; i++) {
    new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}

for (int j = 0; j < N_CONSUMERS; j++) {
    new Thread(new NumbersConsumer(queue, poisonPill)).start();
}

new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();

关键设计点:

  1. 队列容量设为10(有界队列)
  2. 毒丸对象使用Integer.MAX_VALUE(正常数据不会出现)
  3. 毒丸数量精确计算:N_CONSUMERS / N_PRODUCERS,余数分配给最后一个生产者

运行时,4个生产者线程持续向队列投递随机数,消费者线程并行处理数据,各线程输出包含线程名和处理结果。

5. 总结

本文系统介绍了BlockingQueue的核心API及其在并发编程中的应用。通过生产者-消费者案例,展示了如何利用BlockingQueue实现线程间高效协作:

  • ✅ 有界队列提供天然流量控制
  • ✅ 阻塞方法简化线程同步逻辑
  • ✅ 毒丸模式实现优雅终止

完整示例代码可在GitHub项目获取(Maven项目,可直接导入运行)。


原始标题:Guide to java.util.concurrent.BlockingQueue | Baeldung