1. 概述

多线程编程允许我们同时运行线程,每个线程可以处理不同的任务。因此,它可以最佳地利用资源,特别是当我们的计算机具有多个多核 CPU 或多个 CPU 时。

有时,我们想控制多个线程同时启动。

在本教程中,我们将首先了解需求,尤其是“完全相同的时间”的含义。此外,我们将讨论如何在 Java 中同时启动两个线程。

2. 理解需求

我们的要求是:“同时启动两个线程。”

这个要求看起来很容易理解。但是,如果我们仔细考虑一下,是否有可能 同时 启动两个线程?

首先,每个线程都会消耗CPU时间来工作。因此, 如果我们的应用程序运行在具有单核CPU的计算机上,则不可能 同时 启动两个线程。

如果我们的计算机具有多核 CPU 或多个 CPU,则两个线程可能会 同时 启动。然而,我们无法在Java端控制它。

这是因为当我们在Java中使用线程时, Java线程的调度取决于操作系统的线程调度 。因此,不同的操作系统可能会以不同的方式处理它。

此外,如果我们以更严格的方式讨论“完全相同的时间”,根据爱因斯坦的狭义相对论

如果两个不同的事件在空间上是分开的,则不可能从绝对意义上说这两个事件同时发生。

无论我们的 CPU 与主板或 CPU 中的核心距离有多近,都存在空间。因此,我们不能确保两个线程 同时 启动。

那么,这是否意味着该要求无效?

不,这是一个有效的要求。即使我们不能使两个线程 同时 启动,我们也可以通过一些同步技术来非常接近。

在大多数实际情况下,当我们需要“同时”启动两个线程时,这些技术可能会对我们有所帮助。

在本教程中,我们将探索解决此问题的两种方法:

所有方法都遵循相同的想法:我们不会真正同时启动两个线程。相反,我们在线程启动后立即阻塞线程,并尝试同时恢复它们的执行。

由于我们的测试与线程调度相关,因此值得一提的是本教程中运行测试的环境:

  • CPU:英特尔(R) 酷睿(TM) i7-8850H CPU。处理器时钟介于 2.6 至 4.3 GHz 之间(4.1 GHz,4 核,4 GHz,6 核)
  • 操作系统:64位Linux,内核版本5.12.12
  • Java:Java 11

现在,让我们看看 CountDonwLatchCyclicBarrier 的 实际应用。

3.使用 CountDownLatch

CountDownLatch 是 Java 5 中引入的同步器,作为 java.util.concurrent 包的一部分。通常, 我们使用 CountDownLatch 来阻塞线程,直到其他线程完成其任务。

简单来说,我们在一个 闩锁 对象中设置一个 计数 ,并将 闩锁 对象与一些线程关联起来。当我们启动这些线程时,它们将被阻塞,直到锁存器的计数变为零。

另一方面,在其他线程中,我们可以控制在什么条件下减少 计数 并让阻塞的线程恢复,例如当主线程中的某些任务完成时。

3.1.工作线程

现在,让我们看看如何使用 CountDownLatch 类解决我们的问题。

首先,我们将创建 Thread 类。我们称之为 WorkerWithCountDownLatch

public class WorkerWithCountDownLatch extends Thread {
    private CountDownLatch latch;

    public WorkerWithCountDownLatch(String name, CountDownLatch latch) {
        this.latch = latch;
        setName(name);
    }

    @Override public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the latch...\n", getName());
            latch.await();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here...
        } catch (InterruptedException e) {
            // handle exception
        }
    }

我们在 WorkerWithCountDownLatch 类中添加了一个 闩锁 对象。首先我们来了解一下 latch 对象的功能。

run() 方法中,我们调用方法 latch.await()。 这意味着,如果我们启动 工作 线程,它将检查 闩锁的计数。 线程将被阻塞,直到 计数 为零。

这样,我们就可以在主线程中创建一个 count=1CountDownLatch(1) 闩锁,并将 闩锁 对象关联到我们要同时启动的两个工作线程。

当我们希望两个线程恢复执行其实际工作时,我们通过在主线程中调用 latch.countDown() 来释放闩锁。

