RocketMQ 5.0 无状态实时性消费详解

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
性能测试 PTS,5000VUM额度
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: RocketMQ 5.0 无状态实时性消费详解

作者:绍舒


背景


RocketMQ 5.0 版本引入了 Proxy 模块、无状态 pop 消费机制和 gRPC 协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。


SimpleConsumer 客户端采用了无状态的 pop 机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也带来了一个新的挑战:当客户端数量较少且消息数量较少时,可能会出现消息消费延时的情况。


在当前的消息产品中,消费普遍使用了长轮询机制,即客户端向服务端发送一个超时时间相对较长的请求,该请求会一直挂起,除非队列中存在消息或该请求到达设定的长轮询时间。


然而,在引入 Proxy 之后,目前的长轮询机制出现了一个问题。客户端层面的长轮询和 Proxy 与 Broker 内部的长轮询之间互相耦合,也就是说,一次客户端对 Proxy 的长轮询只对应一次 Proxy 对 Broker 的长轮询。因此,在以下情况下会出现问题:当客户端数量较少且后端存在多个可用的 Broker 时,如果请求到达了没有消息的 Broker,就会触发长轮询挂起逻辑。此时,即使另一台 Broker 存在消息,由于请求挂在了另一个 Broker 上,也无法拉取到消息。这导致客户端无法实时接收到消息,即 false empty response。


这种情况可能导致以下现象:用户发送一条消息后,再次发起消费请求,但该请求却无法实时拉取到消息。这种情况对于消息传递的实时性和可靠性产生了不利影响。


AWS 的文档里也有描述此等现象,他们的解决方案是通过查询是所有的后端服务,减少 false empty response。


image.png


其他产品

在设计方案时,首先是需要目前存在的消息商业化产品是如何处理该问题的。


MNS 采取了以下策略,主要是将长轮询时间切割为多个短轮询时间片,以尽可能覆盖所有的 Broker。


首先,在长轮询时间内,会对后端的 Broker 进行多次请求。其次,当未超过短轮询配额时,优先使用短轮询消费请求来与 Broker 进行通信,否则将使用长轮询,其时间等于客户端的长轮询时间。此外,考虑到过多的短轮询可能会导致 CPU 和网络资源消耗过多的问题,因此在短轮询超过一定数量且剩余时间充足时,最后一次请求将转为长轮询。


然而,上述策略虽以尽可能轮询完所有的 Broker 为目标,但并不能解决所有问题。当轮询时间较短或 Broker 数量较多时,无法轮询完所有的 Broker。即使时间足够充足的情况下,也有可能出现时间错位的情况,即在短轮询请求结束后,才有消息在该 Broker 上就绪,导致无法及时取回该消息。


解法


技术方案

首先,需要明确该问题的范围和条件。该问题只会在客户端数量较少且请求较少的情况下出现。当客户端数量较多且具备充足的请求能力时,该问题不会出现。因此,理想情况是设计一个自适应的方案,能够在客户端数量较多时不引入额外成本来解决该问题。


为了解决该问题,关键在于将前端的客户端长轮询和后端的 Broker 长轮询解耦,并赋予 Proxy 感知后端消息个数的能力,使其能够优先选择有消息的 Broker,避免 false empty response。


考虑到 Pop 消费本身的无状态属性,期望设计方案的逻辑与 Pop 一致,而不在代理中引入额外的状态来处理该问题。


另外,简洁性是非常重要的,因此期望该方案能够保持简单可靠,不引入过多的复杂性。


  1. 为了解决该问题,本质上是要将前端的客户端长轮询和后端的 Broker 长轮询解耦开来,并赋予 Proxy 感知后端消息个数的能力,能够优先选择有消息的 Broker,避免 false empty response。
  2. 由于 Pop 消费本身的无状态属性,因此期望该方案的设计逻辑和 Pop 一致,而不在 Proxy 引入额外的状态来处理这个事情。
  3. Simplicity is ALL,因此期望这个方案简单可靠。


我们使用了 NOTIFICATION,可以获取到后端是否有尚未消费的消息。拥有了上述后端消息情况的信息,就能够更加智能地指导 Proxy 侧的消息拉取。


通过重构 NOTIFICATION,我们对其进行了一些改进,以更好地适应这个方案的要求。


pop with notify

一个客户端的请求可以被抽象为一个长轮询任务,该轮询任务由通知任务和请求任务组成。


通知任务的目的是获取 Broker 是否存在可消费的消息,对应的是 Notification 请求;而请求任务的目的是消费 Broker 上的消息,对应的是 Pop 请求。


首先,长轮询任务会执行一次 Pop 请求,以确保在消息积压的情况下能够高效处理。如果成功获取到消息,则会正常返回结果并结束任务。如果没有获取到消息,并且还有剩余的轮询时间,则会向每个 Broker 提交一个异步通知任务。


在任务通知返回时,如果不存在任何消息,长轮询任务将被标记为已完成状态。然而,如果相关的 Broker 存在消息,该结果将被添加到队列中,并且消费任务将被启动。该队列的目的在于缓存多个返回结果,以备将来的重试之需。对于单机代理而言,只要存在一个通知结果返回消息,Proxy 即可进行消息拉取操作。然而,在实际的分布式环境中,可能会存在多个代理,因此即使通知结果返回消息存在,也不能保证客户端能够成功拉取消息。因此,该队列的设计旨在避免发生这种情况。


image.png


消费任务会从上述队列中获取结果,若无结果,则直接返回。这是因为只有在通知任务返回该 Broker 存在消息时,消费任务才会被触发。因此,若消费任务无法获取结果,可推断其他并发的消费任务已经处理了该消息。


消费任务从队列获取到结果后,会进行加锁,以确保一个长轮询任务只有一个正在进行的消费任务,以避免额外的未被处理的消息。


image.png


如果获取到消息或长轮询时间结束,该任务会被标记完成并返回结果。但如果没有获取到消息(可能是其他客户端的并发操作),则会继续发起该路由所对应的异步通知任务,并尝试进行消费。


自适应切换

考虑到当请求较多时,无需采用 pop with notify 机制,可使用原先的 pop 长轮询 broker 方案,但是需要考虑的是,如何在两者之间进行自适应切换。目前是基于当前 Proxy 统计的 pop 请求数做判断,当请求数少于某一值时,则认为当前请求较少,使用 pop with notify;反之则使用 pop 长轮询。


由于上述方案基于的均为单机视角,因此当消费请求在 proxy 侧不均衡时,可能会导致判断条件结果有所偏差。


Metric

为了之后进一步调优长轮询和观察长轮询的效果,我们设计了一组 metric 指标,来记录并观测实时长轮询的表现和损耗。


  1. 客户端发起的长轮询次数 (is_long_polling)
  2. pop with notify 次数 (通过现有 rpc metric 统计)
  3. 首次 pop 请求命中消息次数 (未触发 notify) (is_short_polling_hit)


使用方式

在使用时需明确长轮询和短轮询的区分,可以参考 AWS 的定义,当轮询时间大于 0 时,长轮询生效。


image.png


可以看到需明确一个长轮询最小时间,因为长轮询时间过小时无意义,AWS 的最小值采取了 1 秒。


image.png


在目前版本的 Apache RocketMQ 服务端中,采用了最小 5 秒的限制,即需超过 5 秒才能触发长轮询,该值可在 ProxyConfig#grpcClientConsumerMinLongPollingTimeoutMillis 中配置或修改。

对于 SimpleConsumer 而言,可以通过 awaitDuration 字段来调整长轮询时间。


SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    // set await duration for long-polling.
    .setAwaitDuration(awaitDuration)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .build();


总结


通过如上方案,我们成功设计了一套基于无状态消费方式的实时消费方案,在做到客户端无状态消费的同时,还能够避免 false empty response,保证消费的实时性,同时,相较于原先 PushConsumer 的长轮询方案,能够大量减少用户侧无效请求数量,降低网络开销。


RocketMQ 学习社区体验地址

RocketMQ 学习社区重磅上线!AI 互动,一秒了解 RocketMQ 功能源码。RocketMQ 学习社区是国内首个基于 AIGC 提供的知识服务社区,旨在成为 RocketMQ 学习路上的“贴身小二”。


PS:RocketMQ 社区以 RocketMQ 5.0 资料为主要训练内容,持续优化迭代中,回答内容均由人工智能模型生成,其准确性和完整性无法保证,且不代表 RocketMQ 学习社区的态度或观点。
立即体验 RocketMQ 学习社区(建议 PC 端体验完整功能): https://rocketmq-learning.com/  


为了帮助用户更全面的了解 RocketMQ 5.0,同时收集更多反馈,「寻找 RocketMQ 首席评测官」活动惊喜上线!



点击此处,即可报名参加!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5天前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
36 15
|
4天前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
27 9
|
1天前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
14天前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
23 0
手撸MQ消息队列——循环数组
|
1月前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
55 5
|
1月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
87 1
|
2月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 MQ