1. 概述

本文将介绍CountDownLatch类,并通过几个实际示例展示如何使用它。本质上,CountDownLatch允许我们在一个线程中阻塞,直到其他线程完成特定任务。

2. 并发编程中的使用

简单来说,CountDownLatch有一个计数器字段,可以根据需要递减。然后我们可以使用它来阻塞调用线程,直到计数器降为零。

在并行处理场景中,我们可以根据希望并发执行的线程数量初始化CountDownLatch,每个线程完成后调用countdown(),这样依赖于它的线程调用await()就会被阻塞,直到所有工作线程完成。

3. 阻止线程池完成

让我们通过创建一个Worker类,并使用CountDownLatch字段来信号任务完成来进行演示:

public class Worker implements Runnable {
    private List<String> outputScraper;
    private CountDownLatch countDownLatch;

    public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        doSomeWork();
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

接下来,我们编写一个测试以验证CountDownLatch可以等待Worker实例完成:

@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
  throws InterruptedException {

    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

      workers.forEach(Thread::start);
      countDownLatch.await(); 
      outputScraper.add("Latch released");

      assertThat(outputScraper)
        .containsExactly(
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Latch released"
        );
    }

自然地,“Latch released”总是最后的输出,因为它依赖于CountDownLatch释放。

请注意,如果我们不调用await(),就无法保证线程的执行顺序,测试可能会随机失败。

4. 让线程池等待开始

如果我们之前的例子,但这次启动数千个线程而不是五个,那么很可能是早期的线程在启动较晚的线程之前就已经完成了处理。这可能使得重现并发问题变得困难,因为我们无法确保所有线程都并行运行。

为了解决这个问题,我们可以让CountDownLatch的工作方式与上一个例子不同。与其让父线程阻塞直到子线程完成,我们可以让每个子线程阻塞,直到所有其他线程开始。

让我们修改run()方法,在处理前先进行阻塞:

public class WaitingWorker implements Runnable {

    private List<String> outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;

    public WaitingWorker(
      List<String> outputScraper,
      CountDownLatch readyThreadCounter,
      CountDownLatch callingThreadBlocker,
      CountDownLatch completedThreadCounter) {

        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            doSomeWork();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

现在,让我们修改测试,让它阻塞直到所有Workers开始,解除阻塞Workers,然后阻塞直到它们完成:

@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
 throws InterruptedException {
 
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch readyThreadCounter = new CountDownLatch(5);
    CountDownLatch callingThreadBlocker = new CountDownLatch(1);
    CountDownLatch completedThreadCounter = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new WaitingWorker(
        outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    readyThreadCounter.await(); 
    outputScraper.add("Workers ready");
    callingThreadBlocker.countDown(); 
    completedThreadCounter.await(); 
    outputScraper.add("Workers complete");

    assertThat(outputScraper)
      .containsExactly(
        "Workers ready",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Workers complete"
      );
}

这个模式对于尝试重现并发错误非常有用,因为它可以迫使数千个线程尝试并行执行某些逻辑。

5. 提前终止CountDownLatch

有时,Workers可能在计数下达到零之前因错误而终止,导致CountDownLatch永远不会到达零,await()也永远不会结束:

@Override
public void run() {
    if (true) {
        throw new RuntimeException("Oh dear, I'm a BrokenWorker");
    }
    countDownLatch.countDown();
    outputScraper.add("Counted down");
}

为了展示await()会无限期阻塞,我们修改之前的测试,使用一个BrokenWorker

@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
  throws InterruptedException {
 
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    countDownLatch.await();
}

显然,这不是我们想要的行为,应用程序应该继续运行,而不是无限阻塞。

为解决这个问题,我们可以在await()调用中添加超时参数。

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

如我们所见,测试最终会超时,await()将返回false

6. 总结

在这篇快速指南中,我们展示了如何使用CountDownLatch来阻塞一个线程,直到其他线程完成一些处理。我们也展示了它如何帮助调试并发问题,确保线程并行运行。

这些示例的实现可在GitHub上找到,这是一个基于Maven的项目,可以直接运行。