接下来我们看看主线程是如何控制两个工作线程的。

3.2.主线程

我们将在 usingCountDownLatch() 方法中实现主线程:

private static void usingCountDownLatch() throws InterruptedException {
    System.out.println("===============================================");
    System.out.println("        >>> Using CountDownLatch <<<<");
    System.out.println("===============================================");

    CountDownLatch latch = new CountDownLatch(1);

    WorkerWithCountDownLatch worker1 = new WorkerWithCountDownLatch("Worker with latch 1", latch);
    WorkerWithCountDownLatch worker2 = new WorkerWithCountDownLatch("Worker with latch 2", latch);

    worker1.start();
    worker2.start();

    Thread.sleep(10);//simulation of some actual work

    System.out.println("-----------------------------------------------");
    System.out.println(" Now release the latch:");
    System.out.println("-----------------------------------------------");
    latch.countDown();
}

现在,让我们从 main() 方法中调用上面的 usingCountDownLatch() 方法。当我们运行 main() 方法时,我们将看到输出:

===============================================
        >>> Using CountDownLatch <<<<
===============================================
[ Worker with latch 1 ] created, blocked by the latch
[ Worker with latch 2 ] created, blocked by the latch
-----------------------------------------------
 Now release the latch:
-----------------------------------------------
[ Worker with latch 2 ] starts at: 2021-06-27T16:00:52.268532035Z
[ Worker with latch 1 ] starts at: 2021-06-27T16:00:52.268533787Z

如上面的输出所示, 两个工作线程 几乎 同时启动。两个启动时间之间的差异小于两微秒

4.使用 CyclicBarrier

CyclicBarrier 类是 Java 5 中引入的另一个同步器。本质上, CyclicBarrier 允许固定数量的线程在继续执行之前等待彼此到达公共点

接下来,让我们看看如何使用 CyclicBarrier 类解决我们的问题。

4.1.工作线程

我们首先看一下我们的工作线程的实现:

public class WorkerWithCyclicBarrier extends Thread {
    private CyclicBarrier barrier;

    public WorkerWithCyclicBarrier(String name, CyclicBarrier barrier) {
        this.barrier = barrier;
        this.setName(name);
    }

    @Override public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the barrier\n", getName());
            barrier.await();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here...
        } catch (InterruptedException | BrokenBarrierException e) {
            // handle exception
        }
    }
}

实现非常简单。我们将 屏障 对象与工作线程相关联。当线程启动时,我们立即调用 barrier.await() 方法。

这样,工作线程将被阻塞并等待各方调用 barrier.await() 来恢复。

4.2.主线程

接下来我们看一下如何控制主线程中两个工作线程的恢复:

private static void usingCyclicBarrier() throws BrokenBarrierException, InterruptedException {
    System.out.println("\n===============================================");
    System.out.println("        >>> Using CyclicBarrier <<<<");
    System.out.println("===============================================");

    CyclicBarrier barrier = new CyclicBarrier(3);

    WorkerWithCyclicBarrier worker1 = new WorkerWithCyclicBarrier("Worker with barrier 1", barrier);
    WorkerWithCyclicBarrier worker2 = new WorkerWithCyclicBarrier("Worker with barrier 2", barrier);

    worker1.start();
    worker2.start();

    Thread.sleep(10);//simulation of some actual work

    System.out.println("-----------------------------------------------");
    System.out.println(" Now open the barrier:");
    System.out.println("-----------------------------------------------");
    barrier.await();
}

我们的目标是让两个工作线程同时恢复。因此,加上主线程,我们总共有三个线程。

如上面的方法所示,我们在主线程中创建了一个具有三方的 屏障 对象。接下来,我们创建并启动两个工作线程。

正如我们之前讨论的,两个工作线程被阻塞并等待屏障打开以恢复。

在主线程中,我们可以做一些实际的工作。当我们决定打开屏障时,我们调用方法 barrier.await() 来让两个worker继续执行。

如果我们在 main() 方法中调用 usingCyclicBarrier() ,我们将得到输出:

===============================================
        >>> Using CyclicBarrier <<<<
