1. 概述
本文将深入探讨 java.util.concurrent
包中的 SynchronousQueue。简单来说,这个实现允许我们以线程安全的方式在多线程间交换数据。
⚠️ 注意:SynchronousQueue 本质上是一个零容量的队列,它更像是一个线程间的数据交换点,而不是传统意义上的队列。
2. API 概览
SynchronousQueue 只支持 两个核心操作:take()
和 put()
,且两者都是阻塞的。
- ✅
put(E e)
:插入元素,会阻塞直到有其他线程调用take()
取走元素 - ✅
take()
:获取元素,会阻塞直到有其他线程调用put()
插入元素
虽然它实现了 Queue 接口,但最好将其理解为 单元素交换点:一个线程交出元素,另一个线程接收该元素。
3. 使用共享变量实现数据传递
为了理解 SynchronousQueue 的价值,我们先通过共享变量实现线程间数据传递(踩坑示例),再用 SynchronousQueue 重写对比。
假设有两个线程:生产者和消费者。生产者设置共享变量后通知消费者,消费者获取该值。我们使用 CountDownLatch
防止消费者提前访问未设置的变量。
3.1 初始化组件
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
3.2 生产者逻辑
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
sharedState.set(producedElement);
countDownLatch.countDown();
};
3.3 消费者逻辑
Runnable consumer = () -> {
try {
countDownLatch.await();
Integer consumedElement = sharedState.get();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
3.4 启动程序
executor.execute(producer);
executor.execute(consumer);
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);
输出示例:
Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point
❌ 问题:为简单功能写了大量代码,且需要手动协调线程同步。
4. 使用 SynchronousQueue 实现数据传递
现在用 SynchronousQueue 重写相同功能。它既能交换数据又能协调线程,无需额外同步工具。
4.1 初始化队列
ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
4.2 生产者逻辑
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
try {
queue.put(producedElement);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
4.3 消费者逻辑
Runnable consumer = () -> {
try {
Integer consumedElement = queue.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
4.4 启动程序
executor.execute(producer);
executor.execute(consumer);
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);
输出示例:
Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point
✅ 优势:代码更简洁,语义更清晰,SynchronousQueue 自动处理线程协调。
5. 总结
SynchronousQueue 是线程间数据交换的利器,特别适用于:
- 生产者-消费者模式
- 线程间直接传递数据
- 需要严格同步的场景
相比手动实现(共享变量+CountDownLatch),它提供了:
- 更简洁的代码:自动处理阻塞/唤醒
- 更强的语义:明确表达"交换"意图
- 更好的可维护性:减少同步工具的混用
完整示例代码可在 GitHub 项目 获取(Maven 项目,可直接导入运行)。