1. 概述
本文将重点介绍PriorityBlockingQueue
类,并通过一些实际示例进行讲解。我们假设已经了解了Queue
的基本概念,首先我们将展示如何根据优先级对PriorityBlockingQueue
中的元素进行排序。
接下来,我们将演示如何使用这种队列来阻塞线程。最后,我们将展示如何结合这两个特性在多线程处理数据时派上用场。
2. 元素的优先级
与标准队列不同,PriorityBlockingQueue
不能随意添加任何类型的元素。有两种选择:
- 添加实现
Comparable
的元素 - 添加不实现
Comparable
的元素,但需要提供一个Comparator
作为条件
通过使用Comparator
或Comparable
来比较元素,PriorityBlockingQueue
始终保持有序。目标是实现一种比较逻辑,确保始终优先处理优先级最高的元素。当我们从队列中移除元素时,它总是具有最高优先级。
首先,让我们在一个线程中使用队列,而不是跨多个线程。这样便于我们在单元测试中验证元素的排序顺序:
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
ArrayList<Integer> polledElements = new ArrayList<>();
queue.add(1);
queue.add(5);
queue.add(2);
queue.add(3);
queue.add(4);
queue.drainTo(polledElements);
assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);
尽管以随机顺序向队列添加元素,但我们开始拉取时它们会按顺序排列。这是因为Integer
类实现了Comparable
,这将确保我们按照升序从队列中取出它们。
值得注意的是,当两个元素比较相等时,它们的排序顺序没有保证。
3. 使用队列阻塞
如果我们处理的是标准队列,我们会调用poll()
方法获取元素。但如果队列为空,poll()
将返回null
。
PriorityBlockingQueue
实现了BlockingQueue
接口,提供了额外的方法,允许我们在从空队列中移除元素时进行阻塞。我们可以尝试使用take()
方法,它应该能达到这个目的:
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
new Thread(() -> {
System.out.println("Polling...");
try {
Integer poll = queue.take();
System.out.println("Polled: " + poll);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");
queue.add(1);
尽管使用sleep()
作为演示方式有些脆弱,但运行这段代码后,我们会看到:
Polling...
Adding to queue
Polled: 1
这证明了take()
在有元素添加之前会阻塞:
- 线程将打印“Polling”,证明它已启动
- 测试暂停大约五秒钟,证明此时线程必须已调用了
take()
- 我们向队列添加元素,几乎立即看到“Polled: 1”,证明一旦有元素可用,
take()
就返回了元素
此外,BlockingQueue
接口还为我们提供了在队列满时阻塞的方法。但是PriorityBlockingQueue
是无界的,这意味着它永远不会满,总是可以添加新元素。
4. 结合阻塞和优先级
现在我们已经解释了PriorityBlockingQueue
的关键概念,让我们一起使用它们。我们可以扩展之前的示例,但这次向队列添加更多元素:
Thread thread = new Thread(() -> {
System.out.println("Polling...");
while (true) {
try {
Integer poll = queue.take();
System.out.println("Polled: " + poll);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");
queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
虽然由于使用sleep()
有些脆弱,但它仍然展示了有效用例。我们有一个会阻塞等待元素添加的队列,然后一次添加大量元素,然后显示它们将以优先级顺序处理。输出如下:
Polling...
Adding to queue
Polled: 1
Polled: 1
Polled: 2
Polled: 5
Polled: 6
Polled: 6
Polled: 7
5. 总结
在这篇指南中,我们展示了如何使用PriorityBlockingQueue
来让线程阻塞直到添加了一些元素,并且能够根据优先级处理这些元素。这些示例的实现可以在GitHub上找到,这是一个基于Maven的项目,可以直接运行。