1. 概述
本教程将教你如何在Java中实现生产者-消费者问题。这个问题也被称为有界缓冲区问题。
关于问题的更多细节,可以参考生产者-消费者问题维基页面。对于Java线程/并发基础知识,建议先阅读我们的Java并发编程文章。
2. 生产者-消费者问题
生产者和消费者是两个独立的进程。两者共享一个公共缓冲区或队列。生产者持续生成数据并推入缓冲区,而消费者则从缓冲区中消费这些数据。
下图展示了这个简单场景:
这个问题本质上存在几个需要处理的复杂性:
- 生产者和消费者可能同时尝试更新队列,这可能导致数据丢失或不一致
- 生产者可能比消费者慢,这种情况下消费者会快速处理元素然后等待
- 某些情况下消费者可能比生产者慢,这会导致队列溢出问题
- 实际场景中,我们可能有多个生产者、多个消费者或两者兼有,这可能导致同一条消息被不同消费者处理
下图展示了多生产者多消费者的情况:
我们需要处理资源共享和同步来解决这些复杂性:
- 在添加和删除数据时对队列进行同步
- 当队列为空时,消费者必须等待直到生产者添加新数据
- 当队列满时,生产者必须等待直到消费者消费数据释放缓冲空间
3. 使用线程的Java示例
我们为问题中的每个实体定义了单独的类。
3.1. Message类
Message类保存生产的数据:
public class Message {
private int id;
private double data;
// 构造方法和getter/setter
}
数据可以是任何类型,可能是JSON字符串、复杂对象或简单数字。将数据包装到Message类中也不是必须的。
3.2. DataQueue类
共享队列和相关对象被封装在DataQueue类中:
public class DataQueue {
private final Queue<Message> queue = new LinkedList<>();
private final int maxSize;
private final Object IS_NOT_FULL = new Object();
private final Object IS_NOT_EMPTY = new Object();
DataQueue(int maxSize) {
this.maxSize = maxSize;
}
// 其他方法
}
为了创建有界缓冲区,我们使用了一个队列和它的最大容量。
在Java中,synchronized块使用对象来实现线程同步。每个对象都有一个内置锁。只有最先获得锁的线程才能执行synchronized块。
这里我们创建了两个引用IS_NOT_FULL和IS_NOT_EMPTY用于同步。由于这些句柄没有其他用途,我们使用Object类初始化它们。
当队列满时,生产者在IS_NOT_FULL对象上等待。一旦移除消息,我们就通知队列不再满。
生产者进程调用waitIsNotFull方法:
public void waitIsNotFull() throws InterruptedException {
synchronized (IS_NOT_FULL) {
IS_NOT_FULL.wait();
}
}
当消费者轮询消息时,dataQueue通过notifyIsNotFull方法通知生产者:
private void notifyIsNotFull() {
synchronized (IS_NOT_FULL) {
IS_NOT_FULL.notify();
}
}
如果队列为空,消费者在IS_NOT_EMPTY对象上等待。一旦添加消息,我们就通知队列不再空。
消费者进程使用waitIsNotEmpty方法等待:
public void waitIsNotEmpty() throws InterruptedException {
synchronized (IS_NOT_EMPTY) {
IS_NOT_EMPTY.wait();
}
}
当生产者添加消息时,dataQueue通过notifyIsNotEmpty方法通知消费者:
public void notifyIsNotEmpty() {
synchronized (IS_NOT_EMPTY) {
IS_NOT_EMPTY.notify();
}
}
生产者使用add()方法将消息添加到队列:
public void add(Message message) {
queue.add(message);
notifyIsNotEmpty();
}
消费者调用remove方法从队列中检索消息:
public Message remove() {
Message mess = queue.poll();
notifyIsNotFull();
return mess;
}
3.3. Producer类
Producer类实现Runnable接口以支持线程创建:
public class Producer implements Runnable {
private final DataQueue dataQueue;
private boolean running = false;
public Producer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
}
@Override
public void run() {
running = true;
produce();
}
// 其他方法
}
构造函数使用共享的dataQueue参数。成员变量running用于优雅地停止生产者进程,初始化为true。
线程启动时调用produce()方法:
public void produce() {
while (running) {
if(dataQueue.isFull()) {
try {
dataQueue.waitIsNotFull();
} catch (InterruptedException e) {
log.severe("Error while waiting to Produce messages.");
break;
}
}
if (!running) {
break;
}
dataQueue.add(generateMessage());
}
log.info("Producer Stopped");
}
生产者在while循环中持续运行步骤。当running为false时循环中断。
每次迭代生成一条消息,然后检查队列是否满并根据需要等待。
当生产者从等待中唤醒时,检查是否需要继续或中断进程。它向队列添加消息并通知等待在空队列上的消费者。
stop()方法优雅终止进程:
public void stop() {
running = false;
dataQueue.notifyIsNotFull();
}
将running标志改为false后,通知所有处于"队列满"等待状态的生产者。这确保所有生产者线程终止。
3.4. Consumer类
Consumer类实现Runnable以支持线程创建:
public class Consumer implements Runnable {
private final DataQueue dataQueue;
private boolean running = false;
public Consumer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
}
@Override
public void run() {
consume();
}
// 其他方法
}
其构造函数以共享的dataQueue为参数。running标志初始化为true,用于在需要时停止消费者进程。
线程启动时运行consume方法:
public void consume() {
while (running) {
if(dataQueue.isEmpty()) {
try {
dataQueue.waitIsNotEmpty();
} catch (InterruptedException e) {
log.severe("Error while waiting to Consume messages.");
break;
}
}
if (!running) {
break;
}
Message message = dataQueue.poll();
useMessage(message);
}
log.info("Consumer Stopped");
}
它有一个持续运行的while循环。当running标志为false时进程优雅停止。
每次迭代检查队列是否空。如果队列空,消费者等待消息被生产。
当消费者从等待中唤醒时,检查running标志。如果标志为false则退出循环,否则从队列读取消息并通知等待在"满队列"状态的生产者,最后消费消息。
使用stop()方法优雅停止进程:
public void stop() {
running = false;
dataQueue.notifyIsNotEmpty();
}
running标志设为false后,通知所有等待在空队列状态的消费者。这确保所有消费者线程终止。
3.5. 运行生产者和消费者线程
创建具有最大所需容量的dataQueue对象:
DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);
创建producer对象和线程:
Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);
初始化consumer对象和线程:
Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);
最后启动线程以启动进程:
producerThread.start();
consumerThread.start();
它会持续运行直到我们想停止这些线程。停止它们很简单:
producer.stop();
consumer.stop();
3.6. 运行多个生产者和消费者
运行多个生产者和消费者类似于单生产者消费者场景。只需创建所需数量的线程并启动它们。
创建多个生产者和线程并启动:
List<Producer> producers = new ArrayList<>();
for(int i = 0; i < producerCount; i++) {
Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);
producerThread.start();
producers.add(producer);
}
接下来创建所需数量的消费者对象和线程:
List<Consumer> consumers = new ArrayList<>();
for(int i = 0; i < consumerCount; i++) {
Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);
consumerThread.start();
consumers.add(consumer);
}
通过调用生产者和消费者对象的stop()方法优雅停止进程:
consumers.forEach(Consumer::stop);
producers.forEach(Producer::stop);
4. 使用BlockingQueue的简化示例
Java提供了线程安全的BlockingQueue接口。换句话说,多个线程可以从此队列添加和移除消息而不会出现并发问题。
其put()方法在队列满时阻塞调用线程。类似地,take()方法在队列空时阻塞调用线程。
4.1. 创建有界BlockingQueue
使用构造函数中的容量值创建有界BlockingQueue:
BlockingQueue<Double> blockingQueue = new LinkedBlockingDeque<>(5);
4.2. 简化的produce方法
在produce()方法中,可以避免对队列的显式同步:
private void produce() {
while (true) {
double value = generateValue();
try {
blockingQueue.put(value);
} catch (InterruptedException e) {
break;
}
}
}
此方法持续生成对象并直接添加到队列。
4.3. 简化的consume方法
consume()方法不使用显式同步:
private void consume() {
while (true) {
Double value;
try {
value = blockingQueue.take();
} catch (InterruptedException e) {
break;
}
// 消费value
}
}
它只是从队列获取一个值并持续消费。
4.4. 运行生产者和消费者线程
可以根据需要创建任意数量的生产者和消费者线程:
for (int i = 0; i < 2; i++) {
Thread producerThread = new Thread(this::produce);
producerThread.start();
}
for (int i = 0; i < 3; i++) {
Thread consumerThread = new Thread(this::consume);
consumerThread.start();
}
5. 总结
本文中,我们学习了如何使用Java线程实现生产者-消费者问题,还了解了如何运行多生产者多消费者场景。简单粗暴地说,BlockingQueue是个更省心的选择,但理解底层实现机制能帮你避开更多并发坑。