DelayQueue延时队列使用

简介: DelayQueue延时队列使用
----------TaskDelay 延时对象---------------

package com.asiainfo.audit.delay;


import org.apache.log4j.Logger;

import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* 继承Delayed使其成为一个延迟对象
* 该类是一个消息体类,在延时队列中这个实现了Delayed接口的消息类是必不可少的,
* 实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的,
* 这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体
* @author zhoukai7
* @email zhoukai7@asiainfo.com
* @date 2017-12-02 18:23:21
*/
public class TaskDelay<T extends Runnable> implements Delayed {
private static final Logger LOG = Logger.getLogger(TaskDelay.class);
private long appointTime;//到期时间
private T task;//预约对象
private static final AtomicLong atomic = new AtomicLong(0);

public TaskDelay() {

}

/**
*
* @param appointTime 超时时间
* @param task
*/
public TaskDelay(long appointTime, T task) {
LOG.info("TaskDelay start sys +++++++++++++++++++++++++++++++++++++++");
this.appointTime = appointTime;
this.task = task;
}

public static long atomicNumber() {
return atomic.getAndIncrement();
//return atomic.incrementAndGet();
}

@Override
public int compareTo(Delayed o) {
LOG.info("TaskDelay is compareTo start++++++++++++++++++++++++++++++++");
if (o == null) {
return 1;
}
if (o == this) {
return 0;
}
if (o instanceof TaskDelay) {
TaskDelay taskDelay = (TaskDelay) o;

if (appointTime > taskDelay.appointTime) {//过期时刻越靠后,越排在队尾
LOG.info("TaskDelay is compareTo is === >>>>>>>>");
return 1;
} else if (appointTime < taskDelay.appointTime) {
LOG.info("TaskDelay is compareTo is === ========");
return -1;
} else {
LOG.info("TaskDelay is compareTo is === =<<<<<<<<<<<");
return 0;
}

}

return 0;

}


@Override
public long getDelay(TimeUnit unit) {
LOG.info("getDelay is or not this.appointTime="+this.appointTime+"System.nanoTime()"+System.currentTimeMillis());
long n =unit.convert(this.appointTime - System.nanoTime(), TimeUnit.NANOSECONDS);
LOG.info("-getDelay is or not this.appointTime-"+n);

Date now = new Date();
long diff = this.appointTime - now.getTime();
return unit.convert(diff, TimeUnit.MILLISECONDS);

}

public T getTask() {
return this.task;
}

@Override
public int hashCode() {
return task.hashCode();
}

@Override
public boolean equals(Object object) {
if (object instanceof TaskDelay) {
return object.hashCode() == hashCode() ? true : false;
}
return false;
}

}
----------TaskDelayManage延时管理器---------------
package com.asiainfo.audit.delay;


