DelayQueue主要用于放置实现了Delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走。这种队列是有序的,即队头的延迟到期时间最短。如果没有任何延迟到期,那么久不会有任何头元素,并且poll()将返回null(正因为这样,你不能将null放置到这种队列中)
下面是一个示例,其中的Delayed对象自身就是任务,而DelayedTaskConsumer将最“紧急”的任务从队列中取出来,然后运行它:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
import
java.util.ArrayList;
import
java.util.List;
import
java.util.Random;
import
java.util.concurrent.DelayQueue;
import
java.util.concurrent.Delayed;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.TimeUnit;
import
static
java.util.concurrent.TimeUnit.*;
class
DelayedTask
implements
Runnable, Delayed {
private
static
int
counter =
0
;
protected
static
List<DelayedTask> sequence =
new
ArrayList<>();
private
final
int
id = counter++;
private
final
int
delayTime;
private
final
long
triggerTime;
public
DelayedTask(
int
delayInMillis) {
delayTime = delayInMillis;
triggerTime = System.nanoTime() + NANOSECONDS.convert(delayTime, MILLISECONDS);
sequence.add(
this
);
}
@Override
public
int
compareTo(Delayed o) {
DelayedTask that = (DelayedTask)o;
if
(triggerTime < that.triggerTime)
return
-
1
;
if
(triggerTime > that.triggerTime)
return
1
;
return
0
;
}
/**
* 剩余的延迟时间
*/
@Override
public
long
getDelay(TimeUnit unit) {
return
unit.convert(triggerTime - System.nanoTime(), NANOSECONDS);
}
@Override
public
void
run() {
System.out.println(
this
+
" "
);
}
@Override
public
String toString() {
return
String.format(
"[%1$-4d]"
, delayTime) +
" Task "
+ id;
}
public
static
class
EndSentinel
extends
DelayedTask {
private
ExecutorService exec;
public
EndSentinel(
int
delay, ExecutorService exec) {
super
(delay);
this
.exec = exec;
}
@Override
public
void
run() {
System.out.println(
this
+
" calling shutDownNow()"
);
exec.shutdownNow();
}
}
}
class
DelayedTaskConsumer
implements
Runnable {
private
DelayQueue<DelayedTask> tasks;
public
DelayedTaskConsumer(DelayQueue<DelayedTask> tasks) {
this
.tasks = tasks;
}
@Override
public
void
run() {
try
{
while
(!Thread.interrupted()) {
tasks.take().run();
//run tasks with current thread.
}
}
catch
(InterruptedException e) {
// TODO: handle exception
}
System.out.println(
"Finished DelaytedTaskConsumer."
);
}
}
public
class
DelayQueueDemo {
public
static
void
main(String[] args) {
int
maxDelayTime =
5000
;
//milliseconds
Random random =
new
Random(
47
);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue =
new
DelayQueue<>();
//填充10个休眠时间随机的任务
for
(
int
i =
0
; i <
10
; i++) {
queue.put(
new
DelayedTask(random.nextInt(maxDelayTime)));
}
//设置结束的时候。
queue.add(
new
DelayedTask.EndSentinel(maxDelayTime, exec));
exec.execute(
new
DelayedTaskConsumer(queue));
}
}
|
执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
|
[
200
] Task
7
[
429
] Task
5
[
555
] Task
1
[
961
] Task
4
[
1207
] Task
9
[
1693
] Task
2
[
1861
] Task
3
[
4258
] Task
0
[
4522
] Task
8
[
4868
] Task
6
[
5000
] Task
10
calling shutDownNow()
Finished DelaytedTaskConsumer.
|
DelayedTask包含一个称为sequence的List<DelayedTask>,它保存了在任务被创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的(即到期时间短的先出队列)。
Delayed接口有一个方法名为getDelay(),它可以用来告知延迟到期还有多长时间,或者延迟在多长时间之前已经到期。这个方法将强制我们去使用TimeUnit类,因为这就是参数类型。这会产生一个非常方便的类,因为你可以很容易地转换单位而无需做任何声明。例如,delayTime的值是以毫秒为单位的,但是System.nanoTime()产生的时间则是以纳秒为单位的。你可以转换delayTime的值,方法是声明它的单位以及你希望以什么单位来表示,就像下面这样:
1
|
NANOSECONDS.convert(delayTime, MILLISECONDS);
|
为了排序,Delayed接口还继承了Comparable接口,因此必须实现compareTo()方法,使其可以产生合理的比较。toString()则提供了输出格式化,而嵌套的EndSentinel类提供了一种关闭所有事物的途径,具体做法是将其放置为队列的最后一个元素。
注意,因为DelayedTaskConsumer自身是一个任务,所以它有自己的Thread,它可以使用这个线程来运行从队列中获取的所有任务。由于任务是按照队列优先级的顺序来执行的,因此在本例中不需要启动任何单独的线程来运行DelayedTask。