1. Introduction

In a multi-threaded environment, sometimes we need to schedule tasks based on custom criteria instead of just the creation time.

Let’s see how we can achieve this in Java – using a PriorityBlockingQueue.

2. Overview

Let us say we have jobs that we want to execute based on their priority:

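public class Job implements Runnable {
    private String jobName;
    private JobPriority jobPriority;

    public Job(String jobName, JobPriority jobPriority) {
        this.jobName = jobName;
        this.jobPriority = jobPriority;
    }

    @Override
    public void run() {
        System.out.println("Job:" + jobName +
          " Priority:" + jobPriority);
        try {
            Thread.sleep(1000); // to simulate actual execution time
        } catch (InterruptedException e) {
            // restore the interrupt flag so the executor can see it
            Thread.currentThread().interrupt();
        }
    }

    // standard setters and getters
}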

For demonstration purposes, we’re printing the job name and priority in the run() method.

We also call Thread.sleep() to simulate a longer-running job; while one job is executing, more jobs accumulate in the priority queue.

Finally, JobPriority is a simple enum:

public enum JobPriority {
    HIGH,
    MEDIUM,
    LOW
}

3. Custom Comparator

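We need a comparator that defines our custom criteria. With Java 8 or later, it’s a one-liner. Since enum constants compare by declaration order, HIGH sorts before MEDIUM and LOW, so high-priority jobs end up at the head of the queue: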

Comparator.comparing(Job::getJobPriority);
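
To see the ordering this comparator produces, here is a minimal sketch that sorts a few jobs with it. The ComparatorOrderSketch class, the sortedNames() helper, and the trimmed-down Job are hypothetical stand-ins for illustration, not part of the tutorial's code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ComparatorOrderSketch {

    enum JobPriority { HIGH, MEDIUM, LOW }

    // Trimmed-down stand-in for the article's Job; only the getters matter here
    static class Job {
        private final String jobName;
        private final JobPriority jobPriority;

        Job(String jobName, JobPriority jobPriority) {
            this.jobName = jobName;
            this.jobPriority = jobPriority;
        }

        String getJobName() { return jobName; }
        JobPriority getJobPriority() { return jobPriority; }
    }

    // Sorts a fixed sample of jobs and returns their names in sorted order
    static List<String> sortedNames() {
        List<Job> jobs = new ArrayList<>();
        jobs.add(new Job("low", JobPriority.LOW));
        jobs.add(new Job("high", JobPriority.HIGH));
        jobs.add(new Job("medium", JobPriority.MEDIUM));

        // Enum constants compare by declaration order: HIGH, then MEDIUM, then LOW
        jobs.sort(Comparator.comparing(Job::getJobPriority));

        List<String> names = new ArrayList<>();
        for (Job job : jobs) {
            names.add(job.getJobName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(sortedNames()); // prints [high, medium, low]
    }
}
```

If the enum constants were declared in the opposite order, the same comparator would put LOW first, so the declaration order of JobPriority is part of the scheduling contract.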

4. Priority Job Scheduler

With all the setup done, let’s now implement a simple job scheduler. It uses a single-thread executor to take jobs from the PriorityBlockingQueue and submit them for execution:

public class PriorityJobScheduler {

    private ExecutorService priorityJobPoolExecutor;
    private ExecutorService priorityJobScheduler 
      = Executors.newSingleThreadExecutor();
    private PriorityBlockingQueue<Job> priorityQueue;

    public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
        priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
        priorityQueue = new PriorityBlockingQueue<Job>(
          queueSize, 
          Comparator.comparing(Job::getJobPriority));
        priorityJobScheduler.execute(() -> {
            while (true) {
                try {
                    priorityJobPoolExecutor.execute(priorityQueue.take());
                } catch (InterruptedException e) {
                    // restore the interrupt flag and stop taking jobs
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    public void scheduleJob(Job job) {
        priorityQueue.add(job);
    }
}

The key here is to create a PriorityBlockingQueue of Job elements with a custom comparator. The next job to execute is picked from the queue using the take() method, which retrieves and removes the head of the queue, blocking while the queue is empty.

The client code now simply needs to call scheduleJob(), which adds the job to the queue. priorityQueue.add() places the job at the appropriate position relative to the jobs already in the queue, using the comparator we passed to the queue’s constructor.
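
The interplay between add() and head retrieval can be sketched in isolation, without any threads. The QueueOrderSketch class and drainOrder() helper below are hypothetical, and the sketch uses poll() instead of take() so that it returns once the queue is empty rather than blocking:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class QueueOrderSketch {

    enum JobPriority { HIGH, MEDIUM, LOW }

    // Trimmed-down stand-in for the article's Job
    static class Job {
        private final String jobName;
        private final JobPriority jobPriority;

        Job(String jobName, JobPriority jobPriority) {
            this.jobName = jobName;
            this.jobPriority = jobPriority;
        }

        JobPriority getJobPriority() { return jobPriority; }
    }

    // Adds jobs in arbitrary order and returns the priorities in retrieval order
    static List<JobPriority> drainOrder() {
        PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>(
          10, Comparator.comparing(Job::getJobPriority));

        queue.add(new Job("Job1", JobPriority.LOW));
        queue.add(new Job("Job2", JobPriority.MEDIUM));
        queue.add(new Job("Job3", JobPriority.HIGH));

        List<JobPriority> order = new ArrayList<>();
        Job next;
        // poll() returns null on an empty queue, whereas take() would block
        while ((next = queue.poll()) != null) {
            order.add(next.getJobPriority());
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints [HIGH, MEDIUM, LOW]
    }
}
```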

Note that the actual jobs are executed using a separate ExecutorService with a dedicated thread pool.
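
Since the scheduler owns two executors, both need to be stopped at some point. The class above doesn't show how, so the following is only a sketch of one possible approach. The shutdown() and pendingJobs() methods, the SchedulerShutdownSketch wrapper, and the counter-based Job are assumptions made for illustration:

```java
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SchedulerShutdownSketch {

    enum JobPriority { HIGH, MEDIUM, LOW }

    // Stand-in job that only counts its own completion
    static class Job implements Runnable {
        private final JobPriority jobPriority;
        private final AtomicInteger completed;

        Job(JobPriority jobPriority, AtomicInteger completed) {
            this.jobPriority = jobPriority;
            this.completed = completed;
        }

        JobPriority getJobPriority() { return jobPriority; }

        @Override
        public void run() { completed.incrementAndGet(); }
    }

    static class PriorityJobScheduler {
        private final ExecutorService priorityJobPoolExecutor;
        private final ExecutorService priorityJobScheduler
          = Executors.newSingleThreadExecutor();
        private final PriorityBlockingQueue<Job> priorityQueue;

        PriorityJobScheduler(int poolSize, int queueSize) {
            priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
            priorityQueue = new PriorityBlockingQueue<>(
              queueSize, Comparator.comparing(Job::getJobPriority));
            priorityJobScheduler.execute(() -> {
                while (true) {
                    try {
                        priorityJobPoolExecutor.execute(priorityQueue.take());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }

        void scheduleJob(Job job) { priorityQueue.add(job); }

        // Hypothetical helper: number of jobs not yet taken by the scheduler thread
        int pendingJobs() { return priorityQueue.size(); }

        // Hypothetical clean-up: jobs still waiting in the priority queue are dropped
        boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
            // Stop the scheduler thread first; shutdownNow() interrupts the blocked take()
            priorityJobScheduler.shutdownNow();
            boolean schedulerDone = priorityJobScheduler.awaitTermination(timeout, unit);
            // Only then close the pool, so a job taken just before the interrupt is still accepted
            priorityJobPoolExecutor.shutdown();
            boolean poolDone = priorityJobPoolExecutor.awaitTermination(timeout, unit);
            return schedulerDone && poolDone;
        }
    }

    // Schedules three jobs, waits for the queue to drain, shuts down, returns completed count
    static int runJobsThenShutdown() {
        AtomicInteger completed = new AtomicInteger();
        PriorityJobScheduler pjs = new PriorityJobScheduler(1, 10);
        pjs.scheduleJob(new Job(JobPriority.LOW, completed));
        pjs.scheduleJob(new Job(JobPriority.HIGH, completed));
        pjs.scheduleJob(new Job(JobPriority.MEDIUM, completed));
        try {
            while (pjs.pendingJobs() > 0) {
                Thread.sleep(10);
            }
            pjs.shutdown(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runJobsThenShutdown()); // prints 3
    }
}
```

The order of the two shutdown steps matters in this sketch: closing the pool before the scheduler thread has stopped could make the pool reject a job that was just taken from the queue.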

5. Demo

Finally, here’s a quick demonstration of the scheduler:

private static final int POOL_SIZE = 1;
private static final int QUEUE_SIZE = 10;

@Test
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
    Job job1 = new Job("Job1", JobPriority.LOW);
    Job job2 = new Job("Job2", JobPriority.MEDIUM);
    Job job3 = new Job("Job3", JobPriority.HIGH);
    Job job4 = new Job("Job4", JobPriority.MEDIUM);
    Job job5 = new Job("Job5", JobPriority.LOW);
    Job job6 = new Job("Job6", JobPriority.HIGH);
    
    PriorityJobScheduler pjs = new PriorityJobScheduler(
      POOL_SIZE, QUEUE_SIZE);
    
    pjs.scheduleJob(job1);
    pjs.scheduleJob(job2);
    pjs.scheduleJob(job3);
    pjs.scheduleJob(job4);
    pjs.scheduleJob(job5);
    pjs.scheduleJob(job6);

    // clean up
}

To demonstrate that the jobs are executed in priority order, we’ve kept POOL_SIZE at 1 so that only one job runs at a time. QUEUE_SIZE is only the initial capacity of the queue, since a PriorityBlockingQueue is unbounded. We then provide jobs with varying priorities to the scheduler.

Here is a sample output we got for one of the runs:

Job:Job3 Priority:HIGH
Job:Job6 Priority:HIGH
Job:Job4 Priority:MEDIUM
Job:Job2 Priority:MEDIUM
Job:Job1 Priority:LOW
Job:Job5 Priority:LOW

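The output can vary across runs: the scheduler thread starts taking jobs while the test is still adding them, and the queue doesn’t guarantee any particular order among jobs of equal priority. However, a lower-priority job is never taken from the queue while it still contains a higher-priority job.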
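
If a stable order among jobs of equal priority is needed, one common option is to add a secondary sort key. The sketch below is not part of the scheduler above: it assumes a hypothetical sequence field on Job, filled from a shared counter, and chains it onto the priority comparator with thenComparingLong():

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class FifoTieBreakSketch {

    enum JobPriority { HIGH, MEDIUM, LOW }

    // Hypothetical counter that stamps each job with its creation order
    static final AtomicLong SEQUENCE = new AtomicLong();

    static class Job {
        private final String jobName;
        private final JobPriority jobPriority;
        private final long sequence = SEQUENCE.getAndIncrement();

        Job(String jobName, JobPriority jobPriority) {
            this.jobName = jobName;
            this.jobPriority = jobPriority;
        }

        String getJobName() { return jobName; }
        JobPriority getJobPriority() { return jobPriority; }
        long getSequence() { return sequence; }
    }

    // Queues the six demo jobs and returns their names in retrieval order
    static List<String> drainOrder() {
        // Priority decides first; creation order breaks ties within a priority
        Comparator<Job> byPriorityThenArrival = Comparator
          .comparing(Job::getJobPriority)
          .thenComparingLong(Job::getSequence);

        PriorityBlockingQueue<Job> queue
          = new PriorityBlockingQueue<>(10, byPriorityThenArrival);
        queue.add(new Job("Job1", JobPriority.LOW));
        queue.add(new Job("Job2", JobPriority.MEDIUM));
        queue.add(new Job("Job3", JobPriority.HIGH));
        queue.add(new Job("Job4", JobPriority.MEDIUM));
        queue.add(new Job("Job5", JobPriority.LOW));
        queue.add(new Job("Job6", JobPriority.HIGH));

        List<String> names = new ArrayList<>();
        Job next;
        while ((next = queue.poll()) != null) {
            names.add(next.getJobName());
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [Job3, Job6, Job2, Job4, Job1, Job5]
        System.out.println(drainOrder());
    }
}
```

This only fixes the order in which jobs leave the queue. With the threaded scheduler, jobs that are taken before later ones arrive can still run ahead of them.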

6. Conclusion

In this quick tutorial, we saw how PriorityBlockingQueue can be used to execute jobs in a custom priority order.

As usual, source files can be found over on GitHub.