1. 概述

本教程将教你如何在Java中实现生产者-消费者问题。这个问题也被称为有界缓冲区问题

关于问题的更多细节,可以参考生产者-消费者问题维基页面。对于Java线程/并发基础知识,建议先阅读我们的Java并发编程文章。

2. 生产者-消费者问题

生产者和消费者是两个独立的进程。两者共享一个公共缓冲区或队列。生产者持续生成数据并推入缓冲区,而消费者则从缓冲区中消费这些数据。

下图展示了这个简单场景:

生产者-消费者

这个问题本质上存在几个需要处理的复杂性

  • 生产者和消费者可能同时尝试更新队列,这可能导致数据丢失或不一致
  • 生产者可能比消费者慢,这种情况下消费者会快速处理元素然后等待
  • 某些情况下消费者可能比生产者慢,这会导致队列溢出问题
  • 实际场景中,我们可能有多个生产者、多个消费者或两者兼有,这可能导致同一条消息被不同消费者处理

下图展示了多生产者多消费者的情况:

多生产者多消费者

我们需要处理资源共享和同步来解决这些复杂性:

  • 在添加和删除数据时对队列进行同步
  • 当队列为空时,消费者必须等待直到生产者添加新数据
  • 当队列满时,生产者必须等待直到消费者消费数据释放缓冲空间

3. 使用线程的Java示例

我们为问题中的每个实体定义了单独的类。

3.1. Message类

Message类保存生产的数据:

public class Message {
    private int id;
    private double data;

    // 构造方法和getter/setter
}

数据可以是任何类型,可能是JSON字符串、复杂对象或简单数字。将数据包装到Message类中也不是必须的。

3.2. DataQueue类

共享队列和相关对象被封装在DataQueue类中:

public class DataQueue {
    private final Queue<Message> queue = new LinkedList<>();
    private final int maxSize;
    private final Object IS_NOT_FULL = new Object();
    private final Object IS_NOT_EMPTY = new Object();

    DataQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    // 其他方法
}

为了创建有界缓冲区,我们使用了一个队列和它的最大容量。

在Java中,synchronized块使用对象来实现线程同步。每个对象都有一个内置锁。只有最先获得锁的线程才能执行synchronized块。

这里我们创建了两个引用IS_NOT_FULL和IS_NOT_EMPTY用于同步。由于这些句柄没有其他用途,我们使用Object类初始化它们。

当队列满时,生产者在IS_NOT_FULL对象上等待。一旦移除消息,我们就通知队列不再满。

生产者进程调用waitIsNotFull方法:

public void waitIsNotFull() throws InterruptedException {
    synchronized (IS_NOT_FULL) {
        IS_NOT_FULL.wait();
    }
}

当消费者轮询消息时,dataQueue通过notifyIsNotFull方法通知生产者:

private void notifyIsNotFull() {
    synchronized (IS_NOT_FULL) {
        IS_NOT_FULL.notify();
    }
}

如果队列为空,消费者在IS_NOT_EMPTY对象上等待。一旦添加消息,我们就通知队列不再空。

消费者进程使用waitIsNotEmpty方法等待:

public void waitIsNotEmpty() throws InterruptedException {
    synchronized (IS_NOT_EMPTY) {
        IS_NOT_EMPTY.wait();
    }
}

当生产者添加消息时,dataQueue通过notifyIsNotEmpty方法通知消费者:

public void notifyIsNotEmpty() {
    synchronized (IS_NOT_EMPTY) {
        IS_NOT_EMPTY.notify();
    }
}

生产者使用add()方法将消息添加到队列:

public void add(Message message) {
    queue.add(message);
    notifyIsNotEmpty();
}

消费者调用remove方法从队列中检索消息:

public Message remove() {
    Message mess = queue.poll();
    notifyIsNotFull();
    return mess;
}

3.3. Producer类

Producer类实现Runnable接口以支持线程创建:

public class Producer implements Runnable {
    private final DataQueue dataQueue;
    private boolean running = false; 
    public Producer(DataQueue dataQueue) {
        this.dataQueue = dataQueue;
    }

    @Override
    public void run() {
        running = true;
        produce();
    }

    // 其他方法
}

构造函数使用共享的dataQueue参数。成员变量running用于优雅地停止生产者进程,初始化为true。

线程启动时调用produce()方法:

public void produce() {
    while (running) {
        if(dataQueue.isFull()) {
            try {
                dataQueue.waitIsNotFull();
            } catch (InterruptedException e) {
                log.severe("Error while waiting to Produce messages.");
                break;
            }
        }
        if (!running) {
            break;
        }
        dataQueue.add(generateMessage());
    }
    log.info("Producer Stopped");
}

生产者在while循环中持续运行步骤。当running为false时循环中断。

每次迭代生成一条消息,然后检查队列是否满并根据需要等待。

