1. 概述

本文将深入探讨 java.util.concurrent 包中的 DelayQueue 构造。这是一个可用于生产者-消费者场景的阻塞队列。

它有一个非常实用的特性:当消费者尝试从队列中取出元素时,只有当该元素的延迟时间已过期,才能将其取出。

2. 为 DelayQueue 中的元素实现 Delayed 接口

每个要放入 DelayQueue 的元素都必须实现 Delayed 接口。假设我们要创建一个 DelayObject 类,该类的实例将被放入 DelayQueue

我们将在构造函数中传入 String 类型的数据和 delayInMilliseconds(延迟毫秒数):

public class DelayObject implements Delayed {
    private String data;
    private long startTime;

    public DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }

我们定义了一个 startTime——这是元素应该被消费的时间。接下来,我们需要实现 getDelay() 方法——它应返回与该对象关联的剩余延迟时间(以给定时间单位表示)。

因此,我们需要使用 TimeUnit.convert() 方法来返回正确时间单位的剩余延迟:

@Override
public long getDelay(TimeUnit unit) {
    long diff = startTime - System.currentTimeMillis();
    return unit.convert(diff, TimeUnit.MILLISECONDS);
}

当消费者尝试从队列中取出元素时,DelayQueue 会调用 getDelay() 来判断该元素是否允许被取出。如果 getDelay() 返回零或负数,意味着该元素可以被取出。

我们还需要实现 compareTo() 方法,因为 DelayQueue 中的元素会根据过期时间排序。最先过期的元素位于队列头部,而过期时间最长的元素位于队列尾部:

@Override
public int compareTo(Delayed o) {
    return Ints.saturatedCast(
      this.startTime - ((DelayObject) o).startTime);
}

3. DelayQueue 的消费者和生产者

为了测试我们的 DelayQueue,我们需要实现生产者和消费者逻辑。生产者类接收队列、要生产的元素数量以及每条消息的延迟毫秒数作为参数。

然后,当调用 run() 方法时,它会将元素放入队列,并在每次放入后休眠500毫秒:

public class DelayQueueProducer implements Runnable {
 
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToProduce;
    private Integer delayOfEachProducedMessageMilliseconds;

    // 标准构造函数

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToProduce; i++) {
            DelayObject object
              = new DelayObject(
                UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object: " + object);
            try {
                queue.put(object);
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

消费者实现 非常相似,但它还会跟踪已消费的消息数量:

public class DelayQueueConsumer implements Runnable {
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToTake;
    public AtomicInteger numberOfConsumedElements = new AtomicInteger();

    // 标准构造函数

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) {
            try {
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

4. DelayQueue 使用测试

为了测试 DelayQueue 的行为,我们将创建一个生产者线程和一个消费者线程。

生产者将向队列放入两个延迟为500毫秒的对象。测试断言消费者消费了两条消息:

@Test
public void givenDelayQueue_whenProduceElement
  _thenShouldConsumeAfterGivenDelay() throws InterruptedException {
    // given
    ExecutorService executor = Executors.newFixedThreadPool(2);
    
    BlockingQueue<DelayObject> queue = new DelayQueue<>();
    int numberOfElementsToProduce = 2;
    int delayOfEachProducedMessageMilliseconds = 500;
    DelayQueueConsumer consumer = new DelayQueueConsumer(
      queue, numberOfElementsToProduce);
    DelayQueueProducer producer = new DelayQueueProducer(
      queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

    // when
    executor.submit(producer);
    executor.submit(consumer);

    // then
    executor.awaitTermination(5, TimeUnit.SECONDS);
    executor.shutdown();
 
    assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}

运行此程序,我们可以观察到以下输出:

Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}

生产者放入对象后,过了一会儿,第一个延迟过期的对象被消费。

第二个元素也发生了同样的情况。

5. 消费者在给定时间内无法消费

假设我们有一个生产者生产一个10秒后过期的元素:

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
  queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

我们启动测试,但5秒后终止。由于 DelayQueue 的特性,消费者无法从队列中消费消息,因为元素尚未过期:

executor.submit(producer);
executor.submit(consumer);

executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);

注意,消费者的 numberOfConsumedElements 值为零。

6. 生产立即过期的元素

Delayed 消息的 getDelay() 方法实现返回负数时,表示该元素已经过期。在这种情况下,消费者会立即消费该元素。

我们可以测试生产一个延迟为负的元素的情况:

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

启动测试用例时,消费者会立即消费该元素,因为它已经过期:

executor.submit(producer);
executor.submit(consumer);

executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);

7. 总结

本文探讨了 java.util.concurrent 包中的 DelayQueue 构造。

我们实现了一个 Delayed 元素,该元素在队列中被生产和消费。

我们利用 DelayQueue 的实现来消费已过期的元素。

所有这些示例和代码片段的实现都可以在 GitHub 项目 中找到——这是一个 Maven 项目,因此应该很容易导入并直接运行。


原始标题:Guide to DelayQueue