1. 概述
本文我们将研究 Java 中最基础核心的——线程同步问题。首先我们将讲解一些理论基础知识,
然后通过开发一个涉及并发问题的简单应用,来更好地理解 wait()
和 notify()
2. Java 线程同步
在多线程环境下,多个线程可能尝试修改同一个资源。如果线程管理不当,会引发一致性问题。
2.1. 受保护的代码块(Guarded Block)
一种可以实现多线程协同工作的办法是使用——Guarded Block。此代码块会循环检查一个条件,直到条件满足才恢复执行。
public synchronized void guardedJoy() {
// This guard only loops once for each special event, which may not
// be the event we're waiting for.
while(!joy) { // 检查是否满足执行条件
try {
wait(); // 不满足,等待被notify()唤醒
} catch (InterruptedException e) {}
}
// 继续执行
}
基于这样的理念,我们可以利用:
- Object.wait() – 挂起一个线程
- Object.notify() – 唤醒一个线程
从下图可以更好地理解这一点。此图描绘了一个线程的生命周期:
注意到有多种方式可以控制一个线程的生命周期。而本文,我们主要关注 wait()
和 notify()
3. wait() 方法
简单来说,当我们调用一个对象的 wait()
方法时,会强制当前线程进入等待状态,直到另一个线程调用此对象的 notify()
或 notifyAll()
方法。
当前线程必须是此对象的监视器(moniter)拥有者。那么成为拥有者呢,根据Javadocs文档说明:
- we've executed synchronized instance method for the given object(执行对象的同步实例方法)
- we've executed the body of a synchronized block on the given object (在此对象上执行同步代码块)
- by executing synchronized static methods for objects of type Class (对于 Class 类型的对象,执行同步静态方法)
请注意,同一时间只能有一个活动线程可以拥一个对象的监视器
wait()
有3个重载方法:
3.1. 不带参的 wait()
此方法会使当前线程无限等待,直到另一个线程调用此对象的 notify()
或 notifyAll()
方法。
3.2. wait(long timeout)
该方法可以指定一个等待超时时间,超时后可以自动唤醒。
wait(0) 等同于 wait()。
3.3. wait(long timeout, int nanos)
此方法功能和上面一样,不同的是可以通过第二个参数设置更高的时间精度(纳秒)。
所以总时间还要加上第二个参数,计算方式为(以纳秒为单位):1_000_000*timeout + nanos。
4. notify() 和 notifyAll()
notify()
方法用于唤醒一个正在等待此对象监视器的线程。
这里有2种方式可以唤醒一个等待线程。
4.1. notify()
对于所有正在等待此对象监控器的线程(即调用 wait() 方法的线程),notify()
只会随机唤醒一个线程。具体选择那个线程,不能确定,由底层实现决定。
由于 notify()
只会随机唤醒一个线程,因此在多个线程执行类似任务情况下可以用它来实现互斥锁,但大多数情况下,更建议使用 notifyAll()
。
4.2. notifyAll()
该方法会唤醒所有等待此对象监控器的线程。
唤醒后的线程以正常的方式继续完成工作,就像其他线程一样。
但允许线程继续往下执行之前,我们一定要先检查是否满足了允许执行的条件,即while语句条件是否满足。因为可能出现线程被唤醒,但并没有收到通知的情况(这个问题我们稍后讨论)
5. 生产 - 消费者同步问题
在理解基本概念之后,我们来看一个 Sender – Receiver 例子,例子中使用 wait()
和 notfiy()
实现两者同步:
-
Sender
向Receiver
发送数据包 -
Sender
未发送完毕之前,Receiver
不能进行处理 - 同理,
Receiver
未处理完上一条数据包之前,Sender
不能发送下一条的数据包
首先我们创建一个 Data
类,packet
为我们要发送和处理的数据。我们将使用 wait()
和notifyAll()
来实现同步操作:
public class Data {
private String packet;
// True if receiver should wait
// False if sender should wait
private boolean transfer = true;
public synchronized void send(String packet) {
while (!transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
transfer = false;
this.packet = packet;
notifyAll();
}
public synchronized String receive() {
while (transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
transfer = true;
notifyAll();
return packet;
}
}
让我们来一条条分析上面的代码:
-
packet
变量表示我们要传输的数据包 transfer
是boolean类型, 用于Sender
和Receiver
同步:- 为
true
时,Receiver
等待Sender
发送消息 - 为
false
时,Sender
应该等待Receiver
接受消息
- 为
Sender
调用send()
向Receiver
发送数据- 如果
transfer
为false
,我们调用wait()
让当前线程进入等待状态 - 当其为
true
时,我们进行状态切换,设置packet
并调用notifyAll()
以通知并唤醒其他线程有事件发生,检查是否可以继续执行。
- 如果
- 类似地,
Receiver
使用receive()
方法:- 如果
transfer
被Sender
设置为false
,只有它会继续执行,否则调用wait()
- 当条件满足,我们切换状态,唤醒所有等待的线程并返回
packet
。
- 如果
5.1. 为什么要把 wait() 方法放入while语句中 ?
由于 notify()
和 notifyAll()
会随机唤醒等待此对象监视器的线程,可能会出现线程被唤醒,但条件实际上尚未满足的情况。
我们可以通过再次判断来避免虚假唤醒。
5.2. 为什么 send() 和 receive() 为同步方法?
设为同步方法目的是取得内在锁(intrinsic lock)。如果一个线程调用了 wait() 但没有获得内在锁,会抛出IllegalMonitorStateException
异常。这个是Java强制规定的,详细原因分析,推荐阅读Stackoverflow上的 这篇问答 - Why must wait() always be in synchronized block。
现在我创建 Sender
和 Receiver
类来收发消息,为了被线程执行,实现 Runnable
接口。
Sender
实现:
public class Sender implements Runnable {
private Data data;
// standard constructors
public void run() {
String packets[] = {
"First packet",
"Second packet",
"Third packet",
"Fourth packet",
"End"
};
for (String packet : packets) {
data.send(packet);
// Thread.sleep() to mimic heavy server-side processing
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
}
}
解析:
- 定义一个 packets 数组
- 发送每个packet
- 调用
Thread.sleep()
随机休眠一段时间,模拟服务端耗时处理。
Receiver
的实现:
public class Receiver implements Runnable {
private Data load;
// standard constructors
public void run() {
for(String receivedMessage = load.receive();
!"End".equals(receivedMessage);
receivedMessage = load.receive()) {
System.out.println(receivedMessage);
// ...
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
}
}
调用 load.receive() 循环接收数据,直至接收到最后一个 “End” 数据包。
最后是我们main方法:
public static void main(String[] args) {
Data data = new Data();
Thread sender = new Thread(new Sender(data));
Thread receiver = new Thread(new Receiver(data));
sender.start();
receiver.start();
}
接收并输出下面的数据:
First packet
Second packet
Third packet
Fourth packet
我们按正确地顺序接收到了所有数据包,并成功地在发送方和接收方之间建立了正确的通信。
6. 总结
本文我们讨论了Java线程同步核心概念。更具体说,我们学习如何使用 wait()
和 notify()
解决同步问题。最后通过例子进一步加深理解。
在结束本文之前,值得一提的是,所有这些底层API,如 wait()
、notify()
和notifyAll()
,都是工作良好的传统方法,但高级如 Java.util.concurrent.locks 包中的 Lock 和 Condition 通常更简单、更好用。
关于 java.util.concurrent
包的更多信息, 请访问 java.util.concurrent 概述 这篇文章,关于 Lock
和 Condition
讲解在 这儿。
惯例,本例示例完整代码托管在 GitHub。