消息服务中如何确保消息至少被消费一次

简介: 消息服务中如何确保消息至少被消费一次

对消息服务需要了解的朋友,可以移步:

  1. 聊聊mq的使用场景
  2. 聊聊业务系统中投递消息到mq的几种方式
  3. 谈谈mq消息消费的几种方式

本章讨论主题

  1. 如何确保消息至少消费一次,确保消费者最大程度消费成功

消费者消费消息有2中方式:

  1. push方式
    消息服务接收到消息之后,主动将消息推送给消费者消费
  2. pull方式
    消费者定时从消息服务中拉取消息进行消费

下面我们将讨论2中方式中如何确保消息至少被消费一次。

push模式

消费的过程:

  1. 消息服务查询待消费的消息列表
  2. 轮询待消息列表
  3. 调用消费者
  4. 消费者收到消费请求,执行业务处理,将处理结果返回给消息服务
  5. 消息服务接收到消费成功的信息,将消息状态置为消费成功状态
  6. 继续消费下一条消息

探讨一下上面需要考虑的问题:

若消息一直消费失败如何处理?

先说一下影响:

  1. 消息被阻塞
    消息如果一直消费失败,消息服务会不断调用消费者进行消费,会阻塞其他消息的消费,直接影响到业务的正常进行.

消费失败的原因:

  1. 代码问题
    这种情况不管尝试多少次,消息都会消费失败,需要人工介入修复bug,这个可以依靠监控系统发现bug,同时开发进行修复。
  2. 系统运行异常如调用超时、网络问题等一些不可控的因素。产生这种错误,继续重试,最终会处理成功。

此处咱们只用讨论消息服务中重试机制如何设计?

系统异常情况下,可能过一段时间,系统恢复了,此时去重试,消费也就成功了。

所以我们对于消费失败的消息采用延迟处理的方式,可以这么实现:

消息中增加几个字段用于重试:next_dispose_time【下次处理时间】、max_failure【最大允许失败次数】、failure【当前失败次数】,消息入库时:next_dispose_time=需消费的时间,max_failure = 运行最大失败次数, failure=0;

当消费失败时,处理过程:
计算下次处理时间(next_dispose_time),可以在当前时间上面做指数递增,比如根据失败次数依次在当前时间上递增2的failure次方秒,如:
第1次失败:当前时间 + 2秒
第2次失败:当前时间 + 4秒
第3次失败:当前时间 + 8秒
第4次失败:当前时间 + 16秒
.......
第n次失败:当前时间 + 2的n次方秒

failure++

消息服务查询待消费的消息也需要做调整:
select * from 消息表 where next_dispose_time<=当前时间 and failure此时能够最大程度保证消息最少消费成功一次。

pull方式

这种会复杂一些,为何会复杂一些,咱们先看一下常规的流程:

  1. 消费者从消息服务中拉取消息
  2. 本地进行处理
  3. 从消息服务中删除此消息
  4. 继续拉取下一条进行处理

如果本地一直处理失败,那么后面拉取到的都是同一条消息,这条消息直接阻塞后续消息的消费,这种情况如何解?

咱们先分析一下出现这种问题的后果及原因:

  1. 后果:消息被阻塞,业务无法正常运行
  2. 原因:代码问题或其他异常
  3. 确保代码没问题,可以解决上面问题,及时性不够高,线上要考虑系统的容错能力。

遇到这种问题还是挺严重了,业务方都是无法接受的,一条消息消费失败,会影响到其他所有消息的消费,这个我们还是得想办法解决,可以这样:

  1. 消费者拉取消息
  2. 落地到本地
  3. 从消息服务中删除此消息
  4. 异步去消费本地落地的消息

消息先落地,然后异步处理,本地需要有个补偿的job,去处理本地消费失败的消息,这个可以参考push方式消费的过程。

对消息服务感兴趣的,可以加群讨论
_

相关实践学习
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
本场景将自定义告警信息同时分发至多个通知渠道的需求,例如短信、电子邮件及钉钉群组等。通过采用轻量消息队列(原 MNS)的主题模型的HTTP订阅方式,并结合应用实时监控服务提供的自定义集成能力,使得您能够以简便的配置方式实现上述多渠道同步通知的功能。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
存储 算法 Sentinel
实现熔断、限流的底层原理是什么
实现熔断、限流的底层原理是什么
|
JavaScript 安全 开发者
Vue3 中对 TypeScript 的支持
Vue3 中对 TypeScript 的支持
154 2
|
10月前
|
SQL 缓存 关系型数据库
美团面试:Mysql 有几级缓存? 每一级缓存,具体是什么?
在40岁老架构师尼恩的读者交流群中,近期有小伙伴因未能系统梳理MySQL缓存机制而在美团面试中失利。为此,尼恩对MySQL的缓存机制进行了系统化梳理,包括一级缓存(InnoDB缓存)和二级缓存(查询缓存)。同时,他还将这些知识点整理进《尼恩Java面试宝典PDF》V175版本,帮助大家提升技术水平,顺利通过面试。更多技术资料请关注公号【技术自由圈】。
美团面试:Mysql 有几级缓存? 每一级缓存,具体是什么?
|
11月前
|
算法 搜索推荐
数据结构与算法学习十一:冒泡排序、选择排序、插入排序
本文介绍了冒泡排序、选择排序和插入排序三种基础排序算法的原理、实现代码和测试结果。
312 0
数据结构与算法学习十一:冒泡排序、选择排序、插入排序
|
11月前
|
设计模式 Java Maven
Springboot Starter 是如何工作的?
本文详细介绍Springboot Starter的实现原理、设计思想及其优缺点。Springboot Starter通过Maven或Gradle依赖管理引入相关依赖,并利用自动配置和条件注解简化开发流程。文章通过示例展示了如何创建自定义Starter,并分析了其模块化设计、约定优于配置、自动配置、依赖注入、开闭原则及单一职责原则等设计理念。尽管Starter带来诸多便利,但也存在黑盒操作、过度依赖及启动时间增加等问题。通过本文,你将全面了解Springboot Starter的工作机制与应用场景。
319 3
|
消息中间件 Kafka 数据库
【后端面经】【消息队列】22 | 消息队列:消息队列可以用来解决什么问题?-02 超时场景+性能问题
【5月更文挑战第7天】 本文介绍了电商中订单超时取消的处理方法,通过使用消息队列实现延时消息。当订单30分钟后未支付,消息队列将触发取消操作,但需注意并发问题,如采用分布式锁或乐观锁避免并发更新订单状态。乐观锁确保只有订单状态为未支付时才允许支付。主流消息队列如RocketMQ支持延迟消息,而Kafka不支持。 使用消息队列的好处在于解耦和提高系统性能、扩展性和可用性。同步调用会导致性能下降,因为必须等待所有调用完成。并发调用虽可提升性能,但仍逊于消息队列,且无法解决扩展性和可用性问题。
266 1
Java8中的函数式接口详解(Supplier、Consumer、Predicate、Function)
Java8中的函数式接口详解(Supplier、Consumer、Predicate、Function)
1333 1
|
存储 缓存 算法
双向链表的建立和使用场景
双向链表的建立和使用场景
|
搜索推荐 算法
【数据结构】排序之插入排序和选择排序
【数据结构】排序之插入排序和选择排序
470 0