异步结果通知实现——基于Redis实现,我这操作很可以

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 前段时间,我在内存中实现了一个简单异步通知框架。但由于没有持久化功能,应用重启就会导致数据丢失,且不支持分布式和集群。今天这篇笔记,引入了 Redis 来解决这些问题,以下是几点理由:数据结构丰富,支持 List、Sorted Set 等具有持久化功能,消息的可靠性能得到保证高可用性,支持单机、主从、集群部署项目中已使用,接入成本更低基于 Redis 实现延时队列也有几种方法,展开详细讲讲。

前段时间,我在内存中实现了一个简单异步通知框架。但由于没有持久化功能,应用重启就会导致数据丢失,且不支持分布式和集群。今天这篇笔记,引入了 Redis 来解决这些问题,以下是几点理由:

  • 数据结构丰富,支持 List、Sorted Set 等
  • 具有持久化功能,消息的可靠性能得到保证
  • 高可用性,支持单机、主从、集群部署
  • 项目中已使用,接入成本更低

基于 Redis 实现延时队列也有几种方法,展开详细讲讲。

基于键事件通知实现

Redis 2.8.0 版本以后就具有了 键事件通知(注,还有个键空间通知,注意区别),基于 Pub/Sub 发布订阅实现,详见 官网。而我们正好可以利用这个特性,实现异步通知的延迟功能,数据流转如下:

网络异常,图片无法展示
|

大概逻辑:当首次通知、或通知失败时,设置(重新设置)在 Redis 对应的 Key 的过期时间,Redis 会监听过期事件,发生事件时通知订阅者,订阅者接收到事件,做逻辑处理。下面看具体的实现。

首先,修改 Redis 端配置打开功能。由于该功能会消耗一些 CPU 性能,所以在配置文件中是 默认关闭 的。Ex表示打开 键过期事件通知,每当有过期键被删除时发送,订阅者能收到 接收到被执行事件的键的名字

notify-keyspace-events Ex

其次,想要在 SpringBoot 中,订阅到 Redis 的事件,也需要两个步骤:

1、继承
org.springframework.data.redis.listener.adapter.MessageListenerAdapter 类,创建自己的监听器

@Component
public class OrderExpireEventListener extends MessageListenerAdapter {
    @Override
    public void onMessage(Message message, byte[] pattern) {
          byte[] body = message.getBody();
          String msg = redisWrapper.getRedisTemplate().getStringSerializer().deserialize(body);
          // do something...
          // 假如通知失败,需要重新计算下次通知时间,设置 Redis
          // 至于数据类型,String 即可
    }
}

2、将创建的监听器,注册(委托设计模式)给
RedisMessageListenerContainer

@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory, 
OrderExpireEventListener adapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(factory);
    container.addMessageListener(adapter, new PatternTopic("__keyevent@0__:expired"));
    return container;
}

这里有个点需要注意下,那就是 Redis 的键设计。

代码中的 __keyevent@0__:expired 频道匹配意味着,编号为 0 的库中所有键过期时间都会被订阅到。而这个 Redis 可能不单单只有这个业务在使用,有可能存在其他的业务也在使用。总不可能来个任意的键都会需要去做过期处理。最好是有个通用的设计规则,对 Key 的含义分割。比如:产品固定前缀:业务:业务属性:业务唯一标识

app1:trans:notice:1615283234

代表:系统名为 app1 的 在交易模块 的 订单号为 1615283234 的通知业务的消息。当监听器解析 Key 失败时则说明是其他的键过期,不做处理。一旦解析成功,则对消息进行路由分发。

键搞定了,值就看业务情况而定。如果是通知的话,必须带上当前是第几次通知,根据这个再加上策略才能算出下次通知时间(该键的过期时间)。

一般简单的方法都存在多少的缺陷,这种方式也不例外。引用 Redis 官网的一段话:

Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost

意思是说:Redis 目前发布订阅基于 发送即忘 策略,且没有 ACK 机制,意味着客户端重启掉线期间,消息会丢失。加上 Pub/Sub 消息 没有持久化机制,假如当订阅客户端由于网络原因没收到,想再次重试,这是没法实现的。