import com.alibaba.fastjson.JSON;
import org.apache.log4j.Logger;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
* 延迟任务调度类
* 后台守护线程不断的执行检测工作
* [1、需要在容器初始化的时候调用init方法]
* [2、需要实现一个实现runnable接口的类,调用TaskDelayManage的put方法保存任务到队列]
* 该类是一个延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,
* 以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务
* <p>
* [注意:动态的取消任务没有实现,后面补充]
*
* @author zhoukai7
* @email zhoukai7@asiainfo.com
* @date 2017-12-02 18:23:21
*/
public class TaskDelayManage {
private static final Logger LOG = Logger.getLogger(TaskDelayManage.class);

/**
* 创建空队列 DelayQueue
*/
private static final DelayQueue<TaskDelay> delayQueue = new DelayQueue<>();
private static final TaskDelayManage taskDelayManageThread = new TaskDelayManage();

private Executor executor = Executors.newFixedThreadPool(20);
private static final AtomicLong atomic = new AtomicLong(0);

public static long atomicNumber() {
return atomic.getAndIncrement();
//return atomic.incrementAndGet();
}

public TaskDelayManage() {
//容器初始化就必须执行init
init();
}
// private static class LazyHolder {
// private static final TaskDelayManage taskDelayManageThread = new TaskDelayManage();
// }
// public static final TaskDelayManage getInstance() {
// return LazyHolder.taskDelayManageThread;
// }
public static TaskDelayManage getInstance() {
return taskDelayManageThread;
}
//创建一个固定大小的线程池
/**
* 守护线程
*/
private Thread daemonThread;

/**
* 初始化守护线程
* init()方法,要在容器初始化的时候就要执行,
* 或是在第一次put延迟对象任务之前就要初始化完成,
* 当设定的延迟时间到期时会执行任务对象中的run()
*/
public void init() {
daemonThread = new Thread(() -> {
try {
execute();
} catch (Exception e) {
e.printStackTrace();
LOG.info(e.getMessage());
}
});
daemonThread.setDaemon(true);
daemonThread.setName("DelayQueue Daemon Thread");
daemonThread.start();
}

private void execute() {
LOG.info("TaskDelayManage execute start !!!!!:" + System.currentTimeMillis());
while (true) {

LOG.info("The end of daemonThread(), tt.isAlive()=" + daemonThread.isAlive());
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
LOG.info("当前存活线程数量:" + map.size());
int taskNum = delayQueue.size();
LOG.info("当前延时任务数量:" + taskNum);
try {

LOG.info("execute is fou yi zhi zu se!!!!!" + System.currentTimeMillis());
//从延迟队列中取值,如果无对象过期则队列一直等待
//如果Task中timeout大于当前的时间则可以通过take()取出到期的对象,如果没有则该队列一直等待
TaskDelay taskDelay = delayQueue.take();
LOG.info("taskDelay-------JSON.toJSONString(taskDelay)------taskDelay------" + JSON.toJSONString(taskDelay));
if (taskDelay != null) {
LOG.info("TaskDelay have value is OK OK OK !!!!!:" + System.currentTimeMillis());
//update任务对象的状态
Runnable task = taskDelay.getTask();
if (task == null) {
LOG.info("task is null is null is error error !!!!");
continue;
}
try{
executor.execute(task);
//如果处理正常删除队列
// removeTask(taskDelay);
}catch (Exception e){
e.printStackTrace();
}

LOG.info("this task:" + task + " && Time:" + System.currentTimeMillis());
}
} catch (Exception e) {
LOG.error("TaskDelayManage execute is ERROR ERROR !!!!!", e);
e.printStackTrace();
//break;
}
}

}


/**
* 向队列中添加任务,time task
*
* @param time 为延迟时间
* @param task 添加的任务
*/
public void put(long time, Runnable task) {
LOG.info("TaskDelayManage put task start :" + task + " && Time:" + System.currentTimeMillis());
//long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.NANOSECONDS);
long nanoTime = time;
//给任务设置延迟时间
TaskDelay taskDelay = new TaskDelay(nanoTime, task);
//将任务放在延迟队列中
delayQueue.put(taskDelay);
Object[] ssN = get();
LOG.info(ssN.length + "-----TaskDelayManage put task end :" + task + " && Time:" + System.currentTimeMillis());
}

/**
* 从队列中获取任务
* zhoukai7
*/
public Object[] get() {
Object[] objArry = delayQueue.toArray();
return objArry;
}

/**
* 从队列中移除任务
*
* @param taskDelay
*/
public boolean removeTask(TaskDelay taskDelay) {
return delayQueue.remove(taskDelay);
}
}


----------TaskBusiness 消费者---------------

package com.asiainfo.audit.delay;


import com.asiainfo.audit.entity.AppointAudit;
import com.asiainfo.audit.mediator.SpringContextUtil;
import com.asiainfo.audit.service.DelayQueueServiceImpl;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;

/**
*消息的消费者
* zhoukai7
*/
public class TaskBusiness implements Runnable {
private static final Logger LOG = Logger.getLogger(TaskBusiness.class);
private AppointAudit appointAudit;

public TaskBusiness(){}

public TaskBusiness(AppointAudit appointAudit) {
this.appointAudit = appointAudit;
}

@Override
public void run() {
//该地方处理相关逻辑
LOG.info(Thread.currentThread().getName() + "--RelevantBusinessDeal---");
LOG.info("is is is is tow ci start -------saveDelayQueue!!!!!!!!");
ApplicationContext applicationContext = SpringContextUtil.getApplicationContext();
DelayQueueServiceImpl iDelayQueueService = (DelayQueueServiceImpl) applicationContext.getBean(DelayQueueServiceImpl.class);
//处理预约表和审核表和流水表
iDelayQueueService.updateDelayQueue(appointAudit);
LOG.info("is is is is tow ci end----------saveDelayQueue!!!!!!!!");
}

public AppointAudit getAppointAudit() {
return appointAudit;
}

}

----------ManageApplication 容器启动调用---------------
package com.asiainfo;

import com.asiainfo.audit.service.DelayQueueServiceImpl;
import com.asiainfo.audit.delay.ServerFilter;
import com.asiainfo.audit.delay.TaskDelayManage;

