使用Redis实现延时任务(一)(上)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。

前提



最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。


候选方案对比



下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势。


方案 优势 劣势 选用场景
JDK内置的延迟队列DelayQueue 实现简单 数据内存态,不可靠 一致性相对低的场景
调度框架和MySQL进行短间隔轮询 实现简单,可靠性高 存在明显的性能瓶颈 数据量较少实时性相对低的场景
RabbitMQDLXTTL,一般称为死信队列方案 异步交互可以削峰 延时的时间长度不可控,如果数据需要持久化则性能会降低 -
调度框架和Redis进行短间隔轮询 数据持久化,高性能 实现难度大 常见于支付结果回调方案
时间轮 实时性高 实现难度大,内存消耗大 实时性高的场景


如果应用的数据量不高,实时性要求比较低,选用调度框架和MySQL进行短间隔轮询这个方案是最优的方案。但是笔者遇到的场景数据量相对比较大,实时性并不高,采用扫库的方案一定会对MySQL实例造成比较大的压力。记得很早之前,看过一个PPT叫《盒子科技聚合支付系统演进》,其中里面有一张图片给予笔者一点启发:


微信截图_20220512181806.png


里面刚好用到了调度框架和Redis进行短间隔轮询实现延时任务的方案,不过为了分摊应用的压力,图中的方案还做了分片处理。鉴于笔者当前业务紧迫,所以在第一期的方案暂时不考虑分片,只做了一个简化版的实现。


由于PPT中没有任何的代码或者框架贴出,有些需要解决的技术点需要自行思考,下面会重现一次整个方案实现的详细过程。


场景设计



实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做OrderMessage),订单消息需要延迟5到15秒后进行异步处理。


微信截图_20220512181820.png


否决的候选方案实现思路



下面介绍一下其它四个不选用的候选方案,结合一些伪代码和流程分析一下实现过程。


JDK内置延迟队列


DelayQueue是一个阻塞队列的实现,它的队列元素必须是Delayed的子类,这里做个简单的例子:


public class DelayQueueMain {
    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
    public static void main(String[] args) throws Exception {
        DelayQueue<OrderMessage> queue = new DelayQueue<>();
        // 默认延迟5秒
        OrderMessage message = new OrderMessage("ORDER_ID_10086");
        queue.add(message);
        // 延迟6秒
        message = new OrderMessage("ORDER_ID_10087", 6);
        queue.add(message);
        // 延迟10秒
        message = new OrderMessage("ORDER_ID_10088", 10);
        queue.add(message);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("DelayWorker");
            thread.setDaemon(true);
            return thread;
        });
        LOGGER.info("开始执行调度线程...");
        executorService.execute(() -> {
            while (true) {
                try {
                    OrderMessage task = queue.take();
                    LOGGER.info("延迟处理订单消息,{}", task.getDescription());
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }
    private static class OrderMessage implements Delayed {
        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        /**
         * 默认延迟5000毫秒
         */
        private static final long DELAY_MS = 1000L * 5;
        /**
         * 订单ID
         */
        private final String orderId;
        /**
         * 创建时间戳
         */
        private final long timestamp;
        /**
         * 过期时间
         */
        private final long expire;
        /**
         * 描述
         */
        private final String description;
        public OrderMessage(String orderId, long expireSeconds) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + expireSeconds * 1000L;
            this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }
        public OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + DELAY_MS;
            this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }
        public String getOrderId() {
            return orderId;
        }
        public long getTimestamp() {
            return timestamp;
        }
        public long getExpire() {
            return expire;
        }
        public String getDescription() {
            return description;
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
复制代码


注意一下,OrderMessage实现Delayed接口,关键是需要实现Delayed#getDelay()Delayed#compareTo()。运行一下main()方法:


10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 开始执行调度线程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10086]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10087]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10088]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:18
复制代码


调度框架 + MySQL