假如此时我还想跟内存队列那样子能够 对消息的延迟时间进行自动排序,该如何实现呢?除此之外,Pub/Sub 是广播机制,假如存在多个订阅者,那么就会 同时收到键过期的消息,此时又该如何处理 消息竞争 问题?

基于 Sorted Set 实现

这时候我们要引入 Redis 的 Sorted Set 数据结构。关于这个数据结构简单来说是 支持排序的 Set,靠的是与之关联的浮点值,称为 score 来实现的。值得注意的是,这个排序并不是放进去的时候排,是拿出来的时候(联想到 性能 问题,后面有讲)。这里引用一段官网的话:

Moreover, elements in a sorted sets are taken in order (so they are not ordered on request, order is a peculiarity of the data structure used to represent sorted sets).

所以我们只需要将消息延迟执行的时间戳作为分数值,就能解决上文所说的排序问题,当然由于该结构是 Redis 的基本功能,自然也支持持久化,也就是解决了消息丢失问题。

大概设计如下:

网络异常,图片无法展示
|

首先看看,消费者线程该如何实现(SpringBoot 环境下)

@Slf4j
@Component
public class ConsumerTask {
    @Autowired
    RedisTemplate<String, Object> redisTemplate;
  // Sorted Set 队列键 
    private static String KEY = "TEST:ZSET";
    @Scheduled(cron = "0/1 * * * * ?")
    public void run() {
        try {
            this.doRun();
        } catch (Exception e) {
            log.error("消费异常", e);
        }
    }
    private void doRun() {
      // zrange 分数从小到大 zrevrange 分数从大到小
        // 拿出最新的待处理消息
        Set<ZSetOperations.TypedTuple<Object>> tuples = 
        redisTemplate.opsForZSet().rangeWithScores(KEY, 0, 0);
        if (CollectionUtils.isEmpty(tuples)) {
            log.info("队列无数据");
            return ;
        }
        ZSetOperations.TypedTuple<Object> typedTuple = tuples.iterator().next();
        if (typedTuple == null) {
            log.info("队列无数据");
            return ;
        }
        Double score = typedTuple.getScore();
        Object value = typedTuple.getValue();
        if (System.currentTimeMillis() < score) {
            log.info("未到执行时间...");
            return ;
        }
        Long zrem = redisTemplate.opsForZSet().remove(KEY, value);
        if (Long.compare(1L, zrem) == 0) {
            log.info("删除数据成功,开始处理,数据:{}", value.toString());
            // do someting...
            // 假如通知失败,需要重新计算通知时间(score 值)并在 Redis 设置(ZADD)该消息
        }
        else {
            log.info("被其他的消费端抢占,不处理...");
        }
    }
}

跟之前的 推模式 相比,这次采用的是 拉模式,尽管在多个消费端可能同时拿到同一个消息,不过这里通过 Long zrem = redisTemplate.opsForZSet().remove(KEY, value) 这方法,利用了 rem 命令的原子性 解决了竞争问题,也就是说只会有一个客户端删除成功。

仔细观察的话,可以看到我们拿到的时间戳是 Long 类型的,但是 Spring 提供的 Sorted Set 操作 api 参数是 Double 类型

org.springframework.data.redis.core.ZSetOperations#add(K, V, double)
org.springframework.data.redis.core.ZSetOperations#rangeByScore(K, double, double)

那会不会有精确丢失问题?所以输出看下最大最小值

System.out.println(Long.MAX_VALUE); // 2 的 64 次方-1,19 个数位
System.out.println(Long.MIN_VALUE); // 负的 2 的 64 次方 
System.out.println(Double.MAX_VALUE); // 2 的 1024 次方 -1,308 个数位
System.out.println(Double.MIN_VALUE); // 2 的 -1074次方

可以看到 Double 最大值远远大于 Long 类型,加上时间戳不会有负数,所以可以放心转换。

