我的mqtt协议和emqttd开源项目个人理解(18) - 一个客户端sub很多主题和数据,出现宕机?使用本地共享订阅解决!

简介: 我的mqtt协议和emqttd开源项目个人理解(18) - 一个客户端sub很多主题和数据,出现宕机?使用本地共享订阅解决!

image.png


image.png

EMQ中CPU是公平分配给MQTT会话,大量pub消息到一个订阅,订阅不会拿到更多cpu,最终导致消息累积,内存溢出宕机。


最好的解决办法是分组订阅,把消息打散,多个客户端订阅。


问:请问一下,我现在的业务就是只使用一个sub,主题是\hello\#,去订阅数万个终端消息。请问针对这个问题,如何来优化业务逻辑?可以通过增加sub的方式吗?即把数万终端分成几个小组,每小组往一个主题pub,然后再分别订阅?


答:不需要自己实现,使用emq 2.x的本地+共享订阅相结合即可。使用主题:$local/$share/group/topic


http://emqtt.com/docs/v2/advanced.html#local-subscription


emq2.x版本,共享订阅只支持单节点,不支持集群。



---


关联阅读:我的mqtt协议和emqttd开源项目个人理解(24) - emq v2.3.11源码成熟度如何?


EMQ 2.3中的本地/共享订阅


简介


使用本地订阅后,指消息只在订阅者的本地(所连接的)EMQ节点,不会流传到集群中的其它EMQ节点中。在共享订阅中,订阅同一个主题的客户端会轮流的收到这个主题下的消息,也就是说同一个消息不会发送到多个订阅者,从而实现订阅端的多个节点之间的负载均衡。共享订阅对于数据采集/集中处理类应用非常有用。在这样的场景下,数据的生产者远多余数据的消费者,且同一条数据只需要被任意消费者处理一次。


EMQ中本地/共享订阅特性


本地订阅(Local Subscription)是指只在本节点创建订阅与路由表,不会在集群节点间广播全局路由。


mosquitto_sub  -t  '$local/topic'

mosquitto_pub  -t  'topic'

使用方式: 订阅者在主题(Topic)前增加‘$local/’前缀。


共享订阅(Shared Subscription)支持在多订阅者间采用分组负载平衡方式派发消息:




共享订阅支持两种使用方式:


订阅前缀 使用示例

$queue/ mosquitto_sub -t ‘$queue/topic’

$share/<group>/ mosquitto_sub -t ‘$share/group/topic’

其中$queue与$share的区别,在于$share后面可以加不同的分组(group),比如$share/group1/topic,$share/group2/topic,$share/group3/topic,生产者发一个topic的消息,订阅$share/group1/topic的消费者、$share/group2/topic的消费者、$share/group3/topic的消费者都能收到消息,如果一个分组中存在多个消费者,则多个消费者还会共享订阅消息,每个group的消息随机其中一个消费者能获得。





使用场景


当生产者和消费者的消息只想通过一个EMQ节点,可使用EMQ的本地订阅。

当消费者的消息想通过EMQ实现订阅端的多个消费者之间的负载均衡,则可使用EMQ的共享订阅。

特殊情况下,当多个生产者生产多条消息的时候,一个消费者压力会很大,分担一个消费者的压力,或者其它情况,需要用到本地共享订阅。本地共享订阅是本地订阅与共享订阅的组合。


下图为本地共享订阅的消息走向和订阅关系。每个消费者都必须本地共享订阅每一个EMQ节点,这样每条消息只会走向一个消费者。注意:这里的SUB指消费者,PUB指生产者。


消息流向


每个PUB的消息从经过LB(负载均衡),发送到不同的EMQ节点中,比如第一个EMQ节点收到的M1、M3、M4的消息:


通过本地订阅,这个EMQ节点的消息把收到的M1、M3、M4的消息不发给其它集群的EMQ节点,只从这个EMQ节点发送给订阅的SUB。

通过共享订阅,这个EMQ的消息把收到的M1、M3、M4的消息发送给不同的SUB端。

通过上述本地订阅+共享订阅,实现多个PUB发送的每条消息随机到达一个并且只有一个SUB。

订阅关系


每个SUB使用本地共享订阅($local/$share/A)来订阅EMQ集群中的每个EMQ节点。




 


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
数据采集 传感器 监控
Modbus 与 MQTT 协议兼容:MyEMS 的泛在能源数据采集技术实现
MyEMS深度融合Modbus与MQTT协议,破解能源数据采集中协议碎片化、网络异构、数据孤岛等难题。通过Modbus接入95%以上工业设备,实现现场数据精准“拉取”;依托MQTT构建高效物联网传输通道,支持多源数据主动“推送”与云端集成。边缘侧采集规整,中心侧汇聚分析,形成统一、可靠、低延迟的数据流。该架构兼具高兼容性、强扩展性与低运维成本,广泛应用于工业园区、商业楼宇及集团型企业,支撑实时监控、AI分析与跨系统融合,打造泛在互联的能源数据底座,助力企业实现全面智慧能源管理。
410 6
|
11月前
|
消息中间件 存储 数据采集
4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)
本文介绍了基于状态机驱动的MQTT客户端快速接入OneNet平台的实现方法,通过4步完成模块设计。文章以开源项目`Sparrow`为基础,引入`OneNetMqtt`业务模块,采用事件驱动模型和双层状态机设计,实现设备状态管理、消息处理及定时任务等功能。模块分为三层:`OneNetManager`负责核心逻辑,`OneNetDevice`管理设备信息,`OneNetDriver`处理Socket与MQTT通信。验证结果显示设备连接、数据上报及下线功能正常,稳定性良好。该设计简化了复杂条件判断,增强了系统灵活性与可扩展性,适用于实际项目参考。文末提供源码获取方式,助力读者实践与学习。
666 106
|
数据可视化 关系型数据库 MySQL
嵌入式C++、STM32、MySQL、GPS、InfluxDB和MQTT协议数据可视化
通过本文的介绍,我们详细讲解了如何结合嵌入式C++、STM32、MySQL、GPS、InfluxDB和MQTT协议,实现数据的采集、传输、存储和可视化。这种架构在物联网项目中非常常见,可以有效地处理和展示实时数据。希望本文能帮助您更好地理解和应用这些技术,构建高效、可靠的数据处理和可视化系统。
717 82
|
8月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
1670 0
|
11月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
280 1
|
7月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
5月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
374 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
959 89
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
444 88
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。