1. Overview

In this article, we will be looking at the Phaser construct from the java.util.concurrent package. It is a very similar construct to the CountDownLatch that allows us to coordinate the execution of threads. In comparison to the CountDownLatch, it has some additional functionality.

The Phaser is a barrier on which the dynamic number of threads need to wait before continuing execution. In the CountDownLatch, that number cannot be configured dynamically and must be supplied when creating the instance.

2. Phaser API

The Phaser allows us to build logic in which threads need to wait on the barrier before going to the next step of execution.

We can coordinate multiple phases of execution, reusing a Phaser instance for each program phase. Each phase can have a different number of threads waiting to advance to another phase. We’ll have a look at an example of using phases later on.

To participate in the coordination, the thread needs to register() itself with the Phaser instance. Note that this only increases the number of registered parties, and we can’t check whether the current thread is registered – we’d have to subclass the implementation to support this.

The thread signals that it arrived at the barrier by calling the arriveAndAwaitAdvance(), which is a blocking method. When the number of arrived parties is equal to the number of registered parties, the execution of the program will continue, and the phase number will increase. We can get the current phase number by calling the getPhase() method.

When the thread finishes its job, we should call the arriveAndDeregister() method to signal that the current thread should no longer be accounted for in this particular phase.

3. Implementing Logic Using Phaser API

Let’s say that we want to coordinate multiple phases of action. Three threads will process the first phase, and two threads will process the second phase.

We’ll create a LongRunningAction class that implements the Runnable interface:

class LongRunningAction implements Runnable {
    private String threadName;
    private Phaser ph;

    LongRunningAction(String threadName, Phaser ph) {
        this.threadName = threadName;
        this.ph = ph;
        this.randomWait();
        ph.register();
    }

    @Override
    public void run() {
        ph.arriveAndAwaitAdvance();
        randomWait();
        ph.arriveAndDeregister();
    }

    // Simulating real work
    private void randomWait() {
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

When our action class is instantiated, we register to the Phaser instance using the register() method. This will increment the number of threads using that specific Phaser.

The call to the arriveAndAwaitAdvance() will cause the current thread to wait on the barrier. As already mentioned, when the number of arrived parties becomes the same as the number of registered parties, the execution will continue.

After the processing is done, the current thread is deregistering itself by calling the arriveAndDeregister() method.

Note: We are introducing random delays in the thread using the randomWait method to replicate real-time scenarios.

4. Testing Phaser API

Let’s create a test case in which we will start three LongRunningAction threads and block onto the barrier. Next, after the action is finished, we will create two additional LongRunningAction threads that will perform the processing of the next phase.

When creating the Phaser instance from the main thread, we’re passing 1 as an argument. This is equivalent to calling the register() method from the current thread.

We’re doing this because, when we’re creating three worker threads, the main thread is a coordinator, and therefore the Phaser needs to have four threads registered to it:

Phaser ph = new Phaser(1);
assertEquals(0, ph.getPhase());

The phase after the initialization is equal to zero.

The Phaser class has a constructor to which we can pass a parent instance to it. It is useful in cases where we have large numbers of parties that would experience massive synchronization contention costs. In such situations, instances of Phasers may be set up so that groups of sub-phasers share a common parent.

Next, let’s start three LongRunningAction action threads, which will be waiting on the barrier until we will call the arriveAndAwaitAdvance() method from the main thread.

Keep in mind we’ve initialized our Phaser with 1 and called register() three more times.

new Thread(new LongRunningAction("thread-1", ph)).start();
new Thread(new LongRunningAction("thread-2", ph)).start();
new Thread(new LongRunningAction("thread-3", ph)).start();

Now, three action threads have announced that they’ve arrived at the barrier, so one more call of arriveAndAwaitAdvance() is needed – the one from the main thread:

ph.arriveAndAwaitAdvance();
assertEquals(1, ph.getPhase());

After the completion of that phase, the getPhase() method will return one because the program finished processing the first step of execution.

Let’s say that two threads should conduct the next phase of processing. We can leverage Phaser to achieve that because it allows us to configure dynamically the number of threads that should wait on the barrier. We’re starting two new threads, but these will not proceed to execute until the call to the arriveAndAwaitAdvance() from the main thread (same as in the previous case):

new Thread(new LongRunningAction("thread-4", ph)).start();
new Thread(new LongRunningAction("thread-5", ph)).start();
ph.arriveAndAwaitAdvance();
 
assertEquals(2, ph.getPhase());

ph.arriveAndDeregister();

After this, the getPhase() method will return a phase number equal to two. When we want to finish our program, we need to call the arriveAndDeregister() method as the main thread is still registered in the Phaser. When the deregistration causes the number of registered parties to become zero, the Phaser is terminated. All calls to synchronization methods will not be blocked anymore and will return immediately.

Running the program will produce the following output (full source code with the print line statements can be found in the code repository):

Thread thread-1 registered during phase 0
Thread thread-1 BEFORE long running action in phase 0
Thread thread-2 registered during phase 0
Thread thread-2 BEFORE long running action in phase 0
Thread thread-3 registered during phase 0
Thread main waiting for others
Thread thread-3 BEFORE long running action in phase 0
Thread main proceeding in phase 1
Thread thread-4 registered during phase 1
Thread thread-3 AFTER long running action in phase 1
Thread thread-2 AFTER long running action in phase 1
Thread thread-1 AFTER long running action in phase 1
Thread thread-5 registered during phase 1
Thread main waiting for new phase
Thread thread-4 BEFORE long running action in phase 1
Thread thread-5 BEFORE long running action in phase 1
Thread main proceeding in phase 2
Thread thread-5 AFTER long running action in phase 2
Thread thread-4 AFTER long running action in phase 2

We see that all threads are waiting for execution until the barrier opens. The next phase of the execution is performed only when the previous one is finished successfully.

4. Conclusion

In this tutorial, we had a look at the Phaser construct from java.util.concurrent and we implemented the coordination logic with multiple phases using the Phaser class.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.


« 上一篇: Java周报,176