java中DelayQueue的使用

简介: java中DelayQueue的使用

目录



java中DelayQueue的使用


简介


今天给大家介绍一下DelayQueue,DelayQueue是BlockingQueue的一种,所以它是线程安全的,DelayQueue的特点就是插入Queue中的数据可以按照自定义的delay时间进行排序。只有delay时间小于0的元素才能够被取出。


DelayQueue


先看一下DelayQueue的定义:


public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>


从定义可以看到,DelayQueue中存入的对象都必须是Delayed的子类。


Delayed继承自Comparable,并且需要实现一个getDelay的方法。


为什么这样设计呢?


因为DelayQueue的底层存储是一个PriorityQueue,在之前的文章中我们讲过了,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable方法。而getDelay方法则用来判断排序后的元素是否可以从Queue中取出。


DelayQueue的应用


DelayQueue一般用于生产者消费者模式,我们下面举一个具体的例子。


首先要使用DelayQueue,必须自定义一个Delayed对象:


@Data
public class DelayedUser implements Delayed {
    private String name;
    private long avaibleTime;
    public DelayedUser(String name, long delayTime){
        this.name=name;
        //avaibleTime = 当前时间+ delayTime
        this.avaibleTime=delayTime + System.currentTimeMillis();
    }
    @Override
    public long getDelay(TimeUnit unit) {
        //判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((DelayedUser) o).getAvaibleTime());
    }
}


上面的对象中,我们需要实现getDelay和compareTo方法。


接下来我们创建一个生产者:


@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
    private DelayQueue<DelayedUser> delayQueue;
    private Integer messageCount;
    private long delayedTime;
    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser delayedUser = new DelayedUser(
                        new Random().nextInt(1000)+"", delayedTime);
                log.info("put delayedUser {}",delayedUser);
                delayQueue.put(delayedUser);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
    }
}


在生产者中,我们每隔0.5秒创建一个新的DelayedUser对象,并入Queue。


再创建一个消费者:


@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {
    private DelayQueue<DelayedUser> delayQueue;
    private int messageCount;
    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser element = delayQueue.take();
                log.info("take {}",element );
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
    }
}


在消费者中,我们循环从queue中获取对象。


最后看一个调用的例子:


@Test
    public void useDelayedQueue() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        DelayQueue<DelayedUser> queue = new DelayQueue<>();
        int messageCount = 2;
        long delayTime = 500;
        DelayedQueueConsumer consumer = new DelayedQueueConsumer(
                queue, messageCount);
        DelayedQueueProducer producer = new DelayedQueueProducer(
                queue, messageCount, delayTime);
        // when
        executor.submit(producer);
        executor.submit(consumer);
        // then
        executor.awaitTermination(5, TimeUnit.SECONDS);
        executor.shutdown();
    }


上面的测试例子中,我们定义了两个线程的线程池,生产者产生两条消息,delayTime设置为0.5秒,也就是说0.5秒之后,插入的对象能够被获取到。


线程池在5秒之后会被关闭。


运行看下结果:


[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487, avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487, avaibleTime=1587623188899)


我们看到消息的put和take是交替进行的,符合我们的预期。


如果我们做下修改,将delayTime修改为50000,那么在线程池关闭之前插入的元素是不会过期的,也就是说消费者是无法获取到结果的。


总结


DelayQueue是一种有奇怪特性的BlockingQueue,可以在需要的时候使用。


本文的例子https://github.com/ddean2009/learn-java-collections

相关文章
|
7月前
|
存储 缓存 Java
Java并发基础:DelayQueue全面解析!
DelayQueue类专为处理延迟任务设计,它允许开发者将任务与指定的延迟时间关联,并在任务到期时自动处理,从而避免了不必要的轮询和资源浪费,此外,DelayQueue内部基于优先队列实现,确保最先到期的任务总是优先被处理,使得任务调度更为高效和精准。
193 1
Java并发基础:DelayQueue全面解析!
|
消息中间件 存储 缓存
|
Java
Java Review - 并发编程_DelayQueue原理&源码剖析
Java Review - 并发编程_DelayQueue原理&源码剖析
83 0
|
缓存 网络协议 Java
【Java原理探索】教你如何使用「精巧好用」的DelayQueue(延时队列)
【Java原理探索】教你如何使用「精巧好用」的DelayQueue(延时队列)
185 0
|
Java Spring
Java多线程中的延时队列DelayQueue
慢慢进入JAVA的内心世界, 今天也一直在和JAVA的语法作斗争, 到周三,写的一个基于SPRING BOOT的日志小模块,
13087 0
|
Java
java源码-DelayQueue
开篇  DelayedQueue是一个用来延时处理的队列,delayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素 所谓延时处理就是说可以为队列中元素设定一个过期时间, 相关的操作受到这个设定时间的控制。
1166 0
|
1天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
3天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
3天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。