1、创建延时队列的内容对象
每条需要处理的内容都要单独创建一个,然后再放到队列中
public class DelayedTask<T> implements Delayed {
//延时时间,毫秒
private long delay;
private long expire;
//需要处理的数据
private T content;
public DelayedTask(long delay, T content) {
this.delay = delay;
this.content = content;
this.expire = System.currentTimeMillis() + delay;
}
/**
* 用于延迟队列内部进行排序,将最先到期的放在队首,保证take出来的是到期的那个
*/
@Override
public int compareTo(Delayed o) {
return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
/**
* 指定到期时间计算规则
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return "DelayedTask [delay=" + delay + ", expire=" + expire + ", Content=" + content + "]";
}
}
2、测试
2.1、无限循环调用
public static void main(String[] args) throws InterruptedException {
//1、创建队列
DelayQueue<DelayedTask> queue = new DelayQueue<>();
//2、插入数据
for (int i = 1; i <= 3; i++) {
int ti = 2000;//i==1 ? 20000 :(i==2?4000:8000);
queue.put(new DelayedTask(ti, "first"));
}
System.out.println("start take task from queue" + DateUtil.now());
//3、循环从队列中取数据
while (!queue.isEmpty()) {
//只有到期的数据才能取的出来,否则就阻塞等待
DelayedTask ddddd = queue.take();
System.out.println(ddddd);
}
}
2.2、线程池调用
import java.text.DateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.cnki.common.readfile.apitype.ProjectAnalysis;
import net.cnki.common.readfile.apitype.ProjectChildAnalysis;
import net.cnki.common.readfile.apitype.ResultAnalysis;
import net.cnki.common.readfile.apitype.SingleFileHandWorkAnalysis;
import net.cnki.common.readfile.apitype.SingleFileUploadsAnalysis;
import net.cnki.common.readfile.apitype.SingleFileZipAnalysis;
/**
* 基于内存的队列的介绍,基于内存的队列,队列的大小依赖于JVM内存的大小,一般如果是内存占用不大且处理
* 相对较为及时的都可以采用此种方法。如果你在队列处理的时候需要有失败重试机制,那么用此种队列就不是特别合适了。
* @author ZhiPengyu
*
*/
public class QueuePool {
Logger logger = LoggerFactory.getLogger(QueuePool.class);
private DelayQueue<DelayedTask> queue = new DelayQueue<>();
private ScheduledExecutorService es = Executors.newScheduledThreadPool(5);
/**
* 构造执行
*/
public QueuePool() {}
/**
* 添加信息至队列中
* @param content
*/
public void addQueue(String[] content) {
queue.offer(content);
execute();
}
/**
* 初始化执行
*/
public void execute() {
es.scheduleWithFixedDelay(new Runnable(){
public void run() {
try {
DelayedTask ddddd = queue.take();
} catch (Exception e) {
logger.error(e.toString());
}
}
}, 100, 500, TimeUnit.MILLISECONDS); //表示延迟10毫秒后每500毫秒执行一次 。执行一次内,处理队列内容不限,执行一次后隔第二个时间参数后继续执行,一直到队列无内容
}//DAYS 天,HOURS 小时,MINUTES 分钟,SECONDS 秒,MILLISECONDS 毫秒
}
public static void main(String[] args) {
QueuePool pool= new QueuePool();
pool.addQueue(...);
}