基于 RocketMQ 的 MQTT 服务架构在小米的实践

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
可观测可视化 Grafana 版,10个用户账号 1个月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 本文整理自RocketMQ Summit 2022 全球开发者峰会。

1.jpeg

本文作者:房成进 - 小米高级研发工程师


小米 MQTT应用场景


2.png


小米之家门店的支付通知是小米MQTT落地的重要场景之一,流程如上图所示。店员通过终端发送下单请求到后端服务,后端在接收到下单请求后,调用支付服务,等待用户付款。门店终端如何知道本次付款是否成功呢?


我们采用MQTT协议来实现支付消息的通知。支付服务将本次订单的支付结果发布到MQTT 服务的一个 Topic中,门店终端与服务保持长连接,订阅 Topic来实时获取支付结果,从而进行下一步操作如打印发票等。得益于 TCP长连接和MQTT协议的轻量化,门店终端系统的支付响应能力从 200 毫秒下降至 10 毫秒内,MQTT服务发布端到订阅端的平均延时为2.6ms。


2.png


手机智能制造工厂是小米MQTT落地的另一个核心场景。MQTT主要应用于设备状态数据采集以及设备控制指令下发。上图右侧为小米智能制造工厂架构图。


上行链路流程为:手机生产线上的众多工业设备会将操作日志、设备参数、环境参数等通过工业控制层发布到MQTT服务,MQTT服务的存储层通过数据集成任务将数据打入大数据系统,进行数据的分析、建模和处理等,最后实现最上层应用工业 BI 和数字孪生的需求。


下行链路流程为:工厂的工作人员通过云端服务将控制指令下发到MQTT集群,生产线上的设备与MQTT服务集群保持长连接,以接受来自云端的控制指令并执行相应动作。这两个链路对时效性要求很高。目前, MQTT 服务能保证上行和下行链路延时在 20ms以内,服务可用性为99.95%。


小米 MQTT服务架构演进过程


3.png


早期,小米主要基于RocketMQ 社区在 18 年开源的RocketMQ-IoT-Bridge来构建自己的 MQTT 服务。RocketMQ-IoT-Bridge为单机架构,一是不支持水平扩展,总连接数存在瓶颈,自然无法保证高可用。二是数据无法持久化,只提供内存存储,一旦重启服务,必然导致消息丢失。三是只支持MQTT 协议QoS0,消息存在丢失风险,无法满足小米的业务要求。如图所示,服务整体为单机服务架构,发布端和订阅端连接到同一个进程。


4.png


小米基于单机的架构进行了一系列的拓展。高可用方面,从单机变为分布式的可扩展架构,连接数从单机的 5 万变为可横向扩展的模式;可靠性方面,在QoS0 的基础上实现了MQTT协议规定的 QoS 1 和 QoS 2;消费模式方面,除了默认的广播消费,支持了MQTT5.0新增的共享消费模式,同时还支持了离线消息。


5.png


上图右侧是小米基于 RocketMQ 的分布式 MQTT 架构图。最上层为客户端,发布者和订阅者连接到负载均衡器,这里使用四层的负载均衡LVS, 主要目的是将请求均摊到各个MQTT Bridge。MQTT Bridge 即MQTT的服务节点,负责连接、订阅、解析协议和消息转发。RocketMQ 作为存储层,负责持久化消息。类似于存算分离设计,MQTT Bridge 和 RocketMQ 均可独立水平扩展。


得益于 RocketMQ 从 2020 年开始在小米大规模落地,我们采用RocketMQ来持久化 MQTT 消息。整个发布订阅的过程演变成消息从 Bridge发送到RocketMQ,再从RocketMQ消费数据然后推送到订阅端。每一个MQTT Bridge 内嵌 RocketMQ SDK ,充当 RocketMQ的客户端,既作为生产者也作为消费者。


此外,持久化层支持了小米自研的消息队列Talos,提供了可插拔模式。根据业务数据的下游使用场景,部署时可灵活选择任意一个消息队列作为持久化层。


6.png


MQTT协议的消息结构和 RocketMQ 的消息结构互相独立,因此如果将MQTT协议的消息持久化到 RocketMQ 中,必然需要做一定的匹配。MQTT Topic有多级,如图中T1/T2/T3所示,为多级树形结构。将 T1 看作一级 Topic,对应 RocketMQ 中的 Topic T1,则所有发往以 T1 开头的 MQTT Topic的消息都会持久化到 RocketMQ 的 T1 Topic中。


此时问题演变成如何区分一条消息属于哪个MQTT Topic,我们选择将MQTT Topic设为消息的 tag,MQTT消息中的一些可变 header 直接放在RocketMQ 消息属性 KV 中,消息体可以直接映射到 RocketMQ消息的 Payload 中,这样完成了MQTT消息到RocketMQ消息的映射。


7.png


除消息数据之外,元数据是 MQTT 服务非常重要的一部分。MQTT Bridge 中保存了两类元数据,分别是客户端元数据和订阅元数据。客户端元数据保存了客户端的连接信息、连接时间、客户端 ID、Netty channel 等信息,我们实现了可视化的控制台,支持查询MQTT服务的连接数,支持通过连接 ID 和客户端 ID 查询客户端的信息。此外,实现了客户端上下线通知,用户可以通过订阅 MQTT  Topic实时获取到某个客户端的上线和下线事件。订阅元数据保存了客户端和MQTT的映射关系,主要通过Trie树来保存订阅关系,可以满足通配符的方式订阅,实现快速匹配。Bridge 通过订阅 Topic找到客户端,将消息定向推送。


8.png


MQTT协议主要有三个服务质量等级 QoS 0、 QoS 1 和 QoS 2。QoS 0表示消息最多发一次,可能存在丢失消息的情况,性能最好,对于数据可靠性要求不高的业务较为实用。QoS 1 为消息保证能至少到达一次,可能会重复,性能相对差一些。QoS 2 为消息不丢不重,但性能最差。


9.png


上图为QoS0的实现流程。QoS 指发送端和接收端之间的消息传输质量。发布消息时,MQTT Bridge 作为消息的接收端,IoT 设备作为发布端。订阅消息时,MQTT Bridge作为发布端,IoT设备作为接收端。发布和订阅是两个独立的 QoS 过程,整条链路的 QoS 是这两部分 QoS 的最低值,比如发布是 QoS 1,订阅是 QoS 0,则整条链路的 QoS 等级就是 0。左侧是 QoS 0 发布的过程。发布端IoT将消息推送给MQTT Bridge,Bridge 将消息异步推送到 RocketMQ,无需等待响应。图中两个箭头的请求都可能失败,可能会丢数据,可靠性很低。但由于链路短,因此性能较高。


10.png


上图为 QoS 1的实现流程。IoT 终端发布消息之前,会先将其持久化到本地内存里,Bridge 收到消息之后,将消息异步推送到 RocketMQ,等待消息持久化成功的结果后,再返回pubAck包给IoT,IoT 将内存里的这条消息删除。发送的请求可能会失败,发送端内存里存储了消息,因此可以通过重试来实现消息至少被发一次,但也导致消息可能会重复发送。订阅端同理。


11.png


QoS 2 的实现流程如上图。在QoS 1时, Bridge接受到消息后没有将消息持久化在自己的内存里,而是直接将消息推送到RocketMQ中。如果发送端一直没有收到 pubAck 包,则执行重发,重发之后 Bridge无法获知收到的消息是新消息还是重发消息,会造成消息重复。QoS 2基于 messageID 来实现消息去重。MQTT 协议要求 message ID 可以被重复使用,且有一定范围,不会一直递增。所以在利用 messageID 去重的同时,还要保证 messageID 在传输过程中不能有重复,用完后必须释放。


依据这两点前提,sender在发送消息之前,会将消息持久化在自己的内存里,再推送给 receiver。receiver 收到消息之后也会放在本地内存里,返回 PubRec 包给 sender,通知其已经收到消息。如果 sender 一直没有收到PubRec包,会不断地重复发送消息。由于receiver 内存里已经保存了消息,因此可以通过 messageID 来实现消息的去重。发送端在接收到 PubRec 包后发布PubRel包,通知 receiver 可以清理内存中的消息,也意味着sender已经知道消息已被 receiver 持久化,此时再由 receiver 将消息推给RocketMQ 并等待持久化响应。receiver 发送 PubComp 包给 sender通知其可将PubRel包删除。上图中步骤 3.3可能失败,因此sender必须在内存中缓存PubRel包。上述流程存在两步确认机制,第一个是保证消息能到达 receiver ;第二个是保证将用过的 messageID 释放掉,能够实现 message ID 的重用。


12.png


推拉模型是 MQTT Bridge 实现消息发布订阅的核心模型。假设以下场景:有四个订阅端,其中订阅端IoT-1和IoT-2分别订阅了 Topic1/a、Topic1/b,IoT-3和IoT-4分别订阅了Topic2/ a。第一、二台设备连接到第一个 Bridge,第三、四台设备连接到第二个 Bridge。当有新的订阅关系过来时,会检查订阅一级 Topic。上图中Bridge1 维护的两个订阅关系分别是Topic1/a、Topic1/b,它会启动 RocketMQ的消费任务,从RocketMQ中消费 Topic1 中的数据。消费到的每条消息通过tag判断属于哪个 MQTT Topic,再通过匹配树将消息推送给客户端。每一个 RocketMQ Topic对应一个拉取消息的任务,而一级 Topic下面可能有很多MQTT Topic,一旦MQTT Topic增多,推送给客户端的延时就会变高。此外,一级 Topic下可能会存在很多终端,存在大量没有被订阅的无用消息。


Topic级别的任务无法为每个客户端都维护独立的 offset 进度。只要 Bridge 接收到客户端订阅的请求就会开启消费线程,Topic没有订阅时再将线程停掉。这样存在的问题是如果长时间没有消息发布,但订阅关系一直存在,会导致线程空转,存在很大的资源浪费。


13.png


社区在今年 3 月份开源新版MQTT架构,架构中引入了 notify 组件。作用为通知所有MQTT Bridge 一级Topic中是否有新的消息产生。每一个 Bridge 中都内置 notify 组件,负责启动针对 RocketMQ一级 Topic的集群模式消费者,一旦一级 Topic中有消息产生时,notify 组件能够感知到消息的产生,同时将消息作为事件广播给所有Bridge。其他 Bridge 收到消息事件的通知后,会为连接在这台 Bridge 上的每个终端开启独立拉取任务。拉取时不是拉取一级 Topic中的所有数据,而是通过消费 4.9.3 版本新引入的 LMQ,避免拉取一级 Topic中其他没有被当前客户端订阅的消息,以此避免了读放大。另外,每个终端独立的拉取任务可以为每个终端维护独立的 offset 进度,方便实现离线消息。


因此,只有新的消息事件到来时,才会为终端开启拉取任务。Topic没有消息或没有任何订阅关系之后,拉取任务将停止。升级后的推拉模型能够支持离线消息,大幅降低了延时,合理的启停机制有效避免线程资源的浪费。

14.png


15.png


共享订阅是MQTT5.0 协议新增的订阅模式,可以理解为类似RocketMQ中的集群模式消费。上图左侧为简单的共享队列实例。IoT 发送几条消息到 Topic1/a 中,Topic1/a有三个订阅端,每一条消息只会被推送给其中一个订阅端,比如IoT-sub-1会收到message1和message4,IoT-sub-1 会收到 message2 和message5,message 会收到message 3和message 6。其实现原理为:


每个 MQTT Bridge 会启动一个针对Topic的拉取任务。RocketMQ 本身能够支持集群模式,MQTT Bridge又作为RocketMQ的客户端,因此可以复用RocketMQ的共享订阅模型。订阅端订阅时以某种特殊方式带上消费者组名称,连接到某台 Bridge 后,该Bridge上就会用消费者组和订阅的一级 Topic来启动一个RocketMQ的集群模式消费者。第二个订阅端连接了第二台 Bridge,该Bridge也会启动一个消费者。只要Bridge 上有终端连接且他们处于一组内并订阅了同一个 RocketMQ的一级 Topic,则所有符合要求的 Bridge 会组成集群模式的消费者集群。有新的消息到达 Topic1 之后,只会被其中一个 Bridge 消费,那么也只会被连接到该 Bridge 上的 IoT 订阅端消费到。如果有多个订阅端同时连到一个 Bridge 上,消息应该推给哪个客户端呢?我们在MQTT Bridge 内置多种策略,默认选择轮询策略。一条消息发到 Bridge 后,Bridge可以轮询发送给任意一个IoT订阅端,实现单 Bridge 多订阅端的共享消费。


未来工作


16.png


未来,小米MQTT的工作将从以下四个方面继续深入探索:


  • 架构:推拉模型继续升级完善;
  • 功能:离线消息、保留消息、遗嘱消息等功能的完善;
  • 社区:拥抱开源社区,跟随社区升级RocketMQ端云一体化的架构;
  • 业务:小米汽车等IoT的场景推广和应用。


加入 Apache RocketMQ 社区


十年铸剑,Apache RocketMQ 的成长离不开全球接近 500 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。

社区 5.0 版本正在进行着如火如荼的开发,另外还有接近 30 个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学加入社区,添加社区开发者微信:rocketmq666 即可进群,参与贡献,打造下一代消息、事件、流融合处理平台。


17.jpeg


微信扫码添加小火箭进群


另外还可以加入钉钉群与 RocketMQ 爱好者一起广泛讨论:


18.png


钉钉扫码加群


关注【Apache RocketMQ】公众号,获取更多技术干货!

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
运维 Cloud Native 测试技术
极氪汽车云原生架构落地实践
随着极氪数字业务的飞速发展,背后的 IT 技术也在不断更新迭代。极氪极为重视客户对服务的体验,并将系统稳定性、业务功能的迭代效率、问题的快速定位和解决视为构建核心竞争力的基石。
|
1月前
|
监控 Linux 应用服务中间件
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
328 56
|
14天前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
299 6
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
2月前
|
存储 运维 Serverless
千万级数据秒级响应!碧桂园基于 EMR Serverless StarRocks 升级存算分离架构实践
碧桂园服务通过引入 EMR Serverless StarRocks 存算分离架构,解决了海量数据处理中的资源利用率低、并发能力不足等问题,显著降低了硬件和运维成本。实时查询性能提升8倍,查询出错率减少30倍,集群数据 SLA 达99.99%。此次技术升级不仅优化了用户体验,还结合AI打造了“一看”和“—问”智能场景助力精准决策与风险预测。
265 69
|
2月前
|
弹性计算 负载均衡 网络协议
阿里云SLB深度解析:从流量分发到架构优化的技术实践
本文深入探讨了阿里云负载均衡服务(SLB)的核心技术与应用场景,从流量分配到架构创新全面解析其价值。SLB不仅是简单的流量分发工具,更是支撑高并发、保障系统稳定性的智能中枢。文章涵盖四层与七层负载均衡原理、弹性伸缩引擎、智能DNS解析等核心技术,并结合电商大促、微服务灰度发布等实战场景提供实施指南。同时,针对性能调优与安全防护,分享连接复用优化、DDoS防御及零信任架构集成的实践经验,助力企业构建面向未来的弹性架构。
282 76
|
2月前
|
存储 人工智能 开发框架
MCP 实践:基于 MCP 架构实现知识库答疑系统
文章探讨了AI Agent的发展趋势,并通过一个实际案例展示了如何基于MCP(Model Context Protocol)开发一个支持私有知识库的问答系统。
MCP 实践:基于 MCP 架构实现知识库答疑系统
|
1月前
|
缓存 算法 网络协议
IP代理技术原理深度解析:从基础架构到应用实践
IP代理是网络通信中的关键技术,通过构建中间层实现请求转发与信息过滤。其核心价值体现在身份伪装、访问控制和性能优化三个方面。文章详细解析了HTTP与SOCKS协议的工作机制,探讨了代理服务器从传统单线程到分布式集群的技术演进,并分析了在网络爬虫、跨境电商及企业安全等场景的应用。同时,面对协议识别、性能瓶颈和隐私合规等挑战,提出了多种解决方案。未来,IP代理将融合边缘计算、AI驱动优化及量子安全加密等趋势,持续发展为支撑现代互联网的重要基础设施。
117 2
|
1月前
|
人工智能 监控 前端开发
基于 Next.js 的书法字体生成工具架构设计与 SSR 优化实践
本项目是一款书法字体生成工具,采用 Next.js 14(App Router)与 Tailwind CSS 构建前端,阿里云 Serverless 部署后端。通过混合渲染策略(SSG/SSR/CSR)、Web Worker 异步计算及 CDN 字体分片加载优化性能。服务端借助阿里云函数计算处理计算密集型任务,将平均耗时从 1200ms 降至 280ms,支持 1000+ QPS。动态路由与 ARMS 监控提升工程化水平,未来计划引入 WebGPU 和 AI 字体风格迁移技术,进一步优化用户体验。
|
2月前
|
Cloud Native Serverless 流计算
云原生时代的应用架构演进:从微服务到 Serverless 的阿里云实践
云原生技术正重塑企业数字化转型路径。阿里云作为亚太领先云服务商,提供完整云原生产品矩阵:容器服务ACK优化启动速度与镜像分发效率;MSE微服务引擎保障高可用性;ASM服务网格降低资源消耗;函数计算FC突破冷启动瓶颈;SAE重新定义PaaS边界;PolarDB数据库实现存储计算分离;DataWorks简化数据湖构建;Flink实时计算助力风控系统。这些技术已在多行业落地,推动效率提升与商业模式创新,助力企业在数字化浪潮中占据先机。
200 12
|
3月前
|
并行计算 PyTorch 算法框架/工具
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
本文探讨了如何通过技术手段混合使用AMD与NVIDIA GPU集群以支持PyTorch分布式训练。面对CUDA与ROCm框架互操作性不足的问题,文章提出利用UCC和UCX等统一通信框架实现高效数据传输,并在异构Kubernetes集群中部署任务。通过解决轻度与强度异构环境下的挑战,如计算能力不平衡、内存容量差异及通信性能优化,文章展示了如何无需重构代码即可充分利用异构硬件资源。尽管存在RDMA验证不足、通信性能次优等局限性,但该方案为最大化GPU资源利用率、降低供应商锁定提供了可行路径。源代码已公开,供读者参考实践。
233 3
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践

相关产品

  • 云消息队列 MQ