使用Redis实现延时任务(二)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 前一篇文章通过Redis的有序集合Sorted Set和调度框架Quartz实例一版简单的延时任务,但是有两个相对重要的问题没有解决:分片、监控。这篇文章的内容就是要完善这两个方面的功能。前置文章:使用Redis实现延时任务(一)。

前提



前一篇文章通过Redis的有序集合Sorted Set和调度框架Quartz实例一版简单的延时任务,但是有两个相对重要的问题没有解决:

  1. 分片。
  2. 监控。

这篇文章的内容就是要完善这两个方面的功能。前置文章:使用Redis实现延时任务(一)


为什么需要分片



这里重新贴一下查询脚本dequeue.lua的内容:


-- 参考jesque的部分Lua脚本实现
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack函数能把table转化为可变参数
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil
复制代码


这个脚本一共用到了四个命令ZREVRANGEBYSCOREZREMHMGETHDELTYPE命令的时间复杂度可以忽略):


命令 时间复杂度 参数说明
ZREVRANGEBYSCORE O(log(N)+M) N是有序集合中的元素总数,M是返回的元素的数量
ZREM O(M*log(N)) N是有序集合中的元素总数,M是成功移除的元素的数量
HMGET O(L) L是成功返回的域的数量
HDEL O(L) L是要删除的域的数量


接下来需要结合场景和具体参数分析,假如在生产环境,有序集合的元素总量维持在10000每小时(也就是说业务量是每小时下单1万笔),由于查询Sorted SetHash的数据同时做了删除,那么30分钟内常驻在这两个集合中的数据有5000条,也就是上面表中的N = 5000。假设我们初步定义查询的LIMIT值为100,也就是上面的M值为100,假设Redis中每个操作单元的耗时简单认为是T,那么分析一下5000条数据处理的耗时:


序号 集合基数 ZREVRANGEBYSCORE ZREM HMGET HDEL
1 5000 log(5000T) + 100T log(5000T) * 100 100T 100T
2 4900 log(4900T) + 100T log(4900T) * 100 100T 100T
3 4800 log(4800T) + 100T log(4800T) * 100 100T 100T
... ... ... ... ... ...


理论上,脚本用到的四个命令中,ZREM命令的耗时是最大的,而ZREVRANGEBYSCOREZREM的时间复杂度函数都是M * log(N),因此控制集合元素基数N对于降低Lua脚本运行的耗时是有一定帮助的。


分片



上面分析了dequeue.lua的时间复杂度,准备好的分片方案有两个:

  • 方案一:单Redis实例,对Sorted SetHash两个集合的数据进行分片。
  • 方案二:基于多个Redis实例(可以是哨兵或者集群),实施方案一的分片操作。


为了简单起见,后面的例子中分片的数量(shardingCount)设计为2,生产中分片数量应该根据实际情况定制。预设使用长整型的用户ID字段userId取模进行分片,假定测试数据中的userId是均匀分布的。


通用实体:


@Data
public class OrderMessage {
    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}
复制代码


延迟队列接口:


public interface OrderDelayQueue {
    void enqueue(OrderMessage message);
    List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);
    List<OrderMessage> dequeue(int index);
    String enqueueSha();
    String dequeueSha();
}
复制代码


单Redis实例分片


Redis实例分片比较简单,示意图如下:


微信截图_20220512183635.png


编写队列实现代码如下(部分参数写死,仅供参考,切勿照搬到生产中):


@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    /**
     * 分片数量
     */
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
    private final JedisProvider jedisProvider;
    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
        }
    }
    @Override
    public List<OrderMessage> dequeue(int index) {
        // 30分钟之前
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }
    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }
    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }
    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        // 加载Lua脚本
        loadLuaScript();
    }
    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
}
复制代码


消费者定时任务的实现如下:


DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    /**
     * 初始化业务线程池
     */
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });
    @Autowired
    private OrderDelayQueue orderDelayQueue;
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // 这里为了简单起见,分片的下标暂时使用Quartz的任务执行上下文存放
        int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
        LOGGER.info("订单消息消费者定时任务开始执行,shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("订单消息消费者定时任务执行完毕,shardingIndex:[{}]...", shardingIndex);
    }
    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {
        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final int shardingIndex;
        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}],处理订单消息,内容:{}", shardingIndex, JSON.toJSONString(message));
                    // 模拟耗时
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}
复制代码


