队列数量变更会导致顺序消费失效,我是这样解决的...

简介: 队列数量变更会导致顺序消费失效,我是这样解决的...

在金融行业中,如果用户订阅了余额变更短信通知服务,当余额发生变化会收到短信通知,但收到短信的顺序必须和用户银行卡账号余额发生变化的顺序一致。


这个场景是典型的顺序消费场景,在分布式架构体系中,账户余额服务与发送短信是两个不同的微服务,通常会基于MQ来实现解耦合,其时序图如下图所示:

2117a4d5bcbcf7cb179a4b2404e62897.png

引入了MQ,那如何保证顺序呢?


1、理论基础


RocketMQ提供了基于分区(队列级别)顺序消费,能保证一个队列中的消息顺序投递,基于RocketMQ顺序消费机制可以实现上面的场景,其具体实现如下:


36c336c4de7241a0a0fae271c25f3581.png

正如上图所示,在消息发送阶段将同一个账号的消息发送到同一个分区,具体做法是按照账户key进行hash并取模。


RocketMQ在消费端实现顺序消费的原理如下图所示:

8ba1581fa3241407b22404306e2f1a00.png

RocketMQ顺序消费端实现主要依赖三把琐:


经过队列负载算法消费者分配到队列后开始进行消息拉取之前需要向Broker端申请该队列的琐


  • 但消费者拉取到一个队列的消息后,对同一个队列中的消息消费时,会对消息队列加锁,确保队列中的消息顺序执行。
  • 在消费过程中会对处理队列加锁,主要是确保在消费时不会因为重平衡导致数据重复消费。

从上面的琐机制来说,顺序消费在严格实现顺序语义的前提下也会尽量减少消息重复消费。

2、代码级别实现


在消息发送端可以采取自定义负载算法来实现队列的负载均衡机制,其代码实现如下图所示:

0f7d346812979d0ba071862906fb5f48.png

温馨提示:使用自定义的消息发送负载算法,RocketMQ消息发送内部的重试机制将失效,请再调用该方法的上层进行重试。

消费端的代码就更加简单,只需要在创建消费者时选择顺序消费监听器即可,代码如下图所示:

2508c07ea91c5cac88c3df235b7fa714.png

温馨提示:顺序消费端重试次数并不是16,而是Integer.MAX_VALUE,故请特别注意,业务类异常一定在消费端监听器中必须处理,如果是由于不满足业务规则,则重试无限次意义不大。


3、进阶


理想是美好的,现实是骨感的。


3.1 分区扩容、缩容对顺序消费端影响


在RocketMQ中实现顺序消费端重中之重是将同一个账号的数据发送到同一个队列,但是由于队列的扩容、缩容,由于消息发送过程中由于队列变更,上述队列负载算法,会导致同一个账号的消息可能会同时分布在多个队列中,从而导致从顺序执行变为并发执行,导致顺序错乱,这个在金融行业中是绝不允许的。、


对于扩容、缩容这样的人为操作,我们完成等待队列中的消息全部消费完成,可以通过停机维护来规避,但如果由于Broker自身在运行过程异常导致队列减少,此种情况又如何处理呢?


实现一个自定义的队列负载算法,需要传入一个队列的总队列个数,在负载均衡过程中如果发现数量不对时将消息先暂存到数据库,并将这些失败的队列信息存储到redis中,在发送新消息时,如果计算的负载队列是失败的队列,并且当前的队列信息已经恢复到当前初始值,则先判断数据库中是否有待发送到消息,如果有,则继续将消息发送到数据库,并开启一个线程,将数据库中的消息发送到mq中,这样后续的消息就会继续进入到MQ


温馨提示:对这一块如果有疑问的话,可以私信我,一起交流学习。


3.2 性能问题


在RocketMQ中的顺序消费线程模型中,一个分区中的所有消息必须顺序执行,其性能是较为低下,其琐粒度太粗,因为在实际场景中,通常只需要同一个账号顺序执行,不同账户的消息,即使在一个分区中,也可以并行执行,大概的解决思路:


0a8cd08b55fda1c2c7126f7e960c7e09.png

对于一个消息消费队列中的消息,我们对应一个线程组,按key进行选择线程,线程内部中的消息,顺序执行。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
4月前
|
域名解析 运维 Serverless
函数计算产品使用问题之设置最大实例数为1和最大并发数为20,当请求数量超过20时,系统会如何处理
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
5月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
消息中间件 NoSQL Redis
消息重复消费的问题
消息重复消费的问题
|
6月前
|
消息中间件 负载均衡 Kafka
Kafka学习---消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)
Kafka学习---消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)
703 2
|
消息中间件 缓存 监控
Rocketmq并发和顺序消费的失败重试机制
Rocketmq并发和顺序消费的失败重试机制
|
消息中间件 算法 关系型数据库
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(二)
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(二)
|
消息中间件 Kafka API
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)
|
消息中间件 监控 NoSQL
Rabbmit 重复消费的问题
最近遇到一个奇怪的问题,消费者在批量消费消息时,遇到该批次中出现部分重复消费导致业务异常。这些异常集中在某一时刻附近。
212 0
|
消息中间件 存储 NoSQL
该如何保证消息不被重复消费
该如何保证消息不被重复消费
198 0