我是如何用 redis 分布式锁来解决线上历史业务问题的

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 近期发现,开发功能的时候发现了一个 mq 消费顺序错乱(历史遗留问题),导致业务异常的问题,看看我是如何解决的

近期发现,开发功能的时候发现了一个 mq 消费顺序错乱(历史遗留问题),导致业务异常的问题,看看我是如何解决的

问题抛出

首先,简单介绍一下情况:

线上 k8s 有多个 pod 会去消费 mq 中的消息,可是生产者发送的消息是期望一定要有序去消费,此时要表达的是,例如 生产者如果发送了 3 个通知消息,分别是

  • 1 系统已经在 / 组下面添加 a 组,你记得绑定策略 (例如 / 组绑定的是策略是:允许看视频类型的网站)
  • 2 系统已经在 /a 组下添加了 b 组, 你记得绑定策略(期望绑定的策略和他的父组策略一样)
  • 3 系统已经在 b 组下面添加 小 d 用户,你的绑定策略(期望绑定的策略和他的所在组一样)

此处,若有 3 个 pod 的分别拿到了上述 3 条消息,但是自身实际消费完毕的顺序可能是 先完成了 3 消息对应的业务逻辑,再是 2 消息 的业务逻辑,最后是 1 消息的业务逻辑

那么这个时候,小 d 用户就没有绑定上 允许看视频类型的网站 这一条策略,自然 b组 和 a 组也没有绑定上这条策略,这就和我们预期的完全不一致了

当然,实际情况对于单条单条的消息处理基本不会出现这种偏差,但是在批量处理的时候,就会出现实际业务处理顺序与期望不一致的情况,那么就是妥妥的线上问题了(小 d 上网的时候想看视频,可是一直看不了,于是就疯狂投诉。。。)

思考解决

对于这个问题如何解决呢?

我们知道,咱们使用 mq 的目的是为了做到去处理我们的异步逻辑,还能对流量进行削峰,服务间解耦

对于咱们的 A 服务,已经处理了关于添加用户的,添加组的逻辑,发送通知消息给到 B 服务的时候,B 服务自身的处理顺序,未按照既定的顺序真实按照顺序消费完毕,导致出现了业务问题

想法一

我们是期望 B 服务团队去添加批量接口,A 服务将需要通知的信息,排序好给到 B 服务,一个整包, B 服务的单个 pod 接收到这个大包,然后按照顺序处理消息即可,但是这个方式弊端比较明显

  • 当发送了多个批量大包消息的时候,B 服务如果自身处理不过来,也会导致类似的问题,无法根治
  • 需要 B 服务新增和修改的代码较多,肯定谈不下来
  • 而且对于绑定策略的服务来说,不仅仅是 B 服务,还有 C 服务,D 服务呢,他们都要改造... 这个想法就。。。

想法二

对于这一个业务,也不能去对整个架构大改,对于这些历史遗留问题,能少动就少动,兄弟们你们都懂的

于是便想出了使用 redis 分布式锁来处理,对于一个部署在 k8s 中服务的多个 pod 去抢占,谁先抢到锁,那么就谁消费mq中的消息,没有抢到锁的 pod ,那就过一会再抢

当然,对于其他类型的业务是没有影响的

如何去实现这个想法呢,我们可以模拟一下

image.png

  • 1 首先,我们设置一个 redis 的 key,例如 [服务名]_lockmq, 值的话咱们就任意设置,默认就用 服务名 做 value 吧,过期时间暂定 30 秒,有需求的可以调大
  • 2 如果设置成功,则处理成功之后的事情
  • 2.1 初始化 mq 消费者,并开启协程进行消费
  • 2.1.1 如果初始化失败,则直接返回,退到第 1 步
  • 2.2 对 redis 锁进行续期,此处咱们 10 秒续期一次
  • 如果续期失败,则直接返回,退到第 1 步
  • 3 若拿锁失败,则休息 10 秒再去拿锁

这样来处理的话,我们就可以应对多个 pod 来消费同一类消息的时候,保证同时只有一个 pod 在处理 mq 中的消息了,当然如果正在处理消息的 pod 出现了异常,对于其他 pod ,最晚会在 40 秒之后拿到锁,对于大量的消息来说,这个还是可以容忍的

对应的代码逻辑如下:

  • 简单连接 redis, redis 分布式锁的主逻辑如下
  • 连接 redis ,DB 默认为 0 号
var rdb = redis.NewClient(&redis.Options{
   Addr:     "localhost:6379",
   Password: "123456",
   DB:       0,
})
func LockMq(svrName string) {
   key := fmt.Sprintf("%s_lockmq", svrName)
   // 尝试加锁
   var set bool
   for {
      set = redisLock(key, svrName, time.Second*30)
      if set {
         log.Println("redisLock success ")
         if err := afterLockSuccess(key); err != nil {
            // 如果此处有err ,自然是 mq 初始化失败
            log.Println("mq init error: ", err)
         }else{
            log.Println("redisLock expire failed ")
         }
         time.Sleep(time.Second * 10)
         continue
      }
      // afterLockFailed()
      log.Println("redisLock failed ")
      time.Sleep(time.Second * 10)
   }
}
  • 基本的加锁实现
  • 设置 key , value , 过期时间为 30 秒
