1. Overview
In this quick article, we’ll be looking at the ConcurrentSkipListMap class from the java.util.concurrent package.
This construct allows us to create thread-safe logic in a lock-free way. It’s ideal for problems when we want to make an immutable snapshot of the data while other threads are still inserting data into the map.
We will be solving a problem of sorting a stream of events and getting a snapshot of the events that arrived in the last 60 seconds using that construct.
2. Stream Sorting Logic
Let’s say that we have a stream of events that are continually coming from multiple threads. We need to be able to take events from the last 60 seconds, and also events that are older than 60 seconds.
First, let’s define the structure of our event data:
public class Event {
private ZonedDateTime eventTime;
private String content;
// standard constructors/getters
}
We want to keep our events sorted using the eventTime field. To achieve this using the ConcurrentSkipListMap, we need to pass a Comparator to its constructor while creating an instance of it:
ConcurrentSkipListMap<ZonedDateTime, String> events
= new ConcurrentSkipListMap<>(
Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));
We’ll be comparing all arrived events using their timestamps. We are using the comparingLong() method and passing the extract function that can take a long timestamp from the ZonedDateTime.
When our events are arriving, we need only to add them to the map using the put() method. Note that this method does not require any explicit synchronization:
public void acceptEvent(Event event) {
events.put(event.getEventTime(), event.getContent());
}
The ConcurrentSkipListMap will handle the sorting of those events underneath using the Comparator that was passed to it in the constructor.
The most notable pros of the ConcurrentSkipListMap are the methods that can make an immutable snapshot of its data in a lock-free way. To get all events that arrived within the past minute, we can use the tailMap() method and pass the time from which we want to get elements:
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}
It will return all events from the past minute. It will be an immutable snapshot and what is the most important is that other writing threads can add new events to the ConcurrentSkipListMap without any need to do explicit locking.
We can now get all events that arrived later that one minute from now – by using the headMap() method:
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
return events.headMap(ZonedDateTime.now().minusMinutes(1));
}
This will return an immutable snapshot of all events that are older than one minute. All of the above methods belong to the EventWindowSort class, which we’ll use in the next section.
3. Testing the Sorting Stream Logic
Once we implemented our sorting logic using the ConcurrentSkipListMap, we can now test it by creating two writer threads that will send one hundred events each:
ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;
Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(
new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
);
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}
Each thread is invoking the acceptEvent() method, sending the events that have eventTime from now to “now minus one hundred seconds”.
In the meantime, we can invoke the getEventsFromLastMinute() method that will return the snapshot of events that are within the one minute window:
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsFromLastMinute();
The number of events in the eventsFromLastMinute will be varying in each test run depending on the speed at which the producer threads will be sending the events to the EventWindowSort. We can assert that there is not a single event in the returned snapshot that is older than one minute:
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();
assertEquals(eventsOlderThanOneMinute, 0);
And that there are more than zero events in the snapshot that are within the one minute window:
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertTrue(eventYoungerThanOneMinute > 0);
Our getEventsFromLastMinute() uses the tailMap() underneath.
Let’s test now the getEventsOlderThatOneMinute() that is using the headMap() method from the ConcurrentSkipListMap:
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsOlderThatOneMinute();
This time we get a snapshot of events that are older than one minute. We can assert that there are more than zero of such events:
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();
assertTrue(eventsOlderThanOneMinute > 0);
And next, that there is not a single event that is from within the last minute:
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertEquals(eventYoungerThanOneMinute, 0);
The most important thing to note is that we can take the snapshot of data while other threads are still adding new values to the ConcurrentSkipListMap.
4. Conclusion
In this quick tutorial, we had a look at the basics of the ConcurrentSkipListMap, along with some practical examples*.*
We leveraged the high performance of the ConcurrentSkipListMap to implement a non-blocking algorithm that can serve us an immutable snapshot of data even if at the same time multiple threads are updating the map.
The implementation of all these examples and code snippets can be found in the GitHub project; this is a Maven project, so it should be easy to import and run as it is.