Java并发新构件之DelayQueue

简介:

    DelayQueue主要用于放置实现了Delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走。这种队列是有序的,即队头的延迟到期时间最短。如果没有任何延迟到期,那么久不会有任何头元素,并且poll()将返回null(正因为这样,你不能将null放置到这种队列中)

    下面是一个示例,其中的Delayed对象自身就是任务,而DelayedTaskConsumer将最“紧急”的任务从队列中取出来,然后运行它:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import  java.util.ArrayList;
import  java.util.List;
import  java.util.Random;
import  java.util.concurrent.DelayQueue;
import  java.util.concurrent.Delayed;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.TimeUnit;
import  static  java.util.concurrent.TimeUnit.*;
 
class  DelayedTask  implements  Runnable, Delayed {
 
     private  static  int  counter =  0 ;
     protected  static  List<DelayedTask> sequence =  new  ArrayList<>();
     private  final  int  id = counter++;
     private  final  int  delayTime;
     private  final  long  triggerTime;
     public  DelayedTask( int  delayInMillis) {
         delayTime = delayInMillis;
         triggerTime = System.nanoTime() + NANOSECONDS.convert(delayTime, MILLISECONDS);
         sequence.add( this );
     }
     
     @Override
     public  int  compareTo(Delayed o) {
         DelayedTask that = (DelayedTask)o;
         if  (triggerTime < that.triggerTime)  return  - 1 ;
         if  (triggerTime > that.triggerTime)  return  1 ;
         return  0 ;
     }
 
     /**
      * 剩余的延迟时间
      */
     @Override
     public  long  getDelay(TimeUnit unit) {
         return  unit.convert(triggerTime - System.nanoTime(), NANOSECONDS);
     }
 
     @Override
     public  void  run() {
         System.out.println( this  " " );
     }
     
     @Override
     public  String toString() {
         return  String.format( "[%1$-4d]" , delayTime) +  " Task "  + id;
     }
     
     public  static  class  EndSentinel  extends  DelayedTask {
         private  ExecutorService exec;
         public  EndSentinel( int  delay, ExecutorService exec) {
             super (delay);
             this .exec = exec;
         }
         @Override
         public  void  run() {
             System.out.println( this  " calling shutDownNow()" );
             exec.shutdownNow();
         }
     }
}
 
class  DelayedTaskConsumer  implements  Runnable {
     private  DelayQueue<DelayedTask> tasks;
     public  DelayedTaskConsumer(DelayQueue<DelayedTask> tasks) {
         this .tasks = tasks;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 tasks.take().run(); //run tasks with current thread.
             }
         catch  (InterruptedException e) {
             // TODO: handle exception
         }
         System.out.println( "Finished DelaytedTaskConsumer." );
     }
}
 
 
public  class  DelayQueueDemo {
     public  static  void  main(String[] args) {
         int  maxDelayTime =  5000 ; //milliseconds
         Random random =  new  Random( 47 );
         ExecutorService exec = Executors.newCachedThreadPool();
         DelayQueue<DelayedTask> queue =  new  DelayQueue<>();
         //填充10个休眠时间随机的任务
         for  ( int  i =  0 ; i <  10 ; i++) {
             queue.put( new  DelayedTask(random.nextInt(maxDelayTime)));
         }
         //设置结束的时候。
         queue.add( new  DelayedTask.EndSentinel(maxDelayTime, exec));
         exec.execute( new  DelayedTaskConsumer(queue));
     }
}

执行结果:

?
1
2
3
4
5
6
7
8
9
10
11
12
[ 200  ] Task  7 
[ 429  ] Task  5 
[ 555  ] Task  1 
[ 961  ] Task  4 
[ 1207 ] Task  9 
[ 1693 ] Task  2 
[ 1861 ] Task  3 
[ 4258 ] Task  0 
[ 4522 ] Task  8 
[ 4868 ] Task  6 
[ 5000 ] Task  10  calling shutDownNow()
Finished DelaytedTaskConsumer.

    DelayedTask包含一个称为sequence的List<DelayedTask>,它保存了在任务被创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的(即到期时间短的先出队列)。

    Delayed接口有一个方法名为getDelay(),它可以用来告知延迟到期还有多长时间,或者延迟在多长时间之前已经到期。这个方法将强制我们去使用TimeUnit类,因为这就是参数类型。这会产生一个非常方便的类,因为你可以很容易地转换单位而无需做任何声明。例如,delayTime的值是以毫秒为单位的,但是System.nanoTime()产生的时间则是以纳秒为单位的。你可以转换delayTime的值,方法是声明它的单位以及你希望以什么单位来表示,就像下面这样:

?
1
NANOSECONDS.convert(delayTime, MILLISECONDS);

    为了排序,Delayed接口还继承了Comparable接口,因此必须实现compareTo()方法,使其可以产生合理的比较。toString()则提供了输出格式化,而嵌套的EndSentinel类提供了一种关闭所有事物的途径,具体做法是将其放置为队列的最后一个元素。

    注意,因为DelayedTaskConsumer自身是一个任务,所以它有自己的Thread,它可以使用这个线程来运行从队列中获取的所有任务。由于任务是按照队列优先级的顺序来执行的,因此在本例中不需要启动任何单独的线程来运行DelayedTask。

目录
相关文章
|
2天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
3 0
|
3天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
21 0
|
11天前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型
|
19天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
23天前
|
缓存 NoSQL Java
Java项目:支持并发的秒杀项目(基于Redis)
Java项目:支持并发的秒杀项目(基于Redis)
26 0
|
24天前
|
算法 安全 Java
Java中的并发编程:理解并发性能优化
在当今软件开发领域,多核处理器的普及使得并发编程变得更加重要。本文将深入探讨Java中的并发编程,介绍并发性能优化的关键技术,帮助开发人员更好地利用多核处理器提升应用程序性能。
|
1月前
|
安全 Java API
Java并发 - J.U.C并发容器类 list、set、queue
Queue API 阻塞是通过 condition 来实现的,可参考 Java 并发 - Lock 接口 ArrayBlockingQueue 阻塞 LinkedBlockingQueue 阻塞 ArrayQueue 非阻塞 LinkedQueue 非阻塞
|
存储 安全 Java
java中DelayQueue的使用
java中DelayQueue的使用
|
8天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
1天前
|
安全 Java
java多线程(一)(火车售票)
java多线程(一)(火车售票)