1. Overview
In this article, we’ll look at the DelayQueue construct from the java.util.concurrent package. This is a blocking queue that can be used in producer-consumer programs.
It has a very useful characteristic: a consumer can take an element from the queue only once the delay for that particular element has expired.
2. Implementing Delayed for Elements in the DelayQueue
Each element we want to put into the DelayQueue needs to implement the Delayed interface. Let’s say that we want to create a DelayObject class. Instances of that class will be put into the DelayQueue.
We’ll pass the String data and delayInMilliseconds as arguments to its constructor:
public class DelayObject implements Delayed {
    private String data;
    private long startTime;
    public DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }
We define a startTime, which is the time at which the element becomes available for consumption from the queue. Next, we need to implement the getDelay() method, which should return the remaining delay associated with this object in the given time unit.
Therefore, we use the TimeUnit.convert() method to return the remaining delay in the proper TimeUnit:
@Override
public long getDelay(TimeUnit unit) {
    long diff = startTime - System.currentTimeMillis();
    return unit.convert(diff, TimeUnit.MILLISECONDS);
}
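To make the unit conversion concrete, here is a minimal sketch that isolates the TimeUnit.convert() call used in getDelay(). The class and method names (RemainingDelaySketch, remainingDelay) are hypothetical and exist only for illustration. It shows that converting to a coarser unit truncates the value and that a negative remaining delay stays negative.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper that mirrors the conversion done in getDelay().
class RemainingDelaySketch {

    // Converts a remaining delay given in milliseconds into the requested unit.
    static long remainingDelay(long remainingMillis, TimeUnit unit) {
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Finer unit: the value is scaled up.
        System.out.println(remainingDelay(1_500, TimeUnit.NANOSECONDS)); // 1500000000
        // Coarser unit: the fractional part is truncated.
        System.out.println(remainingDelay(1_500, TimeUnit.SECONDS));     // 1
        // Already expired: the delay stays negative.
        System.out.println(remainingDelay(-2_000, TimeUnit.SECONDS));    // -2
    }
}
```

In the OpenJDK implementation, DelayQueue asks for the delay in nanoseconds, so the truncation mostly matters to other callers that request a coarser unit.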
When the consumer tries to take an element from the queue, the DelayQueue will execute getDelay() to find out whether that element is allowed to be returned from the queue. If getDelay() returns zero or a negative number, the element can be retrieved from the queue.
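The effect of getDelay() on retrieval can be observed without any threads. The following sketch assumes a hypothetical element type, ExpiringItem, that is separate from the DelayObject class in this article. It compares the non-blocking poll(), which returns null while the head has not expired, with peek(), which returns the head regardless of its remaining delay.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type for this sketch (not the article's DelayObject).
class ExpiringItem implements Delayed {
    private final long expiresAtMillis;

    ExpiringItem(long delayMillis) {
        this.expiresAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long remaining = expiresAtMillis - System.currentTimeMillis();
        return unit.convert(remaining, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Only ExpiringItem instances are queued in this sketch, so the cast is safe here.
        return Long.compare(expiresAtMillis, ((ExpiringItem) other).expiresAtMillis);
    }
}

class HeadAvailabilitySketch {

    // poll() never blocks: it returns null while the head's delay is still positive.
    static ExpiringItem pollRightAfterAdd(long delayMillis) {
        DelayQueue<ExpiringItem> queue = new DelayQueue<>();
        queue.add(new ExpiringItem(delayMillis));
        return queue.poll();
    }

    // peek() returns the head even if it has not expired yet.
    static ExpiringItem peekRightAfterAdd(long delayMillis) {
        DelayQueue<ExpiringItem> queue = new DelayQueue<>();
        queue.add(new ExpiringItem(delayMillis));
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println("poll, 60 s delay, got element: " + (pollRightAfterAdd(60_000) != null));
        System.out.println("peek, 60 s delay, got element: " + (peekRightAfterAdd(60_000) != null));
        System.out.println("poll, no delay, got element: " + (pollRightAfterAdd(0) != null));
    }
}
```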
We also need to implement the compareTo() method, because the elements in the DelayQueue are ordered by expiration time. The element that expires first is kept at the head of the queue, and the element with the latest expiration time is kept at the tail:
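@Override
public int compareTo(Delayed o) {
    // Ints is Guava's com.google.common.primitives.Ints;
    // saturatedCast clamps the long difference to the int range
    return Ints.saturatedCast(
      this.startTime - ((DelayObject) o).startTime);
}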
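As an illustration of the ordering, the sketch below adds three already-expired elements out of order and drains them with poll(). The OrderedItem class is hypothetical. Its compareTo() is one possible variant that uses Long.compare() instead of Guava and falls back to comparing remaining delays when the other element is of a different Delayed type. This is an assumption of this sketch, not the approach taken by DelayObject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type that takes an absolute expiration time.
class OrderedItem implements Delayed {
    final String name;
    private final long expiresAtMillis;

    OrderedItem(String name, long expiresAtMillis) {
        this.name = name;
        this.expiresAtMillis = expiresAtMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof OrderedItem) {
            // Same type: compare the stored expiration times directly.
            return Long.compare(expiresAtMillis, ((OrderedItem) other).expiresAtMillis);
        }
        // Different Delayed type: compare the remaining delays instead of casting.
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

class ExpirationOrderSketch {

    // Adds expired elements out of order and returns the order in which poll() hands them out.
    static List<String> drainOrder() {
        long now = System.currentTimeMillis();
        DelayQueue<OrderedItem> queue = new DelayQueue<>();
        queue.add(new OrderedItem("late", now - 100));
        queue.add(new OrderedItem("early", now - 300));
        queue.add(new OrderedItem("middle", now - 200));

        List<String> names = new ArrayList<>();
        OrderedItem item;
        while ((item = queue.poll()) != null) {
            names.add(item.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // [early, middle, late]
    }
}
```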
3. DelayQueue Consumer and Producer
To test our DelayQueue, we need to implement producer and consumer logic. The producer class takes the queue, the number of elements to produce, and the delay of each message in milliseconds as arguments.
When the run() method is invoked, it puts elements into the queue and sleeps for 500 milliseconds after each put:
public class DelayQueueProducer implements Runnable {
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToProduce;
    private Integer delayOfEachProducedMessageMilliseconds;
    // standard constructor
    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToProduce; i++) {
            DelayObject object
              = new DelayObject(
                UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object: " + object);
            try {
                queue.put(object);
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}
The consumer implementation is very similar, but it also keeps track of the number of messages that were consumed:
public class DelayQueueConsumer implements Runnable {
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToTake;
    public AtomicInteger numberOfConsumedElements = new AtomicInteger();
    // standard constructors
    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) {
            try {
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
4. DelayQueue Usage Test
To test the behavior of the DelayQueue, we’ll create one producer thread and one consumer thread.
The producer will put() two objects onto the queue, each with a 500-millisecond delay. The test asserts that the consumer consumed two messages:
@Test
public void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay()
  throws InterruptedException {
    // given
    ExecutorService executor = Executors.newFixedThreadPool(2);
    BlockingQueue<DelayObject> queue = new DelayQueue<>();
    int numberOfElementsToProduce = 2;
    int delayOfEachProducedMessageMilliseconds = 500;
    DelayQueueConsumer consumer = new DelayQueueConsumer(
      queue, numberOfElementsToProduce);
    DelayQueueProducer producer = new DelayQueueProducer(
      queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
    // when
    executor.submit(producer);
    executor.submit(consumer);
    // then
    // stop accepting new tasks, then wait up to 5 seconds for the submitted ones to finish
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    assertEquals(numberOfElementsToProduce, consumer.numberOfConsumedElements.get());
}
We can observe that running this program will produce the following output:
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
The producer puts the object, and after a while the first object whose delay has expired is consumed.
The same happens for the second element.
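The waiting time can also be measured directly. This minimal sketch uses a hypothetical TimedItem element and a hypothetical helper, millisUntilTaken(), to put one element and report how long take() blocked before handing it over. Under these assumptions, the reported time should not be shorter than the configured delay.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type for this sketch (not the article's DelayObject).
class TimedItem implements Delayed {
    private final long expiresAtMillis;

    TimedItem(long delayMillis) {
        this.expiresAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Only TimedItem instances are queued in this sketch, so the cast is safe here.
        return Long.compare(expiresAtMillis, ((TimedItem) other).expiresAtMillis);
    }
}

class TakeLatencySketch {

    // Puts one element and returns how many milliseconds passed until take() returned it.
    static long millisUntilTaken(long delayMillis) {
        DelayQueue<TimedItem> queue = new DelayQueue<>();
        long before = System.currentTimeMillis();
        queue.put(new TimedItem(delayMillis));
        try {
            queue.take(); // blocks until the element's delay has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return System.currentTimeMillis() - before;
    }

    public static void main(String[] args) {
        System.out.println("take() returned after " + millisUntilTaken(500) + " ms");
    }
}
```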
5. Consumer Not Able to Consume in the Given Time
Let’s say that we have a producer that is producing an element that will expire in 10 seconds:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
  queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
We’ll start our test, but it will terminate after 5 seconds. Due to the characteristics of the DelayQueue, the consumer will not be able to consume the message from the queue because the element hasn’t expired yet:
executor.submit(producer);
executor.submit(consumer);
// stop accepting new tasks, then wait up to 5 seconds; the consumer is still blocked in take()
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
assertEquals(0, consumer.numberOfConsumedElements.get());
Note that the consumer’s numberOfConsumedElements has a value of zero.
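A consumer that should not block indefinitely can use the timed poll(long, TimeUnit) method instead of take(). The sketch below, built around a hypothetical PendingItem element and a hypothetical receivedWithin() helper, reproduces the scenario from this section in a single thread: the element expires in 10 seconds, the caller waits only briefly, and poll() returns null.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type for this sketch (not the article's DelayObject).
class PendingItem implements Delayed {
    private final long expiresAtMillis;

    PendingItem(long delayMillis) {
        this.expiresAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Only PendingItem instances are queued in this sketch, so the cast is safe here.
        return Long.compare(expiresAtMillis, ((PendingItem) other).expiresAtMillis);
    }
}

class TimedPollSketch {

    // Returns true if an element with the given delay could be taken within waitMillis.
    static boolean receivedWithin(long delayMillis, long waitMillis) {
        DelayQueue<PendingItem> queue = new DelayQueue<>();
        queue.put(new PendingItem(delayMillis));
        try {
            // Waits at most waitMillis; yields null if no element expired in that time.
            return queue.poll(waitMillis, TimeUnit.MILLISECONDS) != null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("10 s delay, 100 ms wait: " + receivedWithin(10_000, 100));
        System.out.println("50 ms delay, 2 s wait: " + receivedWithin(50, 2_000));
    }
}
```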
6. Producing an Element With Immediate Expiration
When the getDelay() method of a Delayed element returns a negative number, the given element has already expired. In this situation, the consumer will consume that element immediately.
We can test the situation of producing an element with a negative delay:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
When we start the test case, the consumer will consume the element immediately because it has already expired:
executor.submit(producer);
executor.submit(consumer);
// stop accepting new tasks, then wait up to 1 second for the submitted ones to finish
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
assertEquals(1, consumer.numberOfConsumedElements.get());
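Because the queue is ordered by expiration time, an element that is already expired also moves ahead of unexpired elements that were added earlier. The following sketch illustrates this with a hypothetical LabeledItem element: a pending element is added first, an expired one second, and poll() returns the expired one right away.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type for this sketch (not the article's DelayObject).
class LabeledItem implements Delayed {
    final String label;
    private final long expiresAtMillis;

    LabeledItem(String label, long delayMillis) {
        this.label = label;
        this.expiresAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Only LabeledItem instances are queued in this sketch, so the cast is safe here.
        return Long.compare(expiresAtMillis, ((LabeledItem) other).expiresAtMillis);
    }
}

class ImmediateExpirationSketch {

    // Adds a pending element, then an expired one, and returns the label poll() yields first.
    static String firstAvailable() {
        DelayQueue<LabeledItem> queue = new DelayQueue<>();
        queue.add(new LabeledItem("pending", 10_000));
        queue.add(new LabeledItem("expired", -10_000));
        LabeledItem head = queue.poll();
        return head == null ? null : head.label;
    }

    public static void main(String[] args) {
        System.out.println(firstAvailable()); // expired
    }
}
```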
7. Conclusion
In this article, we looked at the DelayQueue construct from the java.util.concurrent package.
We implemented a Delayed element that was put into and consumed from the queue.
We used the DelayQueue to consume elements whose delay had expired.
The implementation of all these examples and code snippets can be found in the GitHub project, which is a Maven project, so it should be easy to import and run as it is.