当生产者从等待中唤醒时,检查是否需要继续或中断进程。它向队列添加消息并通知等待在空队列上的消费者。

stop()方法优雅终止进程:

public void stop() {
    running = false;
    dataQueue.notifyIsNotFull();
}

将running标志改为false后,通知所有处于"队列满"等待状态的生产者。这确保所有生产者线程终止。

3.4. Consumer类

Consumer类实现Runnable以支持线程创建:

public class Consumer implements Runnable {
    private final DataQueue dataQueue;
    private boolean running = false;

    public Consumer(DataQueue dataQueue) {
        this.dataQueue = dataQueue;
    }

    @Override
    public void run() {
        consume();
    }

    // 其他方法
}

其构造函数以共享的dataQueue为参数。running标志初始化为true,用于在需要时停止消费者进程。

线程启动时运行consume方法

public void consume() {
    while (running) {
        if(dataQueue.isEmpty()) {
            try {
                dataQueue.waitIsNotEmpty();
            } catch (InterruptedException e) {
                log.severe("Error while waiting to Consume messages.");
                break;
            }
        }
        if (!running) {
            break;
        }
        Message message = dataQueue.poll();
        useMessage(message);
    }
    log.info("Consumer Stopped");
}

它有一个持续运行的while循环。当running标志为false时进程优雅停止。

每次迭代检查队列是否空。如果队列空,消费者等待消息被生产

当消费者从等待中唤醒时,检查running标志。如果标志为false则退出循环,否则从队列读取消息并通知等待在"满队列"状态的生产者,最后消费消息。

使用stop()方法优雅停止进程:

public void stop() {
    running = false;
    dataQueue.notifyIsNotEmpty();
}

running标志设为false后,通知所有等待在空队列状态的消费者。这确保所有消费者线程终止。

3.5. 运行生产者和消费者线程

创建具有最大所需容量的dataQueue对象:

DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);

创建producer对象和线程:

Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);

初始化consumer对象和线程:

Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);

最后启动线程以启动进程:

producerThread.start();
consumerThread.start();

它会持续运行直到我们想停止这些线程。停止它们很简单:

producer.stop();
consumer.stop();

3.6. 运行多个生产者和消费者

运行多个生产者和消费者类似于单生产者消费者场景。只需创建所需数量的线程并启动它们

创建多个生产者和线程并启动:

List<Producer> producers = new ArrayList<>();
for(int i = 0; i < producerCount; i++) {
    Producer producer = new Producer(dataQueue);
    Thread producerThread = new Thread(producer);
    producerThread.start();
    producers.add(producer);
}

接下来创建所需数量的消费者对象和线程:

List<Consumer> consumers = new ArrayList<>();
for(int i = 0; i < consumerCount; i++) {
    Consumer consumer = new Consumer(dataQueue);
    Thread consumerThread = new Thread(consumer);
    consumerThread.start();
    consumers.add(consumer);
}

通过调用生产者和消费者对象的stop()方法优雅停止进程:

consumers.forEach(Consumer::stop);
producers.forEach(Producer::stop);

4. 使用BlockingQueue的简化示例

Java提供了线程安全的BlockingQueue接口。换句话说,多个线程可以从此队列添加和移除消息而不会出现并发问题

其put()方法在队列满时阻塞调用线程。类似地,take()方法在队列空时阻塞调用线程。

4.1. 创建有界BlockingQueue

使用构造函数中的容量值创建有界BlockingQueue:

BlockingQueue<Double> blockingQueue = new LinkedBlockingDeque<>(5);

4.2. 简化的produce方法

在produce()方法中,可以避免对队列的显式同步:

private void produce() {
    while (true) {
        double value = generateValue();
        try {
            blockingQueue.put(value);
        } catch (InterruptedException e) {
            break;
        }
    }
}

此方法持续生成对象并直接添加到队列。

4.3. 简化的consume方法

consume()方法不使用显式同步:

private void consume() {
    while (true) {
        Double value;
        try {
            value = blockingQueue.take();
        } catch (InterruptedException e) {
            break;
        }
        // 消费value
    }
}

它只是从队列获取一个值并持续消费。

4.4. 运行生产者和消费者线程

可以根据需要创建任意数量的生产者和消费者线程:

for (int i = 0; i < 2; i++) {
    Thread producerThread = new Thread(this::produce);
    producerThread.start();
}

for (int i = 0; i < 3; i++) {
    Thread consumerThread = new Thread(this::consume);
    consumerThread.start();
}

5. 总结

本文中,我们学习了如何使用Java线程实现生产者-消费者问题,还了解了如何运行多生产者多消费者场景。简单粗暴地说,BlockingQueue是个更省心的选择,但理解底层实现机制能帮你避开更多并发坑。


原始标题:Producer-Consumer Problem With Example in Java | Baeldung