启动定时任务和写入测试数据的CommandLineRunner实现如下:


@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {
    @Autowired
    private Scheduler scheduler;
    @Autowired
    private JedisProvider jedisProvider;
    @Override
    public void run(String... args) throws Exception {
        int shardingCount = 2;
        // 准备测试数据
        prepareOrderMessageData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }
    private void prepareOrderMessageData(int shardingCount) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            for (OrderMessage message : messages) {
                // 30分钟前
                Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
                long index = message.getUserId() % shardingCount;
                jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
                jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
            }
        }
    }
    private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (int i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }
    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {
        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}
复制代码


启动应用,输出如下:


2019-08-28 00:13:20.648  INFO 50248 --- [           main] c.t.s.s.NoneJdbcSpringApplication        : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : 订单消息消费者定时任务开始执行,shardingIndex:[0]...
2019-08-28 00:13:20.781  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : 订单消息消费者定时任务开始执行,shardingIndex:[1]...
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1],处理订单消息,内容:{"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0],处理订单消息,内容:{"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... 省略大量输出
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : 订单消息消费者定时任务执行完毕,shardingIndex:[0]...
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : 订单消息消费者定时任务执行完毕,shardingIndex:[1]...
// ... 省略大量输出
复制代码


多Redis实例分片


Redis实例分片其实存在一个问题,就是Redis实例总是单线程处理客户端的命令,即使客户端是多个线程执行Redis命令,示意图如下:


微信截图_20220512183647.png


这种情况下,虽然通过分片降低了Lua脚本命令的复杂度,但是Redis的命令处理模型(单线程)也有可能成为另一个性能瓶颈隐患。因此,可以考虑基于多Redis实例进行分片。



微信截图_20220512183657.png


这里为了简单起见,用两个单点的Redis实例做编码示例。代码如下:


// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {
    private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();
    private JedisPool defaultPool;
    @Override
    public void afterPropertiesSet() throws Exception {
        JedisPool pool = new JedisPool("localhost");
        defaultPool = pool;
        pools.put(0L, pool);
        // 这个是虚拟机上的redis实例
        pool = new JedisPool("192.168.56.200");
        pools.put(1L, pool);
    }
    public Jedis provide(Long index) {
        return pools.getOrDefault(index, defaultPool).getResource();
    }
}
// 订单消息
@Data
public class OrderMessage {
    private String orderId;
    private BigDecimal amount;
    private Long userId;
}
// 订单延时队列接口
public interface OrderDelayQueue {
    void enqueue(OrderMessage message);
    List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);
    List<OrderMessage> dequeue(long index);
    String enqueueSha(long index);
    String dequeueSha(long index);
}
// 延时队列实现
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private final JedisProvider jedisProvider;
    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
        }
    }
    @Override
    public List<OrderMessage> dequeue(long index) {
        // 30分钟之前
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }
    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }
    @Override
    public String enqueueSha(long index) {
        return ENQUEUE_LUA_SHA.get(index);
    }
    @Override
    public String dequeueSha(long index) {
        return DEQUEUE_LUA_SHA.get(index);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        // 加载Lua脚本
        loadLuaScript();
    }
    private void loadLuaScript() throws Exception {
        for (long i = 0; i < SHARDING_COUNT; i++) {
            try (Jedis jedis = jedisProvider.provide(i)) {
                ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
                String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                String sha = jedis.scriptLoad(luaContent);
                ENQUEUE_LUA_SHA.put(i, sha);
                resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
                luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                sha = jedis.scriptLoad(luaContent);
                DEQUEUE_LUA_SHA.put(i, sha);
            }
        }
    }
}
// 消费者
public class OrderMessageConsumer implements Job {
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    // 初始化业务线程池
    private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });
    @Autowired
    private OrderDelayQueue orderDelayQueue;
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
        LOGGER.info("订单消息消费者定时任务开始执行,shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            // 这里的倒数栅栏,在线程池资源充足的前提下可以去掉
            final CountDownLatch latch = new CountDownLatch(1);
            businessWorkerPool.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("订单消息消费者定时任务执行完毕,shardingIndex:[{}]...", shardingIndex);
    }
    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {
        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final long shardingIndex;
        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}],处理订单消息,内容:{}", shardingIndex, JSON.toJSONString(message));
                    // 模拟处理耗时50毫秒
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}
// 配置
@Configuration
public class QuartzConfiguration {
    @Bean
    public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
        return new AutowiredSupportQuartzJobFactory();
    }
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("RamScheduler");
        factory.setAutoStartup(true);
        factory.setJobFactory(autowiredSupportQuartzJobFactory);
        return factory;
    }
    public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {
        private AutowireCapableBeanFactory autowireCapableBeanFactory;
        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }
        @Override
        protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}
// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {
    @Autowired
    private Scheduler scheduler;
    @Autowired
    private JedisProvider jedisProvider;
    @Override
    public void run(String... args) throws Exception {
        long shardingCount = 2;
        prepareData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }
    private void prepareData(long shardingCount) {
        for (long i = 0L; i < shardingCount; i++) {
            Map<String, Double> z = Maps.newHashMap();
            Map<String, String> h = Maps.newHashMap();
            for (int k = 0; k < 100; k++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(k));
                message.setUserId((long) k);
                message.setOrderId("ORDER_ID_" + k);
                // 30 min ago
                z.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                h.put(message.getOrderId(), JSON.toJSONString(message));
            }
            Jedis jedis = jedisProvider.provide(i);
            jedis.hmset("ORDER_DETAIL_QUEUE", h);
            jedis.zadd("ORDER_QUEUE", z);
        }
    }
    private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (long i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }
    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {
        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}
复制代码


新增一个启动函数并且启动,控制台输出如下:


// ...省略大量输出
2019-09-01 14:08:27.664  INFO 13056 --- [           main] c.t.multi.NoneJdbcSpringApplication      : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : 订单消息消费者定时任务开始执行,shardingIndex:[1]...
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : 订单消息消费者定时任务开始执行,shardingIndex:[0]...
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ...省略大量输出
2019-09-01 14:08:28.239  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : 订单消息消费者定时任务执行完毕,shardingIndex:[1]...
2019-09-01 14:08:28.240  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : 订单消息消费者定时任务执行完毕,shardingIndex:[0]...
// ...省略大量输出
复制代码


生产中应该避免Redis服务单点,一般常用哨兵配合树状主从的部署方式(参考《Redis开发与运维》),2套Redis哨兵的部署示意图如下:

微信截图_20220512183710.png



需要什么监控项



我们需要相对实时地知道Redis中的延时队列集合有多少积压数据,每次出队的耗时大概是多少等等监控项参数,这样我们才能更好地知道延时队列模块是否正常运行、是否存在性能瓶颈等等。具体的监控项,需要按需定制,这里为了方便举例,只做两个监控项的监控:

  • 有序集合Sorted Set中积压的元素数量。
  • 每次调用dequeue.lua的耗时。


采用的是应用实时上报数据的方式,依赖于spring-boot-starter-actuatorPrometheusGrafana搭建的监控体系,如果并不熟悉这个体系可以看两篇前置文章:


监控



引入依赖:


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.2.0</version>
</dependency>
复制代码


这里选用GaugeMeter进行监控数据收集,添加监控类OrderDelayQueueMonitor


// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {
    private static final long SHARDING_COUNT = 2L;
    private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
    private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
    private ScheduledExecutorService executor;
    @Autowired
    private JedisProvider jedisProvider;
    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "OrderDelayQueueMonitor");
            thread.setDaemon(true);
            return thread;
        });
        for (long i = 0L; i < SHARDING_COUNT; i++) {
            AtomicLong l = new AtomicLong();
            Metrics.gauge("order.delay.queue.lua.cost", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    l, AtomicLong::get);
            lua.put(i, l);
            AtomicLong r = new AtomicLong();
            Metrics.gauge("order.delay.queue.remain", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    r, AtomicLong::get);
            remain.put(i, r);
        }
        // 每五秒上报一次集合中的剩余数据
        executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
    }
    public void recordRemain(Long index, long count) {
        remain.get(index).set(count);
    }
    public void recordLuaCost(Long index, long count) {
        lua.get(index).set(count);
    }
    @RequiredArgsConstructor
    private class MonitorTask implements Runnable {
        private final JedisProvider jedisProvider;
        @Override
        public void run() {
            for (long i = 0L; i < SHARDING_COUNT; i++) {
                try (Jedis jedis = jedisProvider.provide(i)) {
                    recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
                }
            }
        }
    }
}
复制代码


