1. 概述
多线程编程允许我们同时运行线程,每个线程可以处理不同的任务。因此,它可以最佳地利用资源,特别是当我们的计算机具有多个多核 CPU 或多个 CPU 时。
有时,我们想控制多个线程同时启动。
在本教程中,我们将首先了解需求,尤其是“完全相同的时间”的含义。此外,我们将讨论如何在 Java 中同时启动两个线程。
2. 理解需求
我们的要求是:“同时启动两个线程。”
这个要求看起来很容易理解。但是,如果我们仔细考虑一下,是否有可能 同时 启动两个线程?
首先,每个线程都会消耗CPU时间来工作。因此, 如果我们的应用程序运行在具有单核CPU的计算机上,则不可能 同时 启动两个线程。
如果我们的计算机具有多核 CPU 或多个 CPU,则两个线程可能会 同时 启动。然而,我们无法在Java端控制它。
这是因为当我们在Java中使用线程时, Java线程的调度取决于操作系统的线程调度 。因此,不同的操作系统可能会以不同的方式处理它。
此外,如果我们以更严格的方式讨论“完全相同的时间”,根据爱因斯坦的狭义相对论:
如果两个不同的事件在空间上是分开的,则不可能从绝对意义上说这两个事件同时发生。
无论我们的 CPU 与主板或 CPU 中的核心距离有多近,都存在空间。因此,我们不能确保两个线程 同时 启动。
那么,这是否意味着该要求无效?
不,这是一个有效的要求。即使我们不能使两个线程 同时 启动,我们也可以通过一些同步技术来非常接近。
在大多数实际情况下,当我们需要“同时”启动两个线程时,这些技术可能会对我们有所帮助。
在本教程中,我们将探索解决此问题的两种方法:
- 使用 CountDownLatch 类
- 使用 CyclicBarrier 类
- 使用 Phaser 类
所有方法都遵循相同的想法:我们不会真正同时启动两个线程。相反,我们在线程启动后立即阻塞线程,并尝试同时恢复它们的执行。
由于我们的测试与线程调度相关,因此值得一提的是本教程中运行测试的环境:
- CPU:英特尔(R) 酷睿(TM) i7-8850H CPU。处理器时钟介于 2.6 至 4.3 GHz 之间(4.1 GHz,4 核,4 GHz,6 核)
- 操作系统:64位Linux,内核版本5.12.12
- Java:Java 11
现在,让我们看看 CountDonwLatch 和 CyclicBarrier 的 实际应用。
3.使用 CountDownLatch 类
CountDownLatch 是 Java 5 中引入的同步器,作为 java.util.concurrent 包的一部分。通常, 我们使用 CountDownLatch 来阻塞线程,直到其他线程完成其任务。
简单来说,我们在一个 闩锁 对象中设置一个 计数 ,并将 闩锁 对象与一些线程关联起来。当我们启动这些线程时,它们将被阻塞,直到锁存器的计数变为零。
另一方面,在其他线程中,我们可以控制在什么条件下减少 计数 并让阻塞的线程恢复,例如当主线程中的某些任务完成时。
3.1.工作线程
现在,让我们看看如何使用 CountDownLatch 类解决我们的问题。
首先,我们将创建 Thread 类。我们称之为 WorkerWithCountDownLatch :
public class WorkerWithCountDownLatch extends Thread {
private CountDownLatch latch;
public WorkerWithCountDownLatch(String name, CountDownLatch latch) {
this.latch = latch;
setName(name);
}
@Override public void run() {
try {
System.out.printf("[ %s ] created, blocked by the latch...\n", getName());
latch.await();
System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
// do actual work here...
} catch (InterruptedException e) {
// handle exception
}
}
我们在 WorkerWithCountDownLatch 类中添加了一个 闩锁 对象。首先我们来了解一下 latch 对象的功能。
在 run() 方法中,我们调用方法 latch.await()。 这意味着,如果我们启动 工作 线程,它将检查 闩锁的计数。 线程将被阻塞,直到 计数 为零。
这样,我们就可以在主线程中创建一个 count=1 的 CountDownLatch(1) 闩锁,并将 闩锁 对象关联到我们要同时启动的两个工作线程。
当我们希望两个线程恢复执行其实际工作时,我们通过在主线程中调用 latch.countDown() 来释放闩锁。
接下来我们看看主线程是如何控制两个工作线程的。
3.2.主线程
我们将在 usingCountDownLatch() 方法中实现主线程:
private static void usingCountDownLatch() throws InterruptedException {
System.out.println("===============================================");
System.out.println(" >>> Using CountDownLatch <<<<");
System.out.println("===============================================");
CountDownLatch latch = new CountDownLatch(1);
WorkerWithCountDownLatch worker1 = new WorkerWithCountDownLatch("Worker with latch 1", latch);
WorkerWithCountDownLatch worker2 = new WorkerWithCountDownLatch("Worker with latch 2", latch);
worker1.start();
worker2.start();
Thread.sleep(10);//simulation of some actual work
System.out.println("-----------------------------------------------");
System.out.println(" Now release the latch:");
System.out.println("-----------------------------------------------");
latch.countDown();
}
现在,让我们从 main() 方法中调用上面的 usingCountDownLatch() 方法。当我们运行 main() 方法时,我们将看到输出:
===============================================
>>> Using CountDownLatch <<<<
===============================================
[ Worker with latch 1 ] created, blocked by the latch
[ Worker with latch 2 ] created, blocked by the latch
-----------------------------------------------
Now release the latch:
-----------------------------------------------
[ Worker with latch 2 ] starts at: 2021-06-27T16:00:52.268532035Z
[ Worker with latch 1 ] starts at: 2021-06-27T16:00:52.268533787Z
如上面的输出所示, 两个工作线程 几乎 同时启动。两个启动时间之间的差异小于两微秒 。
4.使用 CyclicBarrier 类
CyclicBarrier 类是 Java 5 中引入的另一个同步器。本质上, CyclicBarrier 允许固定数量的线程在继续执行之前等待彼此到达公共点 。
接下来,让我们看看如何使用 CyclicBarrier 类解决我们的问题。
4.1.工作线程
我们首先看一下我们的工作线程的实现:
public class WorkerWithCyclicBarrier extends Thread {
private CyclicBarrier barrier;
public WorkerWithCyclicBarrier(String name, CyclicBarrier barrier) {
this.barrier = barrier;
this.setName(name);
}
@Override public void run() {
try {
System.out.printf("[ %s ] created, blocked by the barrier\n", getName());
barrier.await();
System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
// do actual work here...
} catch (InterruptedException | BrokenBarrierException e) {
// handle exception
}
}
}
实现非常简单。我们将 屏障 对象与工作线程相关联。当线程启动时,我们立即调用 barrier.await() 方法。
这样,工作线程将被阻塞并等待各方调用 barrier.await() 来恢复。
4.2.主线程
接下来我们看一下如何控制主线程中两个工作线程的恢复:
private static void usingCyclicBarrier() throws BrokenBarrierException, InterruptedException {
System.out.println("\n===============================================");
System.out.println(" >>> Using CyclicBarrier <<<<");
System.out.println("===============================================");
CyclicBarrier barrier = new CyclicBarrier(3);
WorkerWithCyclicBarrier worker1 = new WorkerWithCyclicBarrier("Worker with barrier 1", barrier);
WorkerWithCyclicBarrier worker2 = new WorkerWithCyclicBarrier("Worker with barrier 2", barrier);
worker1.start();
worker2.start();
Thread.sleep(10);//simulation of some actual work
System.out.println("-----------------------------------------------");
System.out.println(" Now open the barrier:");
System.out.println("-----------------------------------------------");
barrier.await();
}
我们的目标是让两个工作线程同时恢复。因此,加上主线程,我们总共有三个线程。
如上面的方法所示,我们在主线程中创建了一个具有三方的 屏障 对象。接下来,我们创建并启动两个工作线程。
正如我们之前讨论的,两个工作线程被阻塞并等待屏障打开以恢复。
在主线程中,我们可以做一些实际的工作。当我们决定打开屏障时,我们调用方法 barrier.await() 来让两个worker继续执行。
如果我们在 main() 方法中调用 usingCyclicBarrier() ,我们将得到输出:
===============================================
>>> Using CyclicBarrier <<<<
===============================================
[ Worker with barrier 1 ] created, blocked by the barrier
[ Worker with barrier 2 ] created, blocked by the barrier
-----------------------------------------------
Now open the barrier:
-----------------------------------------------
[ Worker with barrier 1 ] starts at: 2021-06-27T16:00:52.311346392Z
[ Worker with barrier 2 ] starts at: 2021-06-27T16:00:52.311348874Z
我们可以比较两次worker的开始时间。即使两个工作人员没有在完全相同的时间启动,我们也非常接近我们的目标:两个启动时间之间的差异小于三微秒。
5. 使用 Phaser 类
Phaser 类是 Java 7 中引入的同步器。它类似于 CyclicBarrier 和 CountDownLatch 。然而, Phaser 类更加灵活。
例如,与 CyclicBarrier 和 CountDownLatch 不同, Phaser 允许我们动态注册线程方。
接下来,我们使用 Phaser 来解决这个问题。
5.1.工作线程
像往常一样,我们首先看一下实现,然后了解它是如何工作的:
public class WorkerWithPhaser extends Thread {
private Phaser phaser;
public WorkerWithPhaser(String name, Phaser phaser) {
this.phaser = phaser;
phaser.register();
setName(name);
}
@Override public void run() {
try {
System.out.printf("[ %s ] created, blocked by the phaser\n", getName());
phaser.arriveAndAwaitAdvance();
System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
// do actual work here...
} catch (IllegalStateException e) {
// handle exception
}
}
}
当实例化工作线程时,我们通过调用 Phaser.register() 将当前线程注册到给定的 Phaser 对象。这样,当前工作就成为 Phaser Barrier 的一个线程方。
接下来,当工作线程启动时,我们立即调用 phaser.arriveAndAwaitAdvance() 。因此,我们告诉 phaser 当前线程已经到达并将等待其他线程方的到达来继续。当然,在其他线程方到来之前,当前线程是被阻塞的。
5.2.主线程
接下来我们继续看主线程的实现:
private static void usingPhaser() throws InterruptedException {
System.out.println("\n===============================================");
System.out.println(" >>> Using Phaser <<<");
System.out.println("===============================================");
Phaser phaser = new Phaser();
phaser.register();
WorkerWithPhaser worker1 = new WorkerWithPhaser("Worker with phaser 1", phaser);
WorkerWithPhaser worker2 = new WorkerWithPhaser("Worker with phaser 2", phaser);
worker1.start();
worker2.start();
Thread.sleep(10);//simulation of some actual work
System.out.println("-----------------------------------------------");
System.out.println(" Now open the phaser barrier:");
System.out.println("-----------------------------------------------");
phaser.arriveAndAwaitAdvance();
}
在上面的代码中,我们可以看到, 主线程将自己注册为 Phaser 对象的线程方 。
在我们创建并阻止两个 工作 线程之后,主线程也会调用 phaser.arriveAndAwaitAdvance() 。这样,我们就打开了 Phaser Barrier,使得两个 工作 线程可以同时恢复。
最后,我们在 main() 方法中调用 usingPhaser() 方法:
===============================================
>>> Using Phaser <<<
===============================================
[ Worker with phaser 1 ] created, blocked by the phaser
[ Worker with phaser 2 ] created, blocked by the phaser
-----------------------------------------------
Now open the phaser barrier:
-----------------------------------------------
[ Worker with phaser 2 ] starts at: 2021-07-18T17:39:27.063523636Z
[ Worker with phaser 1 ] starts at: 2021-07-18T17:39:27.063523827Z
同样, 两个工作线程 几乎 同时启动。两个启动时间之间的差异小于两微秒 。
六,结论
在本文中,我们首先讨论了这个要求:“同时启动两个线程”。
接下来,我们讨论了同时启动三个线程的两种方法:使用 CountDownLatch 、 CyclicBarrier 和 Phaser 。
他们的想法很相似,阻塞两个线程并试图让它们同时恢复执行。
尽管这些方法不能保证两个线程在完全相同的时间启动,但结果对于现实世界中的大多数情况来说非常接近且足够。
与往常一样,本文的代码可以在 GitHub 上找到。