使用Redis实现延时任务(一)(下)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。

选用的方案实现过程



最终选用了基于Redis的有序集合Sorted SetQuartz短轮询进行实现。具体方案是:


  1. 订单创建的时候,订单ID和当前时间戳分别作为Sorted Set的member和score添加到订单队列Sorted Set中。
  2. 订单创建的时候,订单ID和推送内容JSON字符串分别作为field和value添加到订单队列内容Hash中。
  3. 第1步和第2步操作的时候用Lua脚本保证原子性。
  4. 使用一个异步线程通过Sorted Set的命令ZREVRANGEBYSCORE弹出指定数量的订单ID对应的订单队列内容Hash中的订单推送内容数据进行处理。


对于第4点处理有两种方案:


  • 方案一:弹出订单内容数据的同时进行数据删除,也就是ZREVRANGEBYSCOREZREMHDEL命令要在同一个Lua脚本中执行,这样的话Lua脚本的编写难度大,并且由于弹出数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。
  • 方案二:弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列Sorted Set和订单队列内容Hash中对应的数据,这样的话需要控制并发,有重复执行的可能性。


最终暂时选用了方案一,也就是从Sorted Set弹出订单ID并且从Hash中获取完推送数据之后马上删除这两个集合中对应的数据。方案的流程图大概是这样:


微信截图_20220512181903.png


这里先详细说明一下用到的Redis命令。


Sorted Set相关命令


  • ZADD命令 - 将一个或多个成员元素及其分数值加入到有序集当中。

ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN



  • ZREVRANGEBYSCORE命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。

ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

  • max:分数区间 - 最大分数。
  • min:分数区间 - 最小分数。
  • WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。
  • LIMIT:可选参数,offset和count原理和MySQLLIMIT offset,size一致,如果不指定此参数则返回整个集合的数据。



  • ZREM命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。

ZREM key member [member ...]


Hash相关命令


  • HMSET命令 - 同时将多个field-value(字段-值)对设置到哈希表中。

HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN



  • HDEL命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。

HDEL KEY_NAME FIELD1.. FIELDN


Lua相关


  • 加载Lua脚本并且返回脚本的SHA-1字符串:SCRIPT LOAD script
  • 执行已经加载的Lua脚本:EVALSHA sha1 numkeys key [key ...] arg [arg ...]
  • unpack函数可以把table类型的参数转化为可变参数,不过需要注意的是unpack函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见Stackoverflow的提问table.unpack() only returns the first element


PS:如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。


引入依赖:


<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.7.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.1.9.RELEASE</version>
    </dependency> 
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.59</version>
    </dependency>       
</dependencies>
复制代码


编写Lua脚本/lua/enqueue.lua/lua/dequeue.lua


-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil
-- /lua/dequeue.lua
-- 参考jesque的部分Lua脚本实现
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack函数能把table转化为可变参数
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil
复制代码


编写核心API代码:


// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {
    private JedisPool jedisPool;
    @Override
    public void afterPropertiesSet() throws Exception {
        jedisPool = new JedisPool();
    }
    public Jedis provide(){
        return jedisPool.getResource();
    }
}
// OrderMessage
@Data
public class OrderMessage {
    private String orderId;
    private BigDecimal amount;
    private Long userId;
}
// 延迟队列接口
public interface OrderDelayQueue {
    void enqueue(OrderMessage message);
    List<OrderMessage> dequeue(String min, String max, String offset, String limit);
    List<OrderMessage> dequeue();
    String enqueueSha();
    String dequeueSha();
}
// 延迟队列实现类
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
    private static final List<String> KEYS = Lists.newArrayList();
    private final JedisProvider jedisProvider;
    static {
        KEYS.add(ORDER_QUEUE);
        KEYS.add(ORDER_DETAIL_QUEUE);
    }
    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
    }
    @Override
    public List<OrderMessage> dequeue() {
        // 30分钟之前
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
    }
    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }
    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }
    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        // 加载Lua脚本
        loadLuaScript();
    }
    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
    public static void main(String[] as) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        JedisProvider jedisProvider = new JedisProvider();
        jedisProvider.afterPropertiesSet();
        RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
        queue.afterPropertiesSet();
        // 写入测试数据
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(10086));
        message.setOrderId("ORDER_ID_10086");
        message.setUserId(10086L);
        message.setTimestamp(LocalDateTime.now().format(f));
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // 测试需要,score设置为30分钟之前
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
        List<OrderMessage> dequeue = queue.dequeue();
        System.out.println(dequeue);
    }
}
复制代码


