用了这么久的RabbitMQ异步编程竟然都是错的!(上)

简介: 用了这么久的RabbitMQ异步编程竟然都是错的!

优秀的项目都由同步、异步和定时任务三种处理模式相辅相成。其中当属异步编程充满坑点。

1 适用场景

1.1 服务于主流程的分支流程

在注册流程中,数据写DB是主流程,但注册后给用户发优惠券或欢迎短信的操作是分支流程,时效性不强,可异步处理。

1.2 用户无需实时看到结果的流程

比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可。

1.3 MQ

任务的缓冲的分发,流量削峰、服务解耦和消息广播。

当然了异步处理不仅仅是通过 MQ 来实现,还有其他方式

  • 比如开新线程执行,返回 Future
  • 还有各种异步框架,比如 Vertx,它是通过 callback 的方式实现

2 异步处理之坑

异步处理流程的可靠性问题、消息发送模式的区分问题、大量死信消息堵塞队列的问题,为方便操作,本文MQ选型RabbitMQ。

2.1 异步处理需要消息补偿闭环

RabbitMQ虽可将消息落地磁盘,即使MQ异常消息数据也不会丢失,但异步流程在消息发送、传输、处理等环节,都可能发生消息丢失。MQ都无法确保百分百可用,业务设计都需考虑不可用时异步流程将如何继续。

因此,对于异步处理流程,必须考虑补偿或建立主备双活流程

2.1.1 案例

用户注册后异步发送欢迎消息。

  • 用户注册落DB为同步流程
  • 会员服务收到消息后发送欢迎消息为异步流程
  • image.png
  • 蓝线
    MQ异步处理(主线),消息可能丢失(虚线代表异步调用)
  • 绿线
    补偿Job定期消息补偿(备线),以补偿主线丢失的消息
  • 考虑到极端的MQ中间件失效的情况
    要求备线的处理吞吐能力达到主线性能

代码示例

  • UserController 注册+发送异步消息。注册方法,一次性注册10个用户,用户注册消息不能发送出去的概率为50%。
  • image.png
  • MemberService 会员服务监听用户注册成功的消息,并发送欢迎短信。使用ConcurrentHashMap存放那些发过短信的用户ID实现幂等,避免相同的用户补偿时重复发短信
  • image.png
  • 对于MQ消费程序,处理逻辑务必考虑去重(支持幂等)因为:
  • MQ消息可能会因为中间件本身配置错误、稳定性等原因出现重复
  • 自动补偿重复,比如本例,同一条消息可能既走MQ也走补偿,肯定会出现重复,而且考虑到高内聚,补偿Job本身不会做去重处理
  • 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时间后处理程序又收到消息了,重复处理。我之前就遇到过一次由MQ故障引发的事故,MQ中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了就先通过后台进行了人工处理,结果MQ系统恢复后消息又被重复处理了一次,造成大量资金重复发放。

小结

异步处理的时候需要考虑消息重复的可能性,处理逻辑需要实现幂等,防止重复处理。

接着定义补偿Job即备线操作。

在CompensationJob中定义一个@Scheduled定时任务,5秒做一次补偿操作,因为Job并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑:每5秒补偿一次,按顺序一次补偿5个用户,下一次补偿操作从上一次补偿的最后一个用户ID开始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力。

image.png

为实现高内聚,主线和备线处理消息,最好使用同一方法。本案例的MemberService监听到MQ消息和CompensationJob补偿,调用的都是welcome。z这里的补偿逻辑简单,实际生产代码应该做到:


考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足补偿的吞吐量

考虑备线补偿数据进行适当延迟

比如,对注册时间在30秒之前的用户再进行补偿,以方便和主线MQ实时流程错开,避免冲突。

诸如当前补偿到哪个用户的offset数据,需要落地数据库。

补偿Job本身需要高可用,可以使用类似XXLJob或ElasticJob等任务系统。

运行程序,执行注册方法注册10个用户,输出如下:

[17:01:16.570] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 1
[17:01:16.571] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 5
[17:01:16.572] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 7
[17:01:16.573] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 8
[17:01:16.594] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 1
[17:01:18.597] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 1
[17:01:18.601] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 5
[17:01:20.603] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 5
[17:01:20.604] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 7
[17:01:22.605] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 7
[17:01:22.606] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 8
[17:01:24.611] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 8
[17:01:25.498] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 0 补偿
[17:01:27.510] [compensation-threadpool-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 2
[17:01:27.510] [compensation-threadpool-3] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 4
[17:01:27.511] [compensation-threadpool-2] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 3
[17:01:30.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 5 补偿
[17:01:32.500] [compensation-threadpool-6] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 6
[17:01:32.500] [compensation-threadpool-9] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 9
[17:01:35.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 9 补偿
[17:01:37.501] [compensation-threadpool-0] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 10
[17:01:40.495] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 10 补偿

可见

  • 共10个用户,MQ发送成功的用户有四个:1、5、7、8
  • 补偿任务第一次运行,补偿了用户2、3、4,第二次运行补偿了用户6、9,第三次运行补充了用户10
  • 针对消息的补偿闭环处理的最高标准是,能够达到补偿全量数据的吞吐量。即若补偿备线足够完善,即使直接把MQ停机,虽然会略微影响处理的及时性,但至少确保流程都能正常执行。

小结

实际开发要考虑异步流程丢消息或处理中断场景。

异步流程需有备线以补偿,比如这里的全量补偿方式,即便异步流程彻底失效,通过补偿也能让业务继续进行。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 存储 监控
RabbitMQ 面试题及答案整理,最新面试题
RabbitMQ 面试题及答案整理,最新面试题
233 1
|
消息中间件 Java Kafka
计算机应届生一定要会的JAVA面试题:RabbitMQ是如何实现消息路由的?
一个应届生去面试,可能没有什么实战经验,今天被问到一个这样的面试题,说“RabbitMQ是如何实现消息路由的?“一下子竟然不知道如何组织语言了。今天我给大家分享一下我的理解。
109 1
|
消息中间件 Java Maven
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
68 0
|
8月前
|
消息中间件 缓存 NoSQL
非常强悍的 RabbitMQ 总结,写得真好
非常强悍的 RabbitMQ 总结,写得真好
33 0
|
8月前
|
消息中间件 数据库
面试题解析:RabbitMQ在多线程秒杀系统中的关键作用
面试题解析:RabbitMQ在多线程秒杀系统中的关键作用
88 0
|
消息中间件 存储 网络协议
RabbitMQ 26问,基本涵盖了面试官必问的面试题
RabbitMQ 26问,基本涵盖了面试官必问的面试题
1277 1
|
消息中间件 算法 Java
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ灵活运用,怎么理解五种消息模型
167 0
|
消息中间件 存储 网络协议
我用ChatGPT,给RabbitMQ加了个连接池
上次我把 RabbitMQ 集成到项目中,但是每次使用 RabbitMQ 时都去 New 一个连接,导致并发起不来,所以这次我们就给 RabbitMQ 加一个连接池。 为了偷个懒,我直接用 ChatGPT 教我加。
|
消息中间件 存储 数据可视化
面试必问:RabbitMQ 有哪几种消息模式?
面试必问:RabbitMQ 有哪几种消息模式?