func redisLock(key, value string, duration time.Duration) bool {
   set, err := rdb.SetNX(context.TODO(), key, value, duration).Result()
   if err != nil {
      log.Println("setnx failed, error: ", err)
      return false
   }
   return set
}
  • 加锁成功之后,初始化 mq 客户端并进行消费,续期 redis 分布式锁
func afterLockSuccess(key string) error {
   // 初始化需要做的内容或者句柄
   // xxx
   // 对于此处的初始化 mq 句柄失败才返回 err
   ch := make(chan struct{}, 1)
   go func() {
      // 模拟消费消息
      for {
         select {
         case <-ch:
            log.Println("expire failed,mq close")
            return
         default:
            log.Println("is consuming msg")
            time.Sleep(time.Second * 2)
         }
      }
   }()
   for {
      time.Sleep(time.Second*10)
      // 续期
      set, err := rdb.PExpire(context.TODO(), key, time.Second*30).Result()
      if err != nil {
         log.Println("PExpire error!! ", err)
         return nil
      }
      if !set {
         ch <- struct{}{}
         log.Println("PExpire failed!!")
         return nil
      }
      log.Println("PExpire success!! ")
   }
}

具体的测试直接调用 LockMq 函数即可

func main(){
   go redislock.LockMq("helloworld")
   select{}
}

模拟启动多个 pod 去抢锁,抢到锁的执行业务,继续续期,抢不到锁的休息一会再接着抢

程序 a 先启动,程序 b 后启动

程序 a 日志如下:

程序 a 起来之后,启动一段时间之后,kill 掉 程序 a

image.png

程序 b 日志如下:

程序 b 先是获取锁失败,过 30s 左右,程序 b 能正常获取到锁

image.png

关于源码可以查看地址:github.com/qingconglai…

感谢阅读,欢迎交流,点个赞,关注一波 再走吧

欢迎点赞,关注,收藏

朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力

image.png

好了,本次就到这里

技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。

我是阿兵云原生,欢迎点赞关注收藏,下次见~


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
监控 NoSQL Redis
Redis Sentinel:秒杀系统背后的可靠性保障神器!
本文详细介绍了如何在个人项目中利用 Redis 哨兵模式保障系统的可靠性与高可用性。哨兵模式通过监控主从服务器状态、自动故障转移和通知客户端等功能,确保在主服务器宕机时系统仍能正常运行。适用于读请求多于写请求的场景,如秒杀系统,能有效缓解数据库压力。同时也探讨了哨兵模式在高并发场景下的优化方法及潜在缺陷,帮助开发者更好地应用该模式。
56 7
Redis Sentinel:秒杀系统背后的可靠性保障神器!
|
14天前
|
SQL 分布式计算 NoSQL
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
21 2
|
18天前
|
NoSQL 关系型数据库 MySQL
Redis 事务特性、原理、具体命令操作全方位诠释 —— 零基础可学习
本文全面阐述了Redis事务的特性、原理、具体命令操作,指出Redis事务具有原子性但不保证一致性、持久性和隔离性,并解释了Redis事务的适用场景和WATCH命令的乐观锁机制。
110 0
Redis 事务特性、原理、具体命令操作全方位诠释 —— 零基础可学习
|
5月前
|
NoSQL Shell Redis
Redis热升级秘诀:保证高可用性的技术方案
Redis热升级方案允许在不中断业务的情况下,实现数千级别Redis的无缝更新。通过构建Redis Shell程序保存数据库状态,封装动态连接库,以及在运行时加载新版本库,保持客户端连接,该方法确保了业务连续性和高可用性,且升级仅需几毫秒,显著提升了系统效率。
627 6
|
5月前
|
存储 缓存 监控
快速掌握Redis优化要点,告别性能瓶颈!
# Redis优化指南 了解如何提升Redis性能,从读写方式(整体与部分)、KV size、Key数量、读写峰值、命中率、过期策略、平均穿透加载时间、可运维性、安全性等方面着手。选择合适的读写策略,如只整体读写或部分读写变更,优化KV size避免过大或差异过大,合理管理Key数量,应对不同读写峰值,监控命中率并持续优化,设置智能过期策略,减少平均穿透加载时间,确保高可运维性并强化安全性。一起探索Redis的性能潜力!
1422 5
|
5月前
|
消息中间件 存储 缓存
|
5月前
|
缓存 NoSQL Shell
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(持久化功能分析)
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(持久化功能分析)
191 0
|
5月前
|
存储 缓存 NoSQL
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(集群功能分析)(一)
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(集群功能分析)
429 0
|
5月前
|
NoSQL Shell 网络安全
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(集群功能分析)(二)
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(集群功能分析)
53 0
|
5月前
|
缓存 NoSQL 前端开发
【Redis技术专区】「实战案例」谈谈使用Redis缓存时高效的批量删除的几种方案
【Redis技术专区】「实战案例」谈谈使用Redis缓存时高效的批量删除的几种方案
139 0