在这里不演示生产者代码,过于简单,就是调用 zadd 命令而已。这里也需要注意,如果是异步通知场景 zadd 的值必须带上这是第几次通知,就如前面的方案一样。

到此为止,第一种方案存在的问题在第二种方案全部解决了。下面看一种网上的比较多的实现方式。

基于 Sorted Set、List 实现

跟上一种相比多了一个 List 数据结构。先来看下加入 List 之后的整个设计图

网络异常,图片无法展示
|

不得不说刚开始看见这种方案时,是存在疑惑的。因为上面的 Sorted Set 已经实现了功能,为什么要引入 List 数据结构增加系统的复杂度?唯一能看到的好处就是 List 数据结构提供了 阻塞 操作?经过与同事讨论后,得出下面几点结论:

  • 客户端拉取消息 控制并发的步骤减少。当使用 List 时,只需要调用一个命令就可以解决消息竞争问题,而使用 Sorted Set 则需要使用 zrange 和 zrem 两条命令来实现,相比之下,多交互一次网络,且实现更复杂。
  • 客户端拉取消息的方式增多,同时,队列提供 阻塞式 访问,同样也 减少 了客户端由于无限循环造成的 CPU 浪费
  • 队列 pop 操作比 zrange 操作对 Redis 来说性能开销更小,在这种频繁拉取的情况下更加合适。

这里需要注意的一点是,搬运操作有多个命令一起完成,如下伪代码:

// 1、从 Sorted Set 中拿出 score 值在 前五秒 到 目前(包含现在)的所有元素
Date now = new Date();
Date fiveSecondBefore = DateUtils.addSeconds(now, -5);
Set<Object> objects = redisTemplate.opsForZSet()
.rangeByScore("Sorted Set:Key", fiveSecondBefore.getTime(), now.getTime());
if (CollectionUtils.isEmpty(objects)) {
  return ;
}
// 2、将这些元素从 Sorted Set 中删除
Long removeResult = redisTemplate.opsForZSet().remove("Sorted Set:Key", objects);
if (Long.compare(removeResult, objects.size()) != 0) {
  return ;
}
// 3、将这些元素放进 List
Long result = redisTemplate.opsForList().leftPushAll("List:Key", objects);

rangeByScore、remove、leftPushAll 这几个操作不具有原子性,可能在中途发生异常、宕机等情况,导致在搬运过程中丢失或重复搬运。

好在 Redis 提供了执行 lua 脚本功能,会保证同一脚本以原子性(atomic) 的方式执行,所以我们只需要原子性操作的多个步骤整合在自定义 lua 脚本中即可,如下:

local list_key = KEYS[1];
local sorted_set_key = KEYS[2];
local now = ARGV[1];
local sorted_set_size = redis.call('ZCARD', sorted_set_key)
if (tonumber(sorted_set_size) <= 0) then
    return
end
local members = redis.call('ZRANGEBYSCORE', sorted_set_key, 0, tonumber(now));
if (next(members) == nil) then
    return
end
for key,value in ipairs(members)
do
    local zscore = redis.call('ZSCORE',sorted_set_key,value);
    if (tonumber(now) < tonumber(zscore)) then
        return zscore;
    end
    redis.call('ZREM', sorted_set_key, value);
    redis.call('RPUSH', list_key, value);
end
local topmember = redis.call('ZRANGE', sorted_set_key, 0, 0);
local nextvalue = next(topmember);
if (nextvalue == nil) then
    return
end
for k,v in ipairs(topmember)
do
    return redis.call('ZSCORE', sorted_set_key, v);
end

下面是 SpringBoot 定时调用该 lua 脚本进行搬运的示例代码:

@Scheduled(cron = "0/1 * * * * ?")
public void run4() {
    ClassPathResource resource = new ClassPathResource("sorted_set_to_list.lua");
    String luaScript = FileUtils.readFileToString(resource.getFile());
    DefaultRedisScript<String> redisScript = new DefaultRedisScript<>(luaScript, String.class);
    //
    List<String> keys = Lists.newArrayList("TEST:LIST", "TEST:ZSET");
    String now = String.valueOf(System.currentTimeMillis());
    // 注意这里的序列化器,需要换成 StringSerializer
    // 替换的默认的 Jackson2JsonRedisSerializer
    String executeResult = redisTemplate.execute(redisScript, redisTemplate.getStringSerializer(),
            redisTemplate.getStringSerializer(), keys, now);
    log.info("lua 脚本执行结果:{}", executeResult);
}

最后再来看看消费者该如何实现

@Component
@Slf4j
public class ListConsumer implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        Executors.newSingleThreadExecutor().submit(new PopEventRunner());
    }
    private static class PopEventRunner implements Runnable {
        @Override
        public void run() {
            RedisTemplate<String, Object> redisTemplate  = (RedisTemplate<String, Object>) SpringUtil.getBean3("redisTemplate");
            while (true) {
                try {
                    Object leftPop = redisTemplate.opsForList().leftPop("TEST:LIST", Integer.MAX_VALUE, TimeUnit.SECONDS);
                    if (leftPop == null) {
                        continue ;
                    }
                    // do something...
                    // 当通知失败时,重新计算通知时间并设置(ZADD)Redis
                } catch (Exception e) {
                    log.error("监听异常", e);
                    sleep(5); // 发生异常睡五秒
                }
            }
        }
    }
}

监听容器的刷新事件,创建监听单线程,无限循环阻塞监听队列。相对于前一种实现方案,该方案确实更加的贴合。但仍有优化的余地,比如:

  • 搬运线程的时机,目前频率为 1 秒,所以极端情况会有 1 秒时间的延迟。且在 Sorted Set 为空情况下,对 CPU 是一种浪费。

小结

相对前一篇内存实现,Redis 这种方式更加的可靠,且在允许一点时间的误差和牺牲一点消息可靠性下,不失为一种 性价比高 的选择。假如当前景就是不允许有这些损失,那还有什么解决方案吗?到时候我们再来讲终极杀招,使用 RabbitMQ 来实现。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
52 6
|
5月前
|
NoSQL Linux Redis
Redis性能优化问题之想确认Redis延迟变大是否因为fork耗时导致的,如何解决
Redis性能优化问题之想确认Redis延迟变大是否因为fork耗时导致的,如何解决
|
6月前
|
缓存 NoSQL Redis
redis管道操作(节省网络IO开销)
pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的响应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义,管道中前面命令失败,后面命令不会有影响,继续执行。
58 1
|
6月前
|
NoSQL Java Redis
如何在 Java 中操作这些 Redis 数据结构的基本方法
如何在 Java 中操作这些 Redis 数据结构的基本方法
45 2
|
6月前
|
NoSQL 数据管理 关系型数据库
数据管理DMS操作报错合集之控制台查看Redis时出现乱码是什么导致的
数据管理DMS(Data Management Service)是阿里云提供的数据库管理和运维服务,它支持多种数据库类型,包括RDS、PolarDB、MongoDB等。在使用DMS进行数据库操作时,可能会遇到各种报错情况。以下是一些常见的DMS操作报错及其可能的原因与解决措施的合集。
106 2
|
6月前
|
DataWorks NoSQL Java
DataWorks操作报错合集之数据集成使用公共数据集成资源组写入到redis数据源(使用的是VPC连接),提示以下错误:request action:[InnerVpcGrantVpcInstanceAccessToApp], message:[InvalidInstanceId.怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
存储 NoSQL Go
轻松上手,使用Go语言操作Redis数据库
轻松上手,使用Go语言操作Redis数据库
|
6月前
|
NoSQL Redis
加速 Redis 操作:掌握管道技术提升性能与效率
加速 Redis 操作:掌握管道技术提升性能与效率
|
6月前
|
NoSQL Java Redis
【Redis】 Java操作客户端命令——列表操作与哈希操作
【Redis】 Java操作客户端命令——列表操作与哈希操作