1. 概述

本文我们将研究 Java 中最基础核心的——线程同步问题。首先我们将讲解一些理论基础知识, 然后通过开发一个涉及并发问题的简单应用,来更好地理解 wait()notify()

2. Java 线程同步

在多线程环境下,多个线程可能尝试修改同一个资源。如果线程管理不当,会引发一致性问题。

2.1. 受保护的代码块(Guarded Block)

一种可以实现多线程协同工作的办法是使用——Guarded Block。此代码块会循环检查一个条件,直到条件满足才恢复执行。

public synchronized void guardedJoy() {
    // This guard only loops once for each special event, which may not
    // be the event we're waiting for.
    while(!joy) { // 检查是否满足执行条件
        try {
            wait(); // 不满足,等待被notify()唤醒
        } catch (InterruptedException e) {}
    }
    // 继续执行
}

基于这样的理念,我们可以利用:

从下图可以更好地理解这一点。此图描绘了一个线程的生命周期:

注意到有多种方式可以控制一个线程的生命周期。而本文,我们主要关注 wait()notify()

3. wait() 方法

简单来说,当我们调用一个对象的 wait() 方法时,会强制当前线程进入等待状态,直到另一个线程调用此对象的 notify()notifyAll() 方法。

当前线程必须是此对象的监视器(moniter)拥有者。那么成为拥有者呢,根据Javadocs文档说明:

  • we've executed synchronized instance method for the given object(执行对象的同步实例方法)
  • we've executed the body of a synchronized block on the given object (在此对象上执行同步代码块)
  • by executing synchronized static methods for objects of type Class (对于 Class 类型的对象,执行同步静态方法)

请注意,同一时间只能有一个活动线程可以拥一个对象的监视器

wait()有3个重载方法:

3.1. 不带参的 wait()

此方法会使当前线程无限等待,直到另一个线程调用此对象的 notify()notifyAll() 方法。

3.2. wait(long timeout)

该方法可以指定一个等待超时时间,超时后可以自动唤醒。

wait(0) 等同于 wait()。

3.3. wait(long timeout, int nanos)

此方法功能和上面一样,不同的是可以通过第二个参数设置更高的时间精度(纳秒)。

所以总时间还要加上第二个参数,计算方式为(以纳秒为单位):1_000_000*timeout + nanos。

4. notify() 和 notifyAll()

notify() 方法用于唤醒一个正在等待此对象监视器的线程。

这里有2种方式可以唤醒一个等待线程。

4.1. notify()

对于所有正在等待此对象监控器的线程(即调用 wait() 方法的线程),notify() 只会随机唤醒一个线程。具体选择那个线程,不能确定,由底层实现决定。

由于 notify() 只会随机唤醒一个线程,因此在多个线程执行类似任务情况下可以用它来实现互斥锁,但大多数情况下,更建议使用 notifyAll()

4.2. notifyAll()

该方法会唤醒所有等待此对象监控器的线程。

唤醒后的线程以正常的方式继续完成工作,就像其他线程一样。

但允许线程继续往下执行之前,我们一定要先检查是否满足了允许执行的条件,即while语句条件是否满足。因为可能出现线程被唤醒,但并没有收到通知的情况(这个问题我们稍后讨论)

5. 生产 - 消费者同步问题

在理解基本概念之后,我们来看一个 Sender – Receiver 例子,例子中使用 wait()notfiy() 实现两者同步:

  • SenderReceiver 发送数据包
  • Sender 未发送完毕之前,Receiver 不能进行处理
  • 同理,Receiver 未处理完上一条数据包之前,Sender 不能发送下一条的数据包

首先我们创建一个 Data 类,packet 为我们要发送和处理的数据。我们将使用 wait()notifyAll() 来实现同步操作:

    public class Data {
        private String packet;
        
        // True if receiver should wait
        // False if sender should wait
        private boolean transfer = true;
     
        public synchronized void send(String packet) {
            while (!transfer) {
                try { 
                    wait();
                } catch (InterruptedException e)  {
                    Thread.currentThread().interrupt(); 
                    Log.error("Thread interrupted", e); 
                }
            }
            transfer = false;
            
            this.packet = packet;
            notifyAll();
        }
     
        public synchronized String receive() {
            while (transfer) {
                try {
                    wait();
                } catch (InterruptedException e)  {
                    Thread.currentThread().interrupt(); 
                    Log.error("Thread interrupted", e); 
                }
            }
            transfer = true;
    
            notifyAll();
            return packet;
        }
    }

