1. 概述

本文将深入探讨 java.util.concurrent 包中的 SynchronousQueue。简单来说,这个实现允许我们以线程安全的方式在多线程间交换数据。

⚠️ 注意:SynchronousQueue 本质上是一个零容量的队列,它更像是一个线程间的数据交换点,而不是传统意义上的队列。

2. API 概览

SynchronousQueue 只支持 两个核心操作:take()put(),且两者都是阻塞的

  • put(E e):插入元素,会阻塞直到有其他线程调用 take() 取走元素
  • take():获取元素,会阻塞直到有其他线程调用 put() 插入元素

虽然它实现了 Queue 接口,但最好将其理解为 单元素交换点:一个线程交出元素,另一个线程接收该元素。

3. 使用共享变量实现数据传递

为了理解 SynchronousQueue 的价值,我们先通过共享变量实现线程间数据传递(踩坑示例),再用 SynchronousQueue 重写对比。

假设有两个线程:生产者和消费者。生产者设置共享变量后通知消费者,消费者获取该值。我们使用 CountDownLatch 防止消费者提前访问未设置的变量。

3.1 初始化组件

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);

3.2 生产者逻辑

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};

3.3 消费者逻辑

Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

3.4 启动程序

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);

输出示例:

Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point

❌ 问题:为简单功能写了大量代码,且需要手动协调线程同步。

4. 使用 SynchronousQueue 实现数据传递

现在用 SynchronousQueue 重写相同功能。它既能交换数据又能协调线程,无需额外同步工具。

4.1 初始化队列

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

4.2 生产者逻辑

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

4.3 消费者逻辑

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

4.4 启动程序

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);

输出示例:

Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point

✅ 优势:代码更简洁,语义更清晰,SynchronousQueue 自动处理线程协调。

5. 总结

SynchronousQueue 是线程间数据交换的利器,特别适用于:

  • 生产者-消费者模式
  • 线程间直接传递数据
  • 需要严格同步的场景

相比手动实现(共享变量+CountDownLatch),它提供了:

  1. 更简洁的代码:自动处理阻塞/唤醒
  2. 更强的语义:明确表达"交换"意图
  3. 更好的可维护性:减少同步工具的混用

完整示例代码可在 GitHub 项目 获取(Maven 项目,可直接导入运行)。


原始标题:A Guide to Java SynchronousQueue