1. Overview
In this tutorial, we’ll look at the concept of work stealing in Java.
2. What Is Work Stealing?
Work stealing was introduced in Java 7 as part of the fork/join framework, with the aim of reducing contention in multi-threaded applications.
2.1. Divide and Conquer Approach
In the fork/join framework, problems or tasks are recursively broken down into sub-tasks. The sub-tasks are then solved individually, with the sub-results combined to form the result:
Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
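To make the pseudocode concrete, here's a minimal sketch that sums a long[] with a RecursiveTask. The class name SumTask and the THRESHOLD of 1,000 elements are illustrative choices, not part of the original example. Each branch of compute() maps onto a line of the pseudocode: solve directly when small, otherwise split, fork, join and compose.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a slice of an array by recursively splitting it in half (hypothetical example).
public class SumTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 1_000; // assumed "small enough" cut-off

    private final long[] numbers;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Problem is small: solve it directly
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        // Split the problem into two independent halves
        int middle = (from + to) >>> 1;
        SumTask left = new SumTask(numbers, from, middle);
        SumTask right = new SumTask(numbers, middle, to);
        left.fork();                     // push the left half onto this worker's deque
        long rightSum = right.compute(); // solve the right half in the current thread
        long leftSum = left.join();      // wait for (or run) the left half
        return leftSum + rightSum;       // compose the result from the sub-results
    }

    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = new long[100_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(sum(numbers)); // 5000050000
    }
}
```

Forking one half and computing the other in the current thread keeps the worker busy instead of leaving it idle while both halves wait in the deque.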
2.2. Worker Threads
The broken-down task is solved by worker threads provided by a thread pool. Each worker thread keeps the sub-tasks it's responsible for in its own double-ended queue (deque).
Each worker thread takes sub-tasks from its deque by repeatedly popping a sub-task off the top of the deque. When a worker thread's deque is empty, it has no local work left.
At this point, the worker thread randomly selects a peer thread in the pool that it can “steal” work from. It then takes sub-tasks from the tail end of the victim's deque in first-in, first-out (FIFO) order.
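The following single-threaded model shows which end of a deque each party uses. It's only an illustration built on java.util.ArrayDeque and the class name DequeModel is hypothetical; ForkJoinPool's real work queues are lock-free arrays shared between threads, not ArrayDeque instances.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Single-threaded model of one worker's deque: the owner works at the top, a thief at the tail.
public class DequeModel {

    // Returns the tasks in the order they are taken, tagged with who took them.
    static List<String> simulate() {
        Deque<String> deque = new ArrayDeque<>();
        List<String> taken = new ArrayList<>();

        // The owner forks three sub-tasks; each new sub-task goes on the top of the deque.
        deque.push("task-1");
        deque.push("task-2");
        deque.push("task-3");

        // The owner pops from the top: the newest sub-task comes out first (LIFO).
        taken.add("owner:" + deque.pop());

        // An idle thief takes from the tail: the oldest sub-task comes out first (FIFO).
        taken.add("thief:" + deque.pollLast());

        // The owner continues with what is left.
        taken.add("owner:" + deque.pop());
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [owner:task-3, thief:task-1, owner:task-2]
    }
}
```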
3. Fork/Join Framework Implementation
We can obtain a work-stealing thread pool using either the ForkJoinPool class or the Executors class:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
ExecutorService workStealingPool = Executors.newWorkStealingPool();
The Executors class has an overloaded newWorkStealingPool method, which takes an integer argument representing the level of parallelism.
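Here's a small sketch that checks the parallelism of pools created by both overloads. It casts the returned ExecutorService to ForkJoinPool, as this article's own code does later; that relies on the JDK implementation returning a ForkJoinPool, which the method's declared return type doesn't promise. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class WorkStealingParallelism {

    // Returns the parallelism level of a pool created by newWorkStealingPool(parallelism).
    static int parallelismOf(int parallelism) {
        ExecutorService executor = Executors.newWorkStealingPool(parallelism);
        try {
            // The returned ExecutorService is a ForkJoinPool, so we can cast it to query it
            return ((ForkJoinPool) executor).getParallelism();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // No-argument version: parallelism defaults to the number of available processors
        ExecutorService defaultPool = Executors.newWorkStealingPool();
        System.out.println(((ForkJoinPool) defaultPool).getParallelism()
          == Runtime.getRuntime().availableProcessors()); // true
        defaultPool.shutdown();

        System.out.println(parallelismOf(4)); // 4
    }
}
```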
Executors.newWorkStealingPool doesn't wrap ForkJoinPool.commonPool: each call creates a new, separate ForkJoinPool. The key difference is that this new pool runs in asynchronous mode, whereas ForkJoinPool.commonPool, a single static pool shared by the whole JVM, doesn't.
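In current JDKs, the no-argument Executors.newWorkStealingPool() amounts to calling ForkJoinPool's four-argument constructor with asyncMode set to true. The sketch below builds such a pool by hand and compares getAsyncMode() across the three pools; the helper name equivalentWorkStealingPool is hypothetical, and the equivalence reflects the JDK sources rather than a documented guarantee.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class PoolModes {

    // Roughly what Executors.newWorkStealingPool() builds: a new pool in asynchronous (FIFO) mode
    static ForkJoinPool equivalentWorkStealingPool() {
        return new ForkJoinPool(
          Runtime.getRuntime().availableProcessors(),
          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
          null,  // no UncaughtExceptionHandler
          true); // asyncMode = true
    }

    public static void main(String[] args) {
        ForkJoinPool workStealing = (ForkJoinPool) Executors.newWorkStealingPool();
        ForkJoinPool equivalent = equivalentWorkStealingPool();
        ForkJoinPool common = ForkJoinPool.commonPool();

        System.out.println(workStealing.getAsyncMode()); // true
        System.out.println(equivalent.getAsyncMode());   // true
        System.out.println(common.getAsyncMode());       // false

        // newWorkStealingPool() returns a separate pool, not the common pool
        System.out.println(workStealing == common);      // false

        workStealing.shutdown();
        equivalent.shutdown();
    }
}
```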
4. Synchronous vs Asynchronous Thread Pools
In ForkJoinPool.commonPool, each worker processes its own locally forked tasks in last-in, first-out (LIFO) order, whereas Executors.newWorkStealingPool uses first-in, first-out (FIFO) order.
According to Doug Lea, taking tasks in FIFO order from the opposite end of the deque to the one the owner works on, as stealers do, has these advantages:
- It reduces contention by having stealers operate on the opposite side of the deque as owners
- It exploits the property of recursive divide-and-conquer algorithms of generating “large” tasks early
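The second point means that the oldest tasks in a deque, which a thief takes first, tend to be the largest ones. A thread that steals such a task can break it down further on its own, so it stays busy without having to steal again soon.
According to the Java documentation, setting asyncMode to true may be more appropriate for event-style tasks that are never joined.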
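To see what asyncMode changes, the sketch below forks five event-style sub-tasks that are never joined and records the order in which they run. It assumes a pool with a single worker thread, so nothing gets stolen and the worker's local processing order is visible; the class and method names are hypothetical, and the exact order reflects current JDK implementations rather than a documented guarantee.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class AsyncModeOrder {

    // Runs one root task that forks `count` sub-tasks without joining them and
    // returns the order in which the sub-tasks executed.
    static List<Integer> executionOrder(boolean asyncMode, int count) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        // A single worker thread, so no other worker can steal the forked sub-tasks
        ForkJoinPool pool = new ForkJoinPool(
          1, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, asyncMode);

        pool.execute(new RecursiveAction() {
            @Override
            protected void compute() {
                for (int i = 0; i < count; i++) {
                    int id = i;
                    // fork() without join(): an event-style task
                    new RecursiveAction() {
                        @Override
                        protected void compute() {
                            order.add(id);
                        }
                    }.fork();
                }
            }
        });

        pool.shutdown(); // tasks already submitted or forked still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the pool", e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println("asyncMode=true:  " + executionOrder(true, 5));  // FIFO order
        System.out.println("asyncMode=false: " + executionOrder(false, 5)); // LIFO order
    }
}
```

In asynchronous mode the sub-tasks run in the order they were forked; in the default mode the most recently forked one runs first.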
5. Working Example – Finding Prime Numbers
We’ll use the example of finding prime numbers from a collection of numbers to show the computation time benefits of the work-stealing framework. We’ll also show the differences between using synchronous and asynchronous thread pools.
5.1. The Prime Numbers Problem
Finding prime numbers in a collection of numbers can be computationally expensive, and the cost grows with the size of the collection, since every number has to be tested.
The PrimeNumbers class helps us find prime numbers:
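import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class PrimeNumbers extends RecursiveAction {

    private int lowerBound;
    private int upperBound;
    private int granularity;
    static final List<Integer> GRANULARITIES
      = Arrays.asList(1, 10, 100, 1000, 10000);
    private AtomicInteger noOfPrimeNumbers;

    PrimeNumbers(int lowerBound, int upperBound, int granularity, AtomicInteger noOfPrimeNumbers) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.granularity = granularity;
        this.noOfPrimeNumbers = noOfPrimeNumbers;
    }

    // Default batch size of 100 (batches 1-100, 101-200, ...)
    PrimeNumbers(int lowerBound, int upperBound, AtomicInteger noOfPrimeNumbers) {
        this(lowerBound, upperBound, 100, noOfPrimeNumbers);
    }

    // Counts the prime numbers in [1, upperBound]
    PrimeNumbers(int upperBound) {
        this(1, upperBound, new AtomicInteger(0));
    }

    // Splits [lowerBound, upperBound] into consecutive batches of at most `granularity` numbers
    private List<PrimeNumbers> subTasks() {
        List<PrimeNumbers> subTasks = new ArrayList<>();
        for (int lower = lowerBound; lower <= upperBound; lower += granularity) {
            int upper = Math.min(lower + granularity - 1, upperBound);
            subTasks.add(new PrimeNumbers(lower, upper, granularity, noOfPrimeNumbers));
        }
        return subTasks;
    }

    @Override
    protected void compute() {
        if (((upperBound + 1) - lowerBound) > granularity) {
            // Range is too large: fork all batches and wait for them to finish
            ForkJoinTask.invokeAll(subTasks());
        } else {
            // Range is small enough: count the primes directly in this thread
            findPrimeNumbers();
        }
    }

    void findPrimeNumbers() {
        for (int num = lowerBound; num <= upperBound; num++) {
            if (isPrime(num)) {
                noOfPrimeNumbers.getAndIncrement();
            }
        }
    }

    // Simple trial-division primality check
    private static boolean isPrime(int number) {
        if (number < 2) {
            return false;
        }
        for (int divisor = 2; (long) divisor * divisor <= number; divisor++) {
            if (number % divisor == 0) {
                return false;
            }
        }
        return true;
    }

    public int noOfPrimeNumbers() {
        return noOfPrimeNumbers.intValue();
    }
}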
A few important things to note about this class:
- It extends RecursiveAction, a ForkJoinTask that doesn't return a result, so we implement the compute method that the pool calls to run the task
- It recursively breaks down tasks into sub-tasks based on the granularity value
- The constructors take lower and upper bound values which control the range of numbers we want to determine prime numbers for
- It enables us to determine prime numbers using either a work-stealing thread pool or a single thread
5.2. Solving the Problem Faster with Thread Pools
Let’s determine prime numbers in a single-threaded manner and also using work-stealing thread pools.
First, let’s see the single-threaded approach:
PrimeNumbers primes = new PrimeNumbers(10000);
primes.findPrimeNumbers();
And now, the ForkJoinPool.commonPool approach:
PrimeNumbers primes = new PrimeNumbers(10000);
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(primes);
// No shutdown() here: the common pool isn't affected by shutdown() or shutdownNow()
Finally, we’ll have a look at the Executors.newWorkStealingPool approach:
PrimeNumbers primes = new PrimeNumbers(10000);
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealer.invoke(primes);
stealer.shutdown();
We use the invoke method of the ForkJoinPool class to pass tasks to the thread pool. It accepts any ForkJoinTask, including subclasses of RecursiveAction such as PrimeNumbers, and returns once the task has completed. Using the Java Microbenchmark Harness (JMH), we benchmark these approaches against each other in terms of the average time per operation:
# Run complete. Total time: 00:04:50
Benchmark Mode Cnt Score Error Units
PrimeNumbersUnitTest.Benchmarker.commonPoolBenchmark avgt 20 119.885 ± 9.917 ms/op
PrimeNumbersUnitTest.Benchmarker.newWorkStealingPoolBenchmark avgt 20 119.791 ± 7.811 ms/op
PrimeNumbersUnitTest.Benchmarker.singleThread avgt 20 475.964 ± 7.929 ms/op
Both ForkJoinPool.commonPool and Executors.newWorkStealingPool determine the prime numbers roughly four times faster than the single-threaded approach: about 120 ms per operation versus about 476 ms.
The fork/join framework lets us break down the task into sub-tasks. We broke down the range of 10,000 integers into batches of 1-100, 101-200, 201-300 and so on. We then determined the prime numbers in each batch and made the total count available through our noOfPrimeNumbers method.
5.3. Stealing Work to Compute
With the synchronous ForkJoinPool.commonPool, the measured level of work stealing doesn't appear to depend on the task granularity.
With the asynchronous Executors.newWorkStealingPool, it does: a finer granularity produces more sub-tasks, which gives idle workers more to steal.
We get the level of work stealing using the getStealCount method of the ForkJoinPool class:
long steals = forkJoinPool.getStealCount();
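The sketch below measures the steal count for several granularities, creating a brand-new pool for each measurement because getStealCount() returns a running total for the lifetime of a pool. CountPrimes is a compact, hypothetical stand-in for PrimeNumbers so that the example is self-contained, and StealCounter and run are likewise illustrative names. Steal counts vary between runs and machines, so no expected output is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class StealCounter {

    // Counts the primes in [lower, upper], splitting into batches of `granularity` numbers
    static class CountPrimes extends RecursiveAction {
        private final int lower, upper, granularity;
        private final AtomicInteger count;

        CountPrimes(int lower, int upper, int granularity, AtomicInteger count) {
            this.lower = lower;
            this.upper = upper;
            this.granularity = granularity;
            this.count = count;
        }

        @Override
        protected void compute() {
            if (upper - lower + 1 > granularity) {
                List<CountPrimes> batches = new ArrayList<>();
                for (int from = lower; from <= upper; from += granularity) {
                    int to = Math.min(from + granularity - 1, upper);
                    batches.add(new CountPrimes(from, to, granularity, count));
                }
                ForkJoinTask.invokeAll(batches);
            } else {
                for (int n = lower; n <= upper; n++) {
                    if (isPrime(n)) {
                        count.incrementAndGet();
                    }
                }
            }
        }

        private static boolean isPrime(int n) {
            if (n < 2) {
                return false;
            }
            for (int d = 2; (long) d * d <= n; d++) {
                if (n % d == 0) {
                    return false;
                }
            }
            return true;
        }
    }

    // Runs one measurement in a fresh pool; returns {number of primes, steal count}
    static long[] run(int upperBound, int granularity) {
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        try {
            AtomicInteger primes = new AtomicInteger();
            pool.invoke(new CountPrimes(1, upperBound, granularity, primes));
            return new long[] {primes.get(), pool.getStealCount()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int granularity : new int[] {1, 10, 100, 1000, 10000}) {
            long[] result = run(10_000, granularity);
            System.out.printf("Granularity: [%d], Primes: [%d], Steals: [%d]%n",
              granularity, result[0], result[1]);
        }
    }
}
```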
Determining the steal count for Executors.newWorkStealingPool and ForkJoinPool.commonPool shows different behavior:
Executors.newWorkStealingPool ->
Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]
ForkJoinPool.commonPool ->
Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]
When the granularity changes from fine to coarse (1 to 10,000) for Executors.newWorkStealingPool, the level of work stealing decreases, because fewer sub-tasks are created. With a granularity of 10,000, the task isn't broken down at all, and the steal count drops to one.
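ForkJoinPool.commonPool behaves differently: its steal count stays high and barely changes with the task granularity. Keep in mind, though, that getStealCount returns a running total for the lifetime of the pool, and the common pool is a single JVM-wide pool that shutdown() doesn't affect. If the measurements ran one after another in the same JVM, the common pool figures are cumulative, and the differences between consecutive rows (617, 65, 76 and 0) are close to the Executors.newWorkStealingPool figures.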
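Our prime numbers example is close to the event-style tasks mentioned in the Java documentation: the sub-tasks don't return results that need to be combined, they only increment a shared AtomicInteger. Strictly speaking, though, ForkJoinTask.invokeAll still waits for every sub-task to complete.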
A case can be made that Executors.newWorkStealingPool offers the best use of resources in solving the problem.
6. Conclusion
In this article, we looked at work stealing and how to apply it using the fork/join framework. We also looked at an example of work stealing and how it can improve processing time and resource usage.
As always, the full source code of the example is available over on GitHub.