使用调度框架对MySQL表进行短间隔轮询是实现难度比较低的方案,通常服务刚上线,表数据不多并且实时性不高的情况下应该首选这个方案。不过要注意以下几点:

  • 注意轮询间隔不能太短,否则会对MySQL实例产生影响。
  • 注意每次查询的数量,结果集数量太多有可能会导致调度阻塞和占用应用大量内存,从而影响时效性。
  • 注意要设计状态值和最大重试次数,这样才能尽量避免大量数据积压和重复查询的问题。
  • 最好通过时间列做索引,查询指定时间范围内的数据。


引入QuartzMySQL的Java驱动包和spring-boot-starter-jdbc(这里只是为了方便用相对轻量级的框架实现,生产中可以按场景按需选择其他更合理的框架):


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.7.RELEASE</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
    <scope>test</scope>
</dependency>
复制代码


假设表设计如下:


CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;
USE `delayTask`;
CREATE TABLE `t_order_message`
(
    id           BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
    order_id     VARCHAR(50) NOT NULL COMMENT '订单ID',
    create_time  DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建日期时间',
    edit_time    DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期时间',
    retry_times  TINYINT     NOT NULL DEFAULT 0 COMMENT '重试次数',
    order_status TINYINT     NOT NULL DEFAULT 0 COMMENT '订单状态',
    INDEX idx_order_id (order_id),
    INDEX idx_create_time (create_time)
) COMMENT '订单信息表';
# 写入两条测试数据
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');
复制代码


编写代码:


// 常量
public class OrderConstants {
    public static final int MAX_RETRY_TIMES = 5;
    public static final int PENDING = 0;
    public static final int SUCCESS = 1;
    public static final int FAIL = -1;
    public static final int LIMIT = 10;
}
// 实体
@Builder
@Data
public class OrderMessage {
    private Long id;
    private String orderId;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private Integer retryTimes;
    private Integer orderStatus;
}
// DAO
@RequiredArgsConstructor
public class OrderMessageDao {
    private final JdbcTemplate jdbcTemplate;
    private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
        List<OrderMessage> list = Lists.newArrayList();
        while (r.next()) {
            list.add(OrderMessage.builder()
                    .id(r.getLong("id"))
                    .orderId(r.getString("order_id"))
                    .createTime(r.getTimestamp("create_time").toLocalDateTime())
                    .editTime(r.getTimestamp("edit_time").toLocalDateTime())
                    .retryTimes(r.getInt("retry_times"))
                    .orderStatus(r.getInt("order_status"))
                    .build());
        }
        return list;
    };
    public List<OrderMessage> selectPendingRecords(LocalDateTime start,
                                                   LocalDateTime end,
                                                   List<Integer> statusList,
                                                   int maxRetryTimes,
                                                   int limit) {
        StringJoiner joiner = new StringJoiner(",");
        statusList.forEach(s -> joiner.add(String.valueOf(s)));
        return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
                        "AND order_status IN (?) AND retry_times < ? LIMIT ?",
                p -> {
                    p.setTimestamp(1, Timestamp.valueOf(start));
                    p.setTimestamp(2, Timestamp.valueOf(end));
                    p.setString(3, joiner.toString());
                    p.setInt(4, maxRetryTimes);
                    p.setInt(5, limit);
                }, M);
    }
    public int updateOrderStatus(Long id, int status) {
        return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
                p -> {
                    p.setInt(1, status);
                    p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
                    p.setLong(3, id);
                });
    }
}
// Service
@RequiredArgsConstructor
public class OrderMessageService {
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);
    private final OrderMessageDao orderMessageDao;
    private static final List<Integer> STATUS = Lists.newArrayList();
    static {
        STATUS.add(OrderConstants.PENDING);
        STATUS.add(OrderConstants.FAIL);
    }
    public void executeDelayJob() {
        LOGGER.info("订单处理定时任务开始执行......");
        LocalDateTime end = LocalDateTime.now();
        // 一天前
        LocalDateTime start = end.minusDays(1);
        List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
        if (!list.isEmpty()) {
            for (OrderMessage m : list) {
                LOGGER.info("处理订单[{}],状态由{}更新为{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
                // 这里其实可以优化为批量更新
                orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
            }
        }
        LOGGER.info("订单处理定时任务开始完毕......");
    }
}
// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
        service.executeDelayJob();
    }
    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
        OrderMessageService service = new OrderMessageService(orderMessageDao);
        // 内存模式的调度器
        StdSchedulerFactory factory = new StdSchedulerFactory();
        Scheduler scheduler = factory.getScheduler();
        // 这里没有用到IOC容器,直接用Quartz数据集合传递服务引用
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("orderMessageService", service);
        // 新建Job
        JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
                .withIdentity("orderMessageDelayJob", "delayJob")
                .usingJobData(jobDataMap)
                .build();
        // 新建触发器,10秒执行一次
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("orderMessageDelayTrigger", "delayJob")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
        // 启动调度器
        scheduler.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}