这里先执行一次main()方法验证一下延迟队列是否生效:


[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]
复制代码


确定延迟队列的代码没有问题,接着编写一个QuartzJob类型的消费者OrderMessageConsumer


@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {
    private static final AtomicInteger COUNTER = new AtomicInteger();
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    @Autowired
    private OrderDelayQueue orderDelayQueue;
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        LOGGER.info("订单消息处理定时任务开始执行......");
        List<OrderMessage> messages = orderDelayQueue.dequeue();
        if (!messages.isEmpty()) {
            // 简单的列表等分放到线程池中执行
            List<List<OrderMessage>> partition = Lists.partition(messages, 2);
            int size = partition.size();
            final CountDownLatch latch = new CountDownLatch(size);
            for (List<OrderMessage> p : partition) {
                BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
            }
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        stopWatch.stop();
        LOGGER.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis());
    }
    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {
        private final List<OrderMessage> messages;
        private final CountDownLatch latch;
        @Override
        public void run() {
            try {
                // 实际上这里应该单条捕获异常
                for (OrderMessage message : messages) {
                    LOGGER.info("处理订单信息,内容:{}", message);
                }
            } finally {
                latch.countDown();
            }
        }
    }
}      
复制代码


上面的消费者设计的时候需要有以下考量:


  • 使用@DisallowConcurrentExecution注解不允许Job并发执行,其实多个Job并发执行意义不大,因为我们采用的是短间隔的轮询,而Redis是单线程处理命令,在客户端做多线程其实效果不佳。
  • 线程池BUSINESS_WORKER_POOL的线程容量或者队列应该综合LIMIT值、等分订单信息列表中使用的size值以及ConsumeTask里面具体的执行时间进行考虑,这里只是为了方便使用了固定容量的线程池。
  • ConsumeTask中应该对每一条订单信息的处理单独捕获异常和吞并异常,或者把处理单个订单信息的逻辑封装成一个不抛出异常的方法。


其他Quartz相关的代码:


// Quartz配置类
@Configuration
public class QuartzAutoConfiguration {
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        factory.setJobFactory(quartzAutowiredJobFactory);
        return factory;
    }
    @Bean
    public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
        return new QuartzAutowiredJobFactory();
    }
    public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {
        private AutowireCapableBeanFactory autowireCapableBeanFactory;
        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }
        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            // 这里利用AutowireCapableBeanFactory从新建的Job实例做一次自动装配,得到一个原型(prototype)的JobBean实例
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}
复制代码


这里暂时使用了内存态的RAMJobStore去存放任务和触发器的相关信息,如果在生产环境最好替换成基于MySQL也就是JobStoreTX进行集群化,最后是启动函数和CommandLineRunner的实现:


@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {
    @Autowired
    private Scheduler scheduler;
    @Autowired
    private JedisProvider jedisProvider;
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Override
    public void run(String... args) throws Exception {
        // 准备一些测试数据
        prepareOrderMessageData();
        JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
                .withIdentity("OrderMessageConsumer", "DelayTask")
                .build();
        // 触发器5秒触发一次
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("OrderMessageConsumerTrigger", "DelayTask")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
    }
    private void prepareOrderMessageData() throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            // 这里暂时不使用Lua
            Map<String, Double> map = Maps.newHashMap();
            Map<String, String> hash = Maps.newHashMap();
            for (OrderMessage message : messages) {
                // 故意把score设计成30分钟前
                map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                hash.put(message.getOrderId(), JSON.toJSONString(message));
            }
            jedis.zadd("ORDER_QUEUE", map);
            jedis.hmset("ORDER_DETAIL_QUEUE", hash);
        }
    }
}
复制代码


输出结果如下:


2019-08-21 22:45:59.518  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:22 ms......
2019-08-21 22:46:04.515  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:1 ms......
......
复制代码


首次执行的时候涉及到一些组件的初始化,会比较慢,后面看到由于我们只是简单打印订单信息,所以定时任务执行比较快。如果在不调整当前架构的情况下,生产中需要注意:


  • 切换JobStoreJDBC模式,Quartz官方有完整教程,或者看笔者之前翻译的Quartz文档。
  • 需要监控或者收集任务的执行状态,添加预警等等。


这里其实有一个性能隐患,命令ZREVRANGEBYSCORE的时间复杂度可以视为为O(N)N是集合的元素个数,由于这里把所有的订单信息都放进了同一个Sorted Set(ORDER_QUEUE)中,所以在一直有新增数据的时候,dequeue脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案。


小结



这篇文章主要从一个实际生产案例的仿真例子入手,分析了当前延时任务的一些实现方案,还基于RedisQuartz给出了一个完整的示例。当前的示例只是处于可运行的状态,有些问题尚未解决。下一篇文章会着眼于解决两个方面的问题:

  1. 分片。
  2. 监控。


还有一点,架构是基于业务形态演进出来的,很多东西需要结合场景进行方案设计和改进,思路仅供参考,切勿照搬代码


附件




相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
19天前
|
存储 缓存 NoSQL
Redis实现延迟任务的几种方案
Redis实现延迟任务的几种方案
|
22天前
|
存储 NoSQL Java
Redis 实现延迟任务的深度解析
【4月更文挑战第17天】
131 0
|
27天前
|
监控 NoSQL 测试技术
python使用Flask,Redis和Celery的异步任务
python使用Flask,Redis和Celery的异步任务
|
1月前
|
缓存 NoSQL Java
面试官:Redis如何实现延迟任务?
延迟任务是计划任务,用于在未来特定时间执行。常见应用场景包括定时通知、异步处理、缓存管理、计划任务、订单处理、重试机制、提醒和数据采集。Redis虽无内置延迟任务功能,但可通过过期键通知、ZSet或Redisson实现。然而,这种方法精度有限,稳定性较差,适合轻量级需求。Redisson的RDelayedQueue提供更简单的延迟队列实现。
365 9
|
3月前
|
存储 NoSQL API
【小小思考】Redis实现去重任务队列
【2月更文挑战第1天】思考一下如何用Redis实现去重的任务队列,主要有List 、List + Set/Hash/Bloom Filter、ZSet、Lua和开源库等方式。
93 1
|
3月前
|
缓存 NoSQL Java
一次访问Redis延时高问题排查与总结
作者抽丝剥茧的记录了一次访问Redis延时高问题的排查和总结。
433 1
|
8月前
|
Arthas NoSQL Java
一次访问Redis延时高问题排查与总结(2)
本文是一次访问Redis延时高问题排查与总结的续篇,主要讲述了当时没有发现的一些问题和解决方案。
46969 22
|
8月前
|
NoSQL 安全 容灾
1分钟实现Redis数据迁移任务
NineData 基于全量复制、增量日志复制技术,提供了高效、安全可靠的 Redis 不停机迁移方案。当然,除了 Redis,NineData 已经支持数十种常见数据库的迁移复制,实现数据库迁移、数据容灾、数据双活、数据仓库实时集成等业务场景。同时,除了 SAAS 模式外,还提供了企业专属集群模式,满足企业最高的数据安全合规要求。
171 0
|
4天前
|
存储 监控 NoSQL
Redis哨兵&分片集群
Redis哨兵&分片集群
8 0