1. Overview

In this tutorial, we’re going to take a quick look at Big Queue, a Java implementation of a persistent queue.

We’ll talk a bit about its architecture, and then we’ll learn how to use it through quick and practical examples.

2. Usage

We’ll need to add the bigqueue dependency to our project:

<dependency>
    <groupId>com.leansoft</groupId>
    <artifactId>bigqueue</artifactId>
    <version>0.7.0</version>
</dependency>

We also need to add its repository:

<repository>
    <id>github.release.repo</id>
    <url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url>
</repository>

If we’re used to working with basic queues, it’ll be a breeze to adapt to Big Queue as its API is quite similar.

2.1. Initialization

We can initialize our queue by simpling calling its constructor:

@Before
public void setup() {
    String queueDir = System.getProperty("user.home");
    String queueName = "baeldung-queue";
    bigQueue = new BigQueueImpl(queueDir, queueName);
}

The first argument is the home directory for our queue.

The second argument represents our queue’s name. It’ll create a folder inside our queue’s home directory where we can persist data.

We should remember to close our queue when we’re done to prevent memory leaks:

bigQueue.close();

2.2. Inserting

We can add elements to the tail by simply calling the enqueue method:

@Test
public void whenAddingRecords_ThenTheSizeIsCorrect() {
    for (int i = 1; i <= 100; i++) {
        bigQueue.enqueue(String.valueOf(i).getBytes());
    }
 
    assertEquals(100, bigQueue.size());
}

We should note that Big Queue only supports the byte[] data type, so we are responsible for serializing our records when inserting.

2.3. Reading

As we might’ve expected, reading data is just as easy using the dequeue method:

@Test
public void whenAddingRecords_ThenTheyCanBeRetrieved() {
    bigQueue.enqueue(String.valueOf("new_record").getBytes());

    String record = new String(bigQueue.dequeue());
 
    assertEquals("new_record", record);
}

We also have to be careful to properly deserialize our data when reading.

Reading from an empty queue throws a NullPointerException.

We should verify that there are values in our queue using the isEmpty method:

if(!bigQueue.isEmpty()){
    // read
}

To empty our queue without having to go through each record, we can use the removeAll method:

bigQueue.removeAll();

2.4. Peeking

When peeking, we simply read a record without consuming it:

@Test
public void whenPeekingRecords_ThenSizeDoesntChange() {
    for (int i = 1; i <= 100; i++) {
        bigQueue.enqueue(String.valueOf(i).getBytes());
    }
 
    String firstRecord = new String(bigQueue.peek());

    assertEquals("1", firstRecord);
    assertEquals(100, bigQueue.size());
}

2.5. Deleting Consumed Records

When we’re calling the dequeue method, records are removed from our queue, but they remain persisted on disk.

This could potentially fill up our disk with unnecessary data.

Fortunately, we can delete the consumed records using the gc method:

bigQueue.gc();

Just like the garbage collector in Java cleans up unreferenced objects from heap, gc cleans consumed records from our disk.

3. Architecture and Features

What’s interesting about Big Queue is the fact that its codebase is extremely small — just 12 source files occupying about 20KB of disk space.

On a high level, it’s just a persistent queue that excels at handling large amounts of data.

3.1. Handling Large Amounts of Data

The size of the queue is limited only by our total disk space available. Every record inside our queue is persisted on disk, in order to be crash-resistant.

Our bottleneck will be the disk I/O, meaning that an SSD will significantly improve the average throughput over an HDD.

3.2. Accessing Data Extremely Fast

If we take a look at its source code, we’ll notice that the queue is backed by a memory-mapped file. The accessible part of our queue (the head) is kept in RAM, so accessing records will be extremely fast.

Even if our queue would grow extremely large and would occupy terabytes of disk space, we would still be able to read data in O(1) time complexity.

If we need to read lots of messages and speed is a critical concern, we should consider using an SSD over an HDD, as moving data from disk to memory would be much faster.

3.3. Advantages

A great advantage is its ability to grow very large in size. We can scale it to theoretical infinity by just adding more storage, hence its name “Big”.

In a concurrent environment, Big Queue can produce and consume around 166MBps of data on a commodity machine.

If our average message size is 1KB, it can process 166k messages per second.

It can go up to 333k messages per second in a single-threaded environment — pretty impressive!

3.4. Disadvantages

Our messages remain persisted to disk, even after we’ve consumed them, so we have to take care of garbage-collecting data when we no longer need it.

We are also responsible for serializing and deserializing our messages.

4. Conclusion

In this quick tutorial, we learned about Big Queue and how we can use it as a scalable and persistent queue.

As always, the code is available over on Github.


« 上一篇: Java 预览功能
» 下一篇: Java 13 新特性