复制代码


这个例子里面用了create_time做轮询,实际上可以添加一个调度时间schedule_time列做轮询,这样子才能更容易定制空闲时和忙碌时候的调度策略。上面的示例的运行效果如下:


11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始执行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10086],状态由0更新为1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10087],状态由0更新为1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始完毕......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
复制代码


RabbitMQ死信队列


使用RabbitMQ死信队列依赖于RabbitMQ的两个特性:TTLDLX

  • TTLTime To Live,消息存活时间,包括两个维度:队列消息存活时间和消息本身的存活时间。
  • DLXDead Letter Exchange,死信交换器。


画个图描述一下这两个特性:

微信截图_20220512181838.png


下面为了简单起见,TTL使用了针对队列的维度。引入RabbitMQ的Java驱动:


<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
    <scope>test</scope>
</dependency>
复制代码


代码如下:


public class DlxMain {
    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel producerChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
        // dlx交换器名称为dlx.exchange,类型是direct,绑定键为dlx.key,队列名为dlx.queue
        producerChannel.exchangeDeclare("dlx.exchange", "direct");
        producerChannel.queueDeclare("dlx.queue", false, false, false, null);
        producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
        Map<String, Object> queueArgs = new HashMap<>();
        // 设置队列消息过期时间,5秒
        queueArgs.put("x-message-ttl", 5000);
        // 指定DLX相关参数
        queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
        queueArgs.put("x-dead-letter-routing-key", "dlx.key");
        // 声明业务队列
        producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("DlxConsumer");
            return thread;
        });
        // 启动消费者
        executorService.execute(() -> {
            try {
                consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        });
        OrderMessage message = new OrderMessage("10086");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());
        message = new OrderMessage("10087");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());
        message = new OrderMessage("10088");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());
        Thread.sleep(Integer.MAX_VALUE);
    }
    private static class DlxConsumer extends DefaultConsumer {
        DlxConsumer(Channel channel) {
            super(channel);
        }
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            LOGGER.info("处理消息成功:{}", new String(body, StandardCharsets.UTF_8));
        }
    }
    private static class OrderMessage {
        private final String orderId;
        private final long timestamp;
        private final String description;
        OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.description = String.format("订单[%s],订单创建时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
        }
        public String getOrderId() {
            return orderId;
        }
        public long getTimestamp() {
            return timestamp;
        }
        public String getDescription() {
            return description;
        }
    }
}
复制代码


运行main()方法结果如下:


16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10086],订单创建时间为:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10087],订单创建时间为:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10088],订单创建时间为:2019-08-20 16:35:58
复制代码


时间轮


时间轮TimingWheel是一种高效、低延迟的调度数据结构,底层采用数组实现存储任务列表的环形队列,示意图如下:


微信截图_20220512181850.png


这里暂时不对时间轮和其实现作分析,只简单举例说明怎么使用时间轮实现延时任务。这里使用Netty提供的HashedWheelTimer,引入依赖:


<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.39.Final</version>
</dependency>
复制代码


代码如下:


public class HashedWheelTimerMain {
    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
            return thread;
        };
        // tickDuration - 每tick一次的时间间隔, 每tick一次就会到达下一个槽位
        // unit - tickDuration的时间单位
        // ticksPerWhee - 时间轮中的槽位数
        Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
        TimerTask timerTask = new DefaultTimerTask("10086");
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10087");
        timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10088");
        timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
        Thread.sleep(Integer.MAX_VALUE);
    }
    private static class DefaultTimerTask implements TimerTask {
        private final String orderId;
        private final long timestamp;
        public DefaultTimerTask(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
        }
        @Override
        public void run(Timeout timeout) throws Exception {
            System.out.println(String.format("任务执行时间:%s,订单创建时间:%s,订单ID:%s",
                    LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId));
        }
    }
}
复制代码


运行结果:


任务执行时间:2019-08-20 17:19:49.310,订单创建时间:2019-08-20 17:19:43.294,订单ID:10086
任务执行时间:2019-08-20 17:19:54.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10087
任务执行时间:2019-08-20 17:19:59.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10088
复制代码


一般来说,任务执行的时候应该使用另外的业务线程池,以免阻塞时间轮本身的运动。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
4月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
85 6
|
10月前
|
存储 监控 负载均衡
保证Redis的高可用性是一个涉及多个层面的任务,主要包括数据持久化、复制与故障转移、集群化部署等方面
【5月更文挑战第15天】保证Redis高可用性涉及数据持久化、复制与故障转移、集群化及优化策略。RDB和AOF是数据持久化方法,哨兵模式确保故障自动恢复。Redis Cluster实现分布式部署,提高负载均衡和容错性。其他措施包括身份认证、多线程、数据压缩和监控报警,以增强安全性和稳定性。通过综合配置与监控,可确保Redis服务的高效、可靠运行。
253 2
|
5月前
|
NoSQL Java API
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
在40岁老架构师尼恩的读者交流群中,近期有小伙伴在面试一线互联网企业时遇到了关于Redis分布式锁过期及自动续期的问题。尼恩对此进行了系统化的梳理,介绍了两种核心解决方案:一是通过增加版本号实现乐观锁,二是利用watch dog自动续期机制。后者通过后台线程定期检查锁的状态并在必要时延长锁的过期时间,确保锁不会因超时而意外释放。尼恩还分享了详细的代码实现和原理分析,帮助读者深入理解并掌握这些技术点,以便在面试中自信应对相关问题。更多技术细节和面试准备资料可在尼恩的技术文章和《尼恩Java面试宝典》中获取。
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
|
10月前
|
缓存 NoSQL Java
面试官:Redis如何实现延迟任务?
延迟任务是计划任务,用于在未来特定时间执行。常见应用场景包括定时通知、异步处理、缓存管理、计划任务、订单处理、重试机制、提醒和数据采集。Redis虽无内置延迟任务功能,但可通过过期键通知、ZSet或Redisson实现。然而,这种方法精度有限,稳定性较差,适合轻量级需求。Redisson的RDelayedQueue提供更简单的延迟队列实现。
490 9
|
10月前
|
存储 缓存 NoSQL
Redis实现延迟任务的几种方案
Redis实现延迟任务的几种方案
|
10月前
|
存储 NoSQL Java
Redis 实现延迟任务的深度解析
【4月更文挑战第17天】
290 0
|
10月前
|
监控 NoSQL 测试技术
python使用Flask,Redis和Celery的异步任务
python使用Flask,Redis和Celery的异步任务
|
4天前
|
缓存 NoSQL Java
Redis应用—8.相关的缓存框架
本文介绍了Ehcache和Guava Cache两个缓存框架及其使用方法,以及如何自定义缓存。主要内容包括:Ehcache缓存框架、Guava Cache缓存框架、自定义缓存。总结:Ehcache适合用作本地缓存或与Redis结合使用,Guava Cache则提供了更灵活的缓存管理和更高的并发性能。自定义缓存可以根据具体需求选择不同的数据结构和引用类型来实现特定的缓存策略。
Redis应用—8.相关的缓存框架

热门文章

最新文章