An unbounded blocking queue that uses the same ordering rules as class
PriorityQueue
and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causingOutOfMemoryError
).
PriorityBlockingQueue是一个很基础的优先级队列,它在PriorityQueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向Queue里面增加元素可能会失败(导致OurOfMemoryError)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
import
java.util.ArrayList;
import
java.util.List;
import
java.util.Queue;
import
java.util.Random;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.PriorityBlockingQueue;
import
java.util.concurrent.TimeUnit;
class
PrioritizedTask
implements
Runnable, Comparable<PrioritizedTask> {
private
static
int
counter =
1
;
private
final
int
priority;
private
Random random =
new
Random(
47
);
private
final
int
id = counter++;
//这个id不是static的,因此
protected
static
List<PrioritizedTask> sequence =
new
ArrayList<>();
public
PrioritizedTask(
int
priority) {
this
.priority = priority;
sequence.add(
this
);
}
@Override
public
int
compareTo(PrioritizedTask o) {
int
val =
this
.priority - o.priority;
//higher value, higher priority
return
val <
0
?
1
: (val >
0
? -
1
:
0
);
}
@Override
public
void
run() {
try
{
TimeUnit.MILLISECONDS.sleep(random.nextInt(
250
));
}
catch
(InterruptedException e) {
}
System.out.println(
this
);
}
@Override
public
String toString() {
return
String.format(
"P=[%1$-3d]"
, priority) +
", ID="
+ id;
}
public
static
class
EndFlagTask
extends
PrioritizedTask {
private
ExecutorService exec;
public
EndFlagTask(ExecutorService executorService) {
super
(-
1
);
//最低的优先级
exec = executorService;
}
@Override
public
void
run() {
System.out.println(
this
+
" calling shutdownNow()"
);
exec.shutdownNow();
}
}
}
class
PrioritizedTaskProducer
implements
Runnable {
private
Queue<Runnable> queue;
private
ExecutorService exec;
public
PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
this
.queue = queue;
this
.exec = exec;
}
@Override
public
void
run() {
try
{
//慢慢的添加高优先级的任务
for
(
int
i =
0
; i <
6
; i++) {
TimeUnit.MILLISECONDS.sleep(
250
);
queue.add(
new
PrioritizedTask(
9
));
//6个优先级10
}
//先创建2个P=0的任务
queue.add(
new
PrioritizedTask(
0
));
queue.add(
new
PrioritizedTask(
0
));
//添加低优先级的任务
for
(
int
i =
0
; i <
6
; i++) {
// 优先级0-5
queue.add(
new
PrioritizedTask(i));
}
//添加一个结束标志的任务
queue.add(
new
PrioritizedTask.EndFlagTask(exec));
}
catch
(InterruptedException e) {
// TODO: handle exception
}
System.out.println(
"Finished PrioritizedTaskProducer."
);
}
}
class
PrioritizedTaskConsumer
implements
Runnable {
private
PriorityBlockingQueue<Runnable> queue;
public
PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
this
.queue = queue;
}
@Override
public
void
run() {
try
{
//不停的从queue里面取任务,直到exec停止。
while
(!Thread.interrupted()) {
//使用当前线程来跑这些任务
queue.take().run();
}
}
catch
(InterruptedException e) {
}
System.out.println(
"Finished PrioritizedTaskConsumer."
);
}
}
public
final
class
PriorityBlockingQueueDemo {
public
static
void
main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue =
new
PriorityBlockingQueue<>();
exec.execute(
new
PrioritizedTaskProducer(queue, exec));
exec.execute(
new
PrioritizedTaskConsumer(queue));
}
}
|
执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
P=[
9
], ID=
1
P=[
9
], ID=
2
P=[
9
], ID=
3
P=[
9
], ID=
4
P=[
9
], ID=
5
Finished PrioritizedTaskProducer.
P=[
9
], ID=
6
P=[
5
], ID=
14
P=[
4
], ID=
13
P=[
3
], ID=
12
P=[
2
], ID=
11
P=[
1
], ID=
10
P=[
0
], ID=
7
P=[
0
], ID=
9
P=[
0
], ID=
8
P=[-
1
], ID=
15
calling shutdownNow()
Finished PrioritizedTaskConsumer.
|
PrioritizedTask对象的创建序列被记录在sequenceList中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndFlagTask提供了停止ExecutorService的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。
PrioritizedTaskProducer和PrioritizedTaskConsumer通过PriorityBlockingQueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。
从执行结果中可以看到,最先出队列的是Priority为9的6个Task,因为这几个任务先创建。
1
|
Finished PrioritizedTaskProducer.
|
这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到Queue中和从Queue中提取任务并执行时两个不同的任务(即Producer和Consumer),因此Producer先输出“Finished PrioritizedTaskProducer.”。输出这句话的时候,前面只有5个P=9的任务出列了,因此队列中还有1个P=9的任务没出列,同时还有后续放入各种任务。由于Queue中的任务里面,优先级P最高的是P=9的,因此第6个P=9的任务先出队列。剩下的任务按照P的大小依次出列。
任务的ID属性表示了它们的创建顺序,因为ID是自增的,每创建一个任务,ID就增加。因此从
1
|
P=[
5
], ID=
14
|
可以很明显的看出:P=5的任务,它的ID最大,所以是最后创建的。从我们的代码中也可以看出来,P=5的任务的确是最后创建的。
还有一点可以看出,当P相同的时候,出Queue的顺序是不确定的,例如:
1
2
3
|
P=[
0
], ID=
7
P=[
0
], ID=
9
P=[
0
], ID=
8
|
另外,在使用此类的时候需要注意:
This class does not permit
null
elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results inClassCastException
).