===============================================
[ Worker with barrier 1 ] created, blocked by the barrier
[ Worker with barrier 2 ] created, blocked by the barrier
-----------------------------------------------
 Now open the barrier:
-----------------------------------------------
[ Worker with barrier 1 ] starts at: 2021-06-27T16:00:52.311346392Z
[ Worker with barrier 2 ] starts at: 2021-06-27T16:00:52.311348874Z

我们可以比较两次worker的开始时间。即使两个工作人员没有在完全相同的时间启动,我们也非常接近我们的目标:两个启动时间之间的差异小于三微秒。

5. 使用 Phaser

Phaser 类是 Java 7 中引入的同步器。它类似于 CyclicBarrierCountDownLatch 。然而, Phaser 类更加灵活。

例如,与 CyclicBarrierCountDownLatch 不同, Phaser 允许我们动态注册线程方。

接下来,我们使用 Phaser 来解决这个问题。

5.1.工作线程

像往常一样,我们首先看一下实现,然后了解它是如何工作的:

public class WorkerWithPhaser extends Thread {
    private Phaser phaser;

    public WorkerWithPhaser(String name, Phaser phaser) {
        this.phaser = phaser;
        phaser.register();
        setName(name);
    }

    @Override public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the phaser\n", getName());
            phaser.arriveAndAwaitAdvance();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here...
        } catch (IllegalStateException e) {
            // handle exception
        }
    }
}

当实例化工作线程时,我们通过调用 Phaser.register() 将当前线程注册到给定的 Phaser 对象。这样,当前工作就成为 Phaser Barrier 的一个线程方。

接下来,当工作线程启动时,我们立即调用 phaser.arriveAndAwaitAdvance() 。因此,我们告诉 phaser 当前线程已经到达并将等待其他线程方的到达来继续。当然,在其他线程方到来之前,当前线程是被阻塞的。

5.2.主线程

接下来我们继续看主线程的实现:

private static void usingPhaser() throws InterruptedException {
    System.out.println("\n===============================================");
    System.out.println("        >>> Using Phaser <<<");
    System.out.println("===============================================");

    Phaser phaser = new Phaser();
    phaser.register();

    WorkerWithPhaser worker1 = new WorkerWithPhaser("Worker with phaser 1", phaser);
    WorkerWithPhaser worker2 = new WorkerWithPhaser("Worker with phaser 2", phaser);

    worker1.start();
    worker2.start();

    Thread.sleep(10);//simulation of some actual work

    System.out.println("-----------------------------------------------");
    System.out.println(" Now open the phaser barrier:");
    System.out.println("-----------------------------------------------");
    phaser.arriveAndAwaitAdvance();
}

在上面的代码中,我们可以看到, 主线程将自己注册为 Phaser 对象的线程方

在我们创建并阻止两个 工作 线程之后,主线程也会调用 phaser.arriveAndAwaitAdvance() 。这样,我们就打开了 Phaser Barrier,使得两个 工作 线程可以同时恢复。

最后,我们在 main() 方法中调用 usingPhaser() 方法:

===============================================
        >>> Using Phaser <<<
===============================================
[ Worker with phaser 1 ] created, blocked by the phaser
[ Worker with phaser 2 ] created, blocked by the phaser
-----------------------------------------------
 Now open the phaser barrier:
-----------------------------------------------
[ Worker with phaser 2 ] starts at: 2021-07-18T17:39:27.063523636Z
[ Worker with phaser 1 ] starts at: 2021-07-18T17:39:27.063523827Z

同样, 两个工作线程 几乎 同时启动。两个启动时间之间的差异小于两微秒

六,结论

在本文中,我们首先讨论了这个要求:“同时启动两个线程”。

接下来,我们讨论了同时启动三个线程的两种方法:使用 CountDownLatchCyclicBarrierPhaser

他们的想法很相似,阻塞两个线程并试图让它们同时恢复执行。

尽管这些方法不能保证两个线程在完全相同的时间启动,但结果对于现实世界中的大多数情况来说非常接近且足够。

与往常一样,本文的代码可以在 GitHub 上找到。