让我们来一条条分析上面的代码:

  • packet 变量表示我们要传输的数据包
  • transfer 是boolean类型, 用于 SenderReceiver 同步:
    • true 时,Receiver 等待 Sender 发送消息
    • false 时, Sender 应该等待 Receiver 接受消息
  • Sender 调用 send()Receiver 发送数据
    • 如果 transferfalse,我们调用 wait() 让当前线程进入等待状态
    • 当其为 true时,我们进行状态切换,设置 packet 并调用 notifyAll() 以通知并唤醒其他线程有事件发生,检查是否可以继续执行。
  • 类似地,Receiver 使用 receive() 方法:
    • 如果 transferSender 设置为 false,只有它会继续执行,否则调用 wait()
    • 当条件满足,我们切换状态,唤醒所有等待的线程并返回 packet

5.1. 为什么要把 wait() 方法放入while语句中 ?

由于 notify()notifyAll() 会随机唤醒等待此对象监视器的线程,可能会出现线程被唤醒,但条件实际上尚未满足的情况。

我们可以通过再次判断来避免虚假唤醒。

5.2. 为什么 send() 和 receive() 为同步方法?

设为同步方法目的是取得内在锁(intrinsic lock)。如果一个线程调用了 wait() 但没有获得内在锁,会抛出IllegalMonitorStateException 异常。这个是Java强制规定的,详细原因分析,推荐阅读Stackoverflow上的 这篇问答 - Why must wait() always be in synchronized block

现在我创建 SenderReceiver 类来收发消息,为了被线程执行,实现 Runnable 接口。

Sender 实现:

    public class Sender implements Runnable {
        private Data data;
     
        // standard constructors
     
        public void run() {
            String packets[] = {
              "First packet",
              "Second packet",
              "Third packet",
              "Fourth packet",
              "End"
            };
     
            for (String packet : packets) {
                data.send(packet);
    
                // Thread.sleep() to mimic heavy server-side processing
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                } catch (InterruptedException e)  {
                    Thread.currentThread().interrupt(); 
                    Log.error("Thread interrupted", e); 
                }
            }
        }
    }

解析:

  • 定义一个 packets 数组
  • 发送每个packet
  • 调用Thread.sleep() 随机休眠一段时间,模拟服务端耗时处理。

Receiver 的实现:

    public class Receiver implements Runnable {
        private Data load;
     
        // standard constructors
     
        public void run() {
            for(String receivedMessage = load.receive();
              !"End".equals(receivedMessage);
              receivedMessage = load.receive()) {
                
                System.out.println(receivedMessage);
    
                // ...
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); 
                    Log.error("Thread interrupted", e); 
                }
            }
        }
    }

调用 load.receive() 循环接收数据,直至接收到最后一个 “End” 数据包。

最后是我们main方法:

    public static void main(String[] args) {
        Data data = new Data();
        Thread sender = new Thread(new Sender(data));
        Thread receiver = new Thread(new Receiver(data));
        
        sender.start();
        receiver.start();
    }

接收并输出下面的数据:

First packet
Second packet
Third packet
Fourth packet

我们按正确地顺序接收到了所有数据包,并成功地在发送方和接收方之间建立了正确的通信。

6. 总结

本文我们讨论了Java线程同步核心概念。更具体说,我们学习如何使用 wait()notify() 解决同步问题。最后通过例子进一步加深理解。

在结束本文之前,值得一提的是,所有这些底层API,如 wait()notify()notifyAll(),都是工作良好的传统方法,但高级如 Java.util.concurrent.locks 包中的 Lock 和 Condition 通常更简单、更好用。

关于 java.util.concurrent 包的更多信息, 请访问 java.util.concurrent 概述 这篇文章,关于 LockCondition 讲解在 这儿

惯例,本例示例完整代码托管在 GitHub