核心:java.util.concurrent.Delayed和org.springframework.boot.CommandLineRunner;
场景:用户注册后,5分钟推送消息等
一、依赖包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--非必须--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.0.M1</version> </dependency> <!--非必须--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> <!--非必须--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
二、工具类
执行任务所需的基础参数
import lombok.Data; @Data public class TaskBase { //任务参数,根据业务需求多少都行 private String identifier; public TaskBase(String identifier) { this.identifier = identifier; } }
执行的任务和时间
import cn.hutool.core.date.DateUtil; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 延时任务 */ public class DelayTask implements Delayed { //任务参数 final private TaskBase data; //任务的延时时间,单位毫秒 final private long expire; /** * 构造延时任务 * * @param data 业务数据 * @param expire 任务延时时间(ms) */ public DelayTask(TaskBase data, long expire) { super(); this.data = data; this.expire = expire + System.currentTimeMillis(); } public TaskBase getData() { return data; } public long getExpire() { return expire; } @Override public boolean equals(Object obj) { if (obj instanceof DelayTask) { return this.data.getIdentifier().equals(((DelayTask) obj).getData().getIdentifier()); } return false; } @Override public String toString() { return "{" + "data:" + data.toString() + "," + "延时时间:" +expire+ DateUtil.format(new Date(),"yyyy.MM.dd HH:mm:ss") + "}"; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), unit); } @Override public int compareTo(Delayed o) { long delta = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (int) delta; } }
import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.concurrent.DelayQueue; import java.util.concurrent.Executors; @Component @Slf4j public class DelayQueueManager implements CommandLineRunner { private final DelayQueue<DelayTask> delayQueue = new DelayQueue<>(); /** * 加入到延时队列中 * * @param task */ public void put(DelayTask task) { log.error("加入延时任务:{}", task); delayQueue.put(task); } /** * 取消延时任务 * * @param task * @return */ public boolean remove(DelayTask task) { log.error("取消延时任务:{}", task); return delayQueue.remove(task); } /** * 取消延时任务 * * @param taskid * @return */ public boolean remove(String taskid) { return remove(new DelayTask(new TaskBase(taskid), 0)); } @Override public void run(String... args) throws Exception { log.info("初始化延时队列"); Executors.newSingleThreadExecutor().execute(new Thread(this::excuteThread)); } /** * 延时任务执行线程 */ private void excuteThread() { while (true) { try { DelayTask task = delayQueue.take(); //执行任务 processTask(task); } catch (InterruptedException e) { break; } } } /** * 内部执行延时任务 * * @param task */ private void processTask(DelayTask task) { //获取任务参数,执行业务task.getData().getIdentifier() log.error("执行延时任务:{}-{}", task, task.getData().getIdentifier()); } }
三、执行测试任务
import com.example.demo.task.DelayQueueManager; import com.example.demo.task.DelayTask; import com.example.demo.task.TaskBase; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class DemoApplicationTests { @Autowired private DelayQueueManager delayQueueManager; @Test void contextLoads() throws InterruptedException { //新增任务 delayQueueManager.put(new DelayTask(new TaskBase("abc"), 1000 * 1)); //新增任务 delayQueueManager.put(new DelayTask(new TaskBase("abc"), 1000 * 5)); //新增任务 delayQueueManager.put(new DelayTask(new TaskBase("abc"), 1000 * 6)); //测试任务需要下边代码执行,线上不用 Thread.sleep(10 * 1000); } }