java中DelayQueue的使用

简介: java中DelayQueue的使用

目录



java中DelayQueue的使用


简介


今天给大家介绍一下DelayQueue,DelayQueue是BlockingQueue的一种,所以它是线程安全的,DelayQueue的特点就是插入Queue中的数据可以按照自定义的delay时间进行排序。只有delay时间小于0的元素才能够被取出。


DelayQueue


先看一下DelayQueue的定义:


public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>


从定义可以看到,DelayQueue中存入的对象都必须是Delayed的子类。


Delayed继承自Comparable,并且需要实现一个getDelay的方法。


为什么这样设计呢?


因为DelayQueue的底层存储是一个PriorityQueue,在之前的文章中我们讲过了,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable方法。而getDelay方法则用来判断排序后的元素是否可以从Queue中取出。


DelayQueue的应用


DelayQueue一般用于生产者消费者模式,我们下面举一个具体的例子。


首先要使用DelayQueue,必须自定义一个Delayed对象:


@Data
public class DelayedUser implements Delayed {
    private String name;
    private long avaibleTime;
    public DelayedUser(String name, long delayTime){
        this.name=name;
        //avaibleTime = 当前时间+ delayTime
        this.avaibleTime=delayTime + System.currentTimeMillis();
    }
    @Override
    public long getDelay(TimeUnit unit) {
        //判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((DelayedUser) o).getAvaibleTime());
    }
}


上面的对象中,我们需要实现getDelay和compareTo方法。


接下来我们创建一个生产者:


@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
    private DelayQueue<DelayedUser> delayQueue;
    private Integer messageCount;
    private long delayedTime;
    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser delayedUser = new DelayedUser(
                        new Random().nextInt(1000)+"", delayedTime);
                log.info("put delayedUser {}",delayedUser);
                delayQueue.put(delayedUser);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
    }
}


在生产者中,我们每隔0.5秒创建一个新的DelayedUser对象,并入Queue。


再创建一个消费者:


@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {
    private DelayQueue<DelayedUser> delayQueue;
    private int messageCount;
    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser element = delayQueue.take();
                log.info("take {}",element );
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
    }
}


在消费者中,我们循环从queue中获取对象。


最后看一个调用的例子:


@Test
    public void useDelayedQueue() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        DelayQueue<DelayedUser> queue = new DelayQueue<>();
        int messageCount = 2;
        long delayTime = 500;
        DelayedQueueConsumer consumer = new DelayedQueueConsumer(
                queue, messageCount);
        DelayedQueueProducer producer = new DelayedQueueProducer(
                queue, messageCount, delayTime);
        // when
        executor.submit(producer);
        executor.submit(consumer);
        // then
        executor.awaitTermination(5, TimeUnit.SECONDS);
        executor.shutdown();
    }


上面的测试例子中,我们定义了两个线程的线程池,生产者产生两条消息,delayTime设置为0.5秒,也就是说0.5秒之后,插入的对象能够被获取到。


线程池在5秒之后会被关闭。


运行看下结果:


[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487, avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487, avaibleTime=1587623188899)


我们看到消息的put和take是交替进行的,符合我们的预期。


如果我们做下修改,将delayTime修改为50000,那么在线程池关闭之前插入的元素是不会过期的,也就是说消费者是无法获取到结果的。


总结


DelayQueue是一种有奇怪特性的BlockingQueue,可以在需要的时候使用。


本文的例子https://github.com/ddean2009/learn-java-collections

相关文章
|
6月前
|
存储 缓存 Java
Java并发基础:DelayQueue全面解析!
DelayQueue类专为处理延迟任务设计,它允许开发者将任务与指定的延迟时间关联,并在任务到期时自动处理,从而避免了不必要的轮询和资源浪费,此外,DelayQueue内部基于优先队列实现,确保最先到期的任务总是优先被处理,使得任务调度更为高效和精准。
177 1
Java并发基础:DelayQueue全面解析!
|
消息中间件 存储 缓存
|
Java
Java Review - 并发编程_DelayQueue原理&源码剖析
Java Review - 并发编程_DelayQueue原理&源码剖析
76 0
|
缓存 网络协议 Java
【Java原理探索】教你如何使用「精巧好用」的DelayQueue(延时队列)
【Java原理探索】教你如何使用「精巧好用」的DelayQueue(延时队列)
176 0
|
Java Spring
Java多线程中的延时队列DelayQueue
慢慢进入JAVA的内心世界, 今天也一直在和JAVA的语法作斗争, 到周三,写的一个基于SPRING BOOT的日志小模块,
13083 0
|
Java
java源码-DelayQueue
开篇  DelayedQueue是一个用来延时处理的队列,delayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素 所谓延时处理就是说可以为队列中元素设定一个过期时间, 相关的操作受到这个设定时间的控制。
1163 0
|
9天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
5天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
25 9
|
8天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####