1. 概述
本文将深入探讨 java.util.concurrent
包中的 DelayQueue
构造。这是一个可用于生产者-消费者场景的阻塞队列。
它有一个非常实用的特性:当消费者尝试从队列中取出元素时,只有当该元素的延迟时间已过期,才能将其取出。
2. 为 DelayQueue 中的元素实现 Delayed 接口
每个要放入 DelayQueue
的元素都必须实现 Delayed
接口。假设我们要创建一个 DelayObject
类,该类的实例将被放入 DelayQueue
。
我们将在构造函数中传入 String
类型的数据和 delayInMilliseconds
(延迟毫秒数):
public class DelayObject implements Delayed {
private String data;
private long startTime;
public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
我们定义了一个 startTime
——这是元素应该被消费的时间。接下来,我们需要实现 getDelay()
方法——它应返回与该对象关联的剩余延迟时间(以给定时间单位表示)。
因此,我们需要使用 TimeUnit.convert()
方法来返回正确时间单位的剩余延迟:
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
当消费者尝试从队列中取出元素时,DelayQueue
会调用 getDelay()
来判断该元素是否允许被取出。如果 getDelay()
返回零或负数,意味着该元素可以被取出。
我们还需要实现 compareTo()
方法,因为 DelayQueue
中的元素会根据过期时间排序。最先过期的元素位于队列头部,而过期时间最长的元素位于队列尾部:
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
3. DelayQueue 的消费者和生产者
为了测试我们的 DelayQueue
,我们需要实现生产者和消费者逻辑。生产者类接收队列、要生产的元素数量以及每条消息的延迟毫秒数作为参数。
然后,当调用 run()
方法时,它会将元素放入队列,并在每次放入后休眠500毫秒:
public class DelayQueueProducer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToProduce;
private Integer delayOfEachProducedMessageMilliseconds;
// 标准构造函数
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
消费者实现 非常相似,但它还会跟踪已消费的消息数量:
public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToTake;
public AtomicInteger numberOfConsumedElements = new AtomicInteger();
// 标准构造函数
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4. DelayQueue 使用测试
为了测试 DelayQueue
的行为,我们将创建一个生产者线程和一个消费者线程。
生产者将向队列放入两个延迟为500毫秒的对象。测试断言消费者消费了两条消息:
@Test
public void givenDelayQueue_whenProduceElement
_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
运行此程序,我们可以观察到以下输出:
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
生产者放入对象后,过了一会儿,第一个延迟过期的对象被消费。
第二个元素也发生了同样的情况。
5. 消费者在给定时间内无法消费
假设我们有一个生产者生产一个10秒后过期的元素:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
我们启动测试,但5秒后终止。由于 DelayQueue
的特性,消费者无法从队列中消费消息,因为元素尚未过期:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
注意,消费者的 numberOfConsumedElements
值为零。
6. 生产立即过期的元素
当 Delayed
消息的 getDelay()
方法实现返回负数时,表示该元素已经过期。在这种情况下,消费者会立即消费该元素。
我们可以测试生产一个延迟为负的元素的情况:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
启动测试用例时,消费者会立即消费该元素,因为它已经过期:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
7. 总结
本文探讨了 java.util.concurrent
包中的 DelayQueue
构造。
我们实现了一个 Delayed
元素,该元素在队列中被生产和消费。
我们利用 DelayQueue
的实现来消费已过期的元素。
所有这些示例和代码片段的实现都可以在 GitHub 项目 中找到——这是一个 Maven 项目,因此应该很容易导入并直接运行。