import org.apache.log4j.Logger;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class ManageApplication extends SpringBootServletInitializer implements ApplicationListener<ContextRefreshedEvent> {
private static final Logger LOG = Logger.getLogger(ManageApplication.class);

public static void main(String[] args) {
SpringApplication.run(ManageApplication.class, args);
}

@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(ManageApplication.class);
}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
LOG.info("------------ManageApplication start !!!!!!!!!");
//必须继承ApplicationListener在springboot执行后在去执行一些代码
//必须在这里执行TaskDelayManage
TaskDelayManage taskDelayManages = new TaskDelayManage();
//在容器加载完毕后获取dao层来操作数据库
DelayQueueServiceImpl iDelayQueueService = (DelayQueueServiceImpl) event.getApplicationContext().getBean(DelayQueueServiceImpl.class);
ServerFilter serverFilter = new ServerFilter(iDelayQueueService);
//在容器加载完毕后启动线程
Thread thread = new Thread(serverFilter);
thread.start();
注意:标红色的部分不能这么写,否则会导致线程死锁而且会导致,无法往队列存消息。这是因为主线程,和子线程的参数交互问题,
LOG.info("------------ManageApplication end !!!!!!!!!");

}
}


、**************************************************************************************************************************
--以下是可用
package com.asiainfo;

import com.asiainfo.audit.service.DelayQueueServiceImpl;
import com.asiainfo.audit.delay.ServerFilter;
import com.asiainfo.audit.delay.TaskDelayManage;

import org.apache.log4j.Logger;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class ManageApplication extends SpringBootServletInitializer implements ApplicationListener<ContextRefreshedEvent> {
private static final Logger LOG = Logger.getLogger(ManageApplication.class);

public static void main(String[] args) {
SpringApplication.run(ManageApplication.class, args);
}

@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(ManageApplication.class);
}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
LOG.info("------------ManageApplication start !!!!!!!!!");
//必须继承ApplicationListener在springboot执行后在去执行一些代码
//在容器加载完毕后获取dao层来操作数据库
DelayQueueServiceImpl iDelayQueueService = (DelayQueueServiceImpl) event.getApplicationContext().getBean(DelayQueueServiceImpl.class);
iDelayQueueService.initDelayQueue();
LOG.info("------------ManageApplication end !!!!!!!!!");

}
}

----------IDelayQueueService业务层调用---------------

package com.asiainfo.audit.api;

import com.asiainfo.audit.entity.AppointAudit;

import java.util.List;

/**
* 对各个业务层提供的预约上下架的接口
* zhoukai7
*/
public interface IDelayQueueService {
/**
* 容器初始化的时候,为了防止数据丢失,
* 先从数据库检索然后保存到队列
*
* @param list
*/
void putInitDelayQueue(List<AppointAudit> list);


/**
* zhoukai7
* 提供给各个业务模块调用接口
*
* @param appointAudit
*/
int saveDelayQueue(AppointAudit appointAudit);

/**
* zhoukai7
* 当容器初始化的时候执行该方法,
* 重新读取预约信息从数据库保存到队列
*/
void initDelayQueue();

/**
* zhoukai7
* 预约时间到点执行删除预约表
* 保存审核流水信息
* @return
*/
void updateDelayQueue(AppointAudit appointAudit);


}
----------DelayQueueServiceImpl 业务实现类---------------

package com.asiainfo.audit.service;

import com.alibaba.fastjson.JSON;
import com.asiainfo.audit.api.*;
imp
相关文章
|
缓存
指令缓存队列
指令缓存队列
73 0
|
8月前
|
消息中间件 数据库
七、延时队列
七、延时队列
85 0
|
8月前
队列的实现
队列的实现
|
C++
c++ 队列
队列的数据结构
45 0
|
消息中间件
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
62 0
|
消息中间件
死信队列和延迟队列的介绍
死信队列和延迟队列的介绍
|
机器学习/深度学习 存储 C语言
队列的实现(上)
队列的实现(上)
|
消息中间件 Java Kafka
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
252 0
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
|
存储 消息中间件 安全
堵塞队列BlockingQueue 使用与理解
堵塞队列本质就是队列,底层数据结构 通常是由数组,或者链表构成。实现FIFO思想 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
176 0
|
消息中间件 Java Kafka
RabbitMQ没有延时队列?我就教你一招,玩转延时队列
延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后被消费。普通队列是即时消费的,延时队列是根据延时时间,多久之后才能消费的。