原来的RedisOrderDelayQueue#dequeue()进行改造:


@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
    // ... 省略没有改动的代码
    private final OrderDelayQueueMonitor orderDelayQueueMonitor;
    // ... 省略没有改动的代码
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            long start = System.nanoTime();
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            long end = System.nanoTime();
            // 添加dequeue的耗时监控-单位微秒
            orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    } 
    // ... 省略没有改动的代码
}      
复制代码


其他配置这里简单说一下。

application.yaml要开放prometheus端点的访问权限:


server:
  port: 9091
management:
  endpoints:
    web:
      exposure:
        include: 'prometheus'
复制代码


Prometheus服务配置尽量减少查询的间隔时间,暂定为5秒:


# my global config
global:
  scrape_interval:     5s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).
# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093
# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    metrics_path: '/actuator/prometheus'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
    - targets: ['localhost:9091']
复制代码


Grafana的基本配置项如下:


出队耗时 order_delay_queue_lua_cost 分片编号-{{index}}
订单延时队列积压量 order_delay_queue_remain 分片编号-{{index}}
复制代码


最终可以在Grafana配置每5秒刷新,见效果如下:

微信截图_20220512183723.png


这里的监控项更多时候应该按需定制,说实话,监控的工作往往是最复杂和繁琐的。


小结



全文相对详细地介绍了基于Redis实现延时任务的分片和监控的具体实施过程,核心代码仅供参考,还有一些具体的细节例如PrometheusGrafana的一些应用,这里限于篇幅不会详细地展开。说实话,基于实际场景做一次中间件和架构的选型并不是一件简单的事,而且往往初期的实施并不是最大的难点,更大的难题在后面的优化以及监控。


附件




相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
48 6
|
2月前
|
NoSQL Java API
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
在40岁老架构师尼恩的读者交流群中,近期有小伙伴在面试一线互联网企业时遇到了关于Redis分布式锁过期及自动续期的问题。尼恩对此进行了系统化的梳理,介绍了两种核心解决方案:一是通过增加版本号实现乐观锁,二是利用watch dog自动续期机制。后者通过后台线程定期检查锁的状态并在必要时延长锁的过期时间,确保锁不会因超时而意外释放。尼恩还分享了详细的代码实现和原理分析,帮助读者深入理解并掌握这些技术点,以便在面试中自信应对相关问题。更多技术细节和面试准备资料可在尼恩的技术文章和《尼恩Java面试宝典》中获取。
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
|
7月前
|
存储 监控 负载均衡
保证Redis的高可用性是一个涉及多个层面的任务,主要包括数据持久化、复制与故障转移、集群化部署等方面
【5月更文挑战第15天】保证Redis高可用性涉及数据持久化、复制与故障转移、集群化及优化策略。RDB和AOF是数据持久化方法,哨兵模式确保故障自动恢复。Redis Cluster实现分布式部署,提高负载均衡和容错性。其他措施包括身份认证、多线程、数据压缩和监控报警,以增强安全性和稳定性。通过综合配置与监控,可确保Redis服务的高效、可靠运行。
238 2
|
7月前
|
缓存 NoSQL Java
面试官:Redis如何实现延迟任务?
延迟任务是计划任务,用于在未来特定时间执行。常见应用场景包括定时通知、异步处理、缓存管理、计划任务、订单处理、重试机制、提醒和数据采集。Redis虽无内置延迟任务功能,但可通过过期键通知、ZSet或Redisson实现。然而,这种方法精度有限,稳定性较差,适合轻量级需求。Redisson的RDelayedQueue提供更简单的延迟队列实现。
463 9
|
7月前
|
存储 缓存 NoSQL
Redis实现延迟任务的几种方案
Redis实现延迟任务的几种方案
|
7月前
|
存储 NoSQL Java
Redis 实现延迟任务的深度解析
【4月更文挑战第17天】
269 0
|
7月前
|
监控 NoSQL 测试技术
python使用Flask,Redis和Celery的异步任务
python使用Flask,Redis和Celery的异步任务
|
2天前
|
存储 缓存 NoSQL
解决Redis缓存数据类型丢失问题
解决Redis缓存数据类型丢失问题
110 85
|
2月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
78 6
下一篇
DataWorks