Java并发新构件之DelayQueue

简介:

    DelayQueue主要用于放置实现了Delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走。这种队列是有序的,即队头的延迟到期时间最短。如果没有任何延迟到期,那么久不会有任何头元素,并且poll()将返回null(正因为这样,你不能将null放置到这种队列中)

    下面是一个示例,其中的Delayed对象自身就是任务,而DelayedTaskConsumer将最“紧急”的任务从队列中取出来,然后运行它:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import  java.util.ArrayList;
import  java.util.List;
import  java.util.Random;
import  java.util.concurrent.DelayQueue;
import  java.util.concurrent.Delayed;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.TimeUnit;
import  static  java.util.concurrent.TimeUnit.*;
 
class  DelayedTask  implements  Runnable, Delayed {
 
     private  static  int  counter =  0 ;
     protected  static  List<DelayedTask> sequence =  new  ArrayList<>();
     private  final  int  id = counter++;
     private  final  int  delayTime;
     private  final  long  triggerTime;
     public  DelayedTask( int  delayInMillis) {
         delayTime = delayInMillis;
         triggerTime = System.nanoTime() + NANOSECONDS.convert(delayTime, MILLISECONDS);
         sequence.add( this );
     }
     
     @Override
     public  int  compareTo(Delayed o) {
         DelayedTask that = (DelayedTask)o;
         if  (triggerTime < that.triggerTime)  return  - 1 ;
         if  (triggerTime > that.triggerTime)  return  1 ;
         return  0 ;
     }
 
     /**
      * 剩余的延迟时间
      */
     @Override
     public  long  getDelay(TimeUnit unit) {
         return  unit.convert(triggerTime - System.nanoTime(), NANOSECONDS);
     }
 
     @Override
     public  void  run() {
         System.out.println( this  " " );
     }
     
     @Override
     public  String toString() {
         return  String.format( "[%1$-4d]" , delayTime) +  " Task "  + id;
     }
     
     public  static  class  EndSentinel  extends  DelayedTask {
         private  ExecutorService exec;
         public  EndSentinel( int  delay, ExecutorService exec) {
             super (delay);
             this .exec = exec;
         }
         @Override
         public  void  run() {
             System.out.println( this  " calling shutDownNow()" );
             exec.shutdownNow();
         }
     }
}
 
class  DelayedTaskConsumer  implements  Runnable {
     private  DelayQueue<DelayedTask> tasks;
     public  DelayedTaskConsumer(DelayQueue<DelayedTask> tasks) {
         this .tasks = tasks;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 tasks.take().run(); //run tasks with current thread.
             }
         catch  (InterruptedException e) {
             // TODO: handle exception
         }
         System.out.println( "Finished DelaytedTaskConsumer." );
     }
}
 
 
public  class  DelayQueueDemo {
     public  static  void  main(String[] args) {
         int  maxDelayTime =  5000 ; //milliseconds
         Random random =  new  Random( 47 );
         ExecutorService exec = Executors.newCachedThreadPool();
         DelayQueue<DelayedTask> queue =  new  DelayQueue<>();
         //填充10个休眠时间随机的任务
         for  ( int  i =  0 ; i <  10 ; i++) {
             queue.put( new  DelayedTask(random.nextInt(maxDelayTime)));
         }
         //设置结束的时候。
         queue.add( new  DelayedTask.EndSentinel(maxDelayTime, exec));
         exec.execute( new  DelayedTaskConsumer(queue));
     }
}

执行结果:

?
1
2
3
4
5
6
7
8
9
10
11
12
[ 200  ] Task  7 
[ 429  ] Task  5 
[ 555  ] Task  1 
[ 961  ] Task  4 
[ 1207 ] Task  9 
[ 1693 ] Task  2 
[ 1861 ] Task  3 
[ 4258 ] Task  0 
[ 4522 ] Task  8 
[ 4868 ] Task  6 
[ 5000 ] Task  10  calling shutDownNow()
Finished DelaytedTaskConsumer.

    DelayedTask包含一个称为sequence的List<DelayedTask>,它保存了在任务被创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的(即到期时间短的先出队列)。

    Delayed接口有一个方法名为getDelay(),它可以用来告知延迟到期还有多长时间,或者延迟在多长时间之前已经到期。这个方法将强制我们去使用TimeUnit类,因为这就是参数类型。这会产生一个非常方便的类,因为你可以很容易地转换单位而无需做任何声明。例如,delayTime的值是以毫秒为单位的,但是System.nanoTime()产生的时间则是以纳秒为单位的。你可以转换delayTime的值,方法是声明它的单位以及你希望以什么单位来表示,就像下面这样:

?
1
NANOSECONDS.convert(delayTime, MILLISECONDS);

    为了排序,Delayed接口还继承了Comparable接口,因此必须实现compareTo()方法,使其可以产生合理的比较。toString()则提供了输出格式化,而嵌套的EndSentinel类提供了一种关闭所有事物的途径,具体做法是将其放置为队列的最后一个元素。

    注意,因为DelayedTaskConsumer自身是一个任务,所以它有自己的Thread,它可以使用这个线程来运行从队列中获取的所有任务。由于任务是按照队列优先级的顺序来执行的,因此在本例中不需要启动任何单独的线程来运行DelayedTask。

目录
相关文章
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
56 2
|
2月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
36 1
|
3月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
4月前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
66 5
|
4月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
63 2
|
存储 安全 Java
java中DelayQueue的使用
java中DelayQueue的使用
|
1天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
3天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。