FreeMQTT Plus: 一个新型 MQTT Broker 集群的实现

简介: FreeMQTT Plus 是一款基于 MQTT 协议的高性能消息中间件,采用分布式架构解决单点瓶颈问题。其核心由 Nginx 负载均衡器、黑(A)节点(MQTT Broker)、白(B)节点(消息路由)和日志(L)节点组成。通过无主从设计,支持高可用性、负载均衡与灵活扩展。针对会话同步、消息路由等挑战,FreeMQTT Plus 利用 MQTT5 特性定义元命令,实现节点间高效通信,无需依赖第三方组件。适用于物联网海量设备接入与高并发场景,为未来边缘计算和多级集群部署提供坚实基础。

FreeMQTT Plus: 前言

随着物联网(IoT)的快速发展,MQTT 协议因其轻量级、低带宽消耗和高效的发布-订阅模型,已成为物联网通信的事实标准。在这一背景下,MQTT Broker 作为消息传输的中枢,其性能和可靠性直接决定了整个系统的可用性。面对海量设备接入和高并发消息场景,单节点 Broker 的局限性逐渐暴露,而集群化部署成为解决这一问题的必然选择。本文将从集群设计目标、实现方案及挑战等多个角度,探讨 FreeMQTT Plus 集群的实现路径。


一、MQTT Broker 集群的设计目标

MQTT Broker 集群的核心目标是解决单点瓶颈问题,同时满足以下需求:

  1. 高可用性:通过多节点冗余,避免单点故障导致服务中断。
  2. 负载均衡:动态分配客户端连接和消息处理压力,提升系统吞吐量。
  3. 横向扩展:支持按需扩容,灵活应对设备数量增长和流量波动。
  4. 数据一致性:确保集群内会话(Session)状态和消息路由的一致性。

二、常见的集群实现方案

目前主流的 MQTT Broker 集群实现方案可分为以下几类:

1. 基于代理层的分布式架构

  • 实现原理:在 Broker 节点前部署负载均衡器(如 Nginx、HAProxy),通过 TCP/IP 层分发客户端连接。各 Broker 节点独立运行,不共享状态。
  • 优点:架构简单,易于部署;适合客户端会话无状态的场景(Clean Session=1)。
  • 缺点:无法处理持久化会话(Clean Session=0),消息可能因节点故障丢失;跨节点主题路由需额外处理。
  • 典型应用:EMQX 早期版本、HiveMQ 的负载均衡模式。

2. 共享订阅与数据同步

  • 实现原理:集群节点间通过共享数据库(如 Redis、RocksDB)或分布式一致性协议(如 Raft、Gossip)同步会话和消息数据。
  • 优点:支持持久化会话;节点故障时客户端可无缝迁移。
  • 缺点:数据同步可能成为性能瓶颈;强一致性要求可能牺牲吞吐量。
  • 典型应用:Mosquitto 结合 Redis 插件、EMQX 的 Mnesia 数据库同步。

3. 分片与路由优化

  • 实现原理:按客户端 ID 或主题(Topic)对集群节点进行分片,结合路由表实现消息精准投递。
  • 优点:减少跨节点通信开销;适合大规模主题场景。
  • 缺点:分片策略需预先设计,动态扩容复杂;需处理热点分片问题。
  • 典型应用:VerneMQ 的主题分区策略、NanoMQ 的横向分片方案。

三、FreeMQTT Plus 架构

FreeMQTT Plus 采用的是代理分布式架构。

组成:

  • Nginx 负载均衡器。
  • 黑节点(A Node) 即是我们已知的 MQTT Broker, 具有和其他节点组成集群的能力。
  • 白节点(B Node) 是连接各个黑节点,起着消息路由功能,对客户端是不可见的。
  • 日志节点(L Node)是汇集黑白节点日志的,此节点可选且只能有一个

架构图:
freemqtt-plus-struc.png


四、MQTT Broker 集群实现中的关键挑战

一个 MQTT Broker 集群需要解决如下的难题:

1. 会话状态的同步难题

MQTT 协议要求 Broker 维护客户端会话状态(如未确认的 QoS 1/2 消息)。在集群中,如何高效同步会话数据成为关键。一种折中方案是采用“粘性会话”(Sticky Session),通过负载均衡器将同一客户端固定到特定节点,但牺牲了故障转移的即时性。

2. 消息路由的复杂性

当发布者与订阅者位于不同节点时,消息需跨节点转发。常见的优化手段包括:

  • 主题树路由:将主题按层级拆分,节点仅订阅特定子树。
  • 元数据广播:通过 Gossip 协议快速传播订阅关系。
  • 边缘路由:在边缘节点缓存订阅信息,减少中心节点压力。

3. 横向扩展与资源争用

集群扩容时,新节点的加入可能引发资源竞争。例如,Kubernetes 等容器化平台可通过 StatefulSet 动态扩展 Broker Pod,但需配合服务发现机制(如 etcd)动态更新集群成员列表。


五、FreeMQTT Plus 的对策

对于上一节中提到的 MQTT Broker 集群实现的挑战,FreeMQTT Plus 采取如下对策:

1. 会话状态的同步

  • FreeMQTT Plus 采用分布式黑(A)白(B)节点架构
  • 在集群中,不需采用“粘性会话”(Sticky Session)
  • 客户的 session 保存连接的A节点上
  • 客户连接时, 当 clean senssion=0,该A节点随机选择(节点内部的负载均衡)一个B节点查询其他A节点是否有该clientID 的session存在,若存在则同步 session 信息 。

2. 消息路由

  • Retain 消息
    • MQTT 客户端订阅新的 Topic 时,查询本地A节点是否有保留消息有则发送给客户端
    • 客户所在的A节点随机选择(节点内部的负载均衡)一个B节点查询其他A节点是否有是否有对应该Topic的保留消息, 如有则返回给客户所在的A节点
  • 新发布的消息
    • MQTT 客户端发布某个 Topic 消息时,其所在的A节点随机选择(节点内部的负载均衡)一个B节点,把新的消息发给该B节点
    • B节点根据客户所属的Application ID, 来路由消息 (B节点根据客户连接时所属的Application记录A节点和Application关联)
    • 收到B节点路由来的消息后,A节点按正常的规则转发给相关订阅该 Topic 的客户端
  • 共享订阅
    • 当上面的B节点收到一个新的消息路由请求时,除了做常规订阅的路由时,该B节点还向集群中的每个A节点查询该消息Topic是有对应的共享订阅
    • A节点返回共享订阅信息的列表
    • B节点根据全部A节点返回的共享订阅列表,随机选个A节点分发给某个A节点(到达共享分发的目标)

3. 横向扩展

  • FreeMQTT Plus 的黑(A)白(B)节点是无主从关系的,每个的A节点是对等的不分主次
  • 同样每个B类节点也是不分主次
  • 客户的 session 信息分布在不同的A节点上,不进行中心存储同步
  • 客户的订阅信息也是分布存储在不同的A节点上
  • A、B节点都不存在单点故障问题
  • A、B节点都易于水平扩展

4. 黑白节点间通讯机制

  • 黑(A)白(B)节点之间通讯底层协议采用MQTT5
  • 借助 MQTT5 的 User Propertity 和 Correlation Data 属性定义节点间的通讯元语
  • 没有像其他MQTT Broker集群那样借助 gossip 或 redis 类的第三方协议或组件
  • 每个B节点都是一个特殊的MQTT客户端(属于特殊的Application)
  • 每个B节点作为FreeMQTT Plus集群的智能消息路由角色
  • 为使A、B节点进行通讯,FreeMQTT Plus 定义如下17个通讯元 (meta command):
    • 增加 App 引用计数 INC_APPREF,方向 A-->B
    • 减少 App 引用计数 DEC_APPREF,方向 A-->B
    • 获取 App 引用列表 FETCH_APPREF,方向 B-->A
    • 反馈 App 引用列表 FETCH_APPREF_ECHO,方向 A-->B
    • Pub 消息路由 ROUTE_MESSAGE,方向 A<-->B
    • 获取Retain消息 FETCH_RETAIN_MESSAGE,方向 B-->A
    • 返回Retain消息 RETAIN_MESSAGE_ECHO,方向 A-->B
    • 获取共享消息 FETCH_SHARE_MESSAGE,方向 B-->A
    • 返回共享消息 FETCH_SHARE_MESSAGE_ECHO,方向 A-->B
    • 获取共享消息 FETCH_SHARE_MESSAGE,方向 B-->A
    • 派发共享消息 DELIVERY_SHARE_MESSAGE,方向 B-->A
    • 获取A节点metrics FETCH_METRICS,方向 B-->A
    • 返回A节点metrics FETCH_METRICS,方向 A-->B
    • 清楚客户session CLEAR_SESSION,方向 A<-->B
    • 检查客户session 是否存在 CHECK_SESSION,方向 B<-->A
    • 返回客户session 是否存在 CHECK_SESSION_ECHO,方向 A-->B
    • 检查客户session 完成 CHECK_SESSION_COMPLETE,方向 B<-->A

六、结语

MQTT Broker 集群的实现是一项系统性工程,需在一致性、可用性和扩展性之间寻求平衡。未来,随着 5G 和边缘计算的普及,多级集群(中心-边缘协同)、混合云部署等新模式将进一步推动 MQTT 集群架构的演进。唯有深入理解协议特性与业务场景,方能设计出既稳健又灵活的集群方案。


希望这篇文章能够为 MQTT 集群的实践提供有价值的参考!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 监控 RocketMQ
Docker部署RocketMQ5.2.0集群
本文详细介绍了如何使用Docker和Docker Compose部署RocketMQ 5.2.0集群。通过创建配置文件、启动集群和验证容器状态,您可以快速搭建起一个RocketMQ集群环境。希望本文能够帮助您更好地理解和应用RocketMQ,提高消息中间件的部署和管理效率。
363 91
|
4月前
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
750 2
|
9月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
684 0
|
5月前
|
存储 算法 安全
FreeMQTT:一款Python语言实现的开源MQTT Server
FreeMQTT 是一款用 Python 语言并基于 Tornado 开发的开源 MQTT 服务器,支持 MQTT3.1.1 和 MQTT5.0 协议,提供多租户安全隔离、高效 Topic 匹配算法及实时上下线通知等功能,适用于 IoT 场景。快速启动仅需克隆仓库、安装依赖并运行服务。
|
8月前
|
消息中间件 存储 负载均衡
|
8月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
102 2
|
9月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
191 0
|
8月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
461 0
下一篇
oss创建bucket