我的mqtt协议和emqttd开源项目个人理解(3) - 客户端publish消息QoS==0的源码分析

简介: 我的mqtt协议和emqttd开源项目个人理解(3) - 客户端publish消息QoS==0的源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:

(注意:gen_server:call是同步调用,cast是异步。call对应的是handle_info,cast对应的是handle_cast。)


1、-module(emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun  = ParserFun,
                                      packet_opts = PacketOpts,
                                      proto_state = ProtoState}) ->

2、-module(emqttd_parser).

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
received(Packet = ?PACKET(_Type), State) ->
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
        #proto_state{client_id = ClientId, username = Username, session = Session}) ->

4、-module(emqttd_session).


5、-module(emqttd).

publish(Msg) when is_record(Msg, mqtt_message) ->


6、-module(emqttd_server).

publish(Msg = #mqtt_message{from = From}) ->


7、-module(emqttd_pubsub).

这里考虑了本机节点和集群节点的情况

publish(Topic, Msg) ->

   lists:foreach(

       fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->

           ?MODULE:dispatch(To, Msg);

          (#mqtt_route{topic = To, node = Node}) ->

           rpc:cast(Node, ?MODULE, dispatch, [To, Msg])

       end, emqttd_router:lookup(Topic)).



dispatch(Topic, Msg) ->

SubPid ! {dispatch, Topic, Msg};


8、-module(emqttd_session).

handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})

   when is_record(Msg, mqtt_message) ->

这里会区分Client是否在线离线,还有CleanSession=0/QoS=1,2的情况,需要存储离线消息。

%% Queue message if client disconnected

dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) ->

   hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});

%% Deliver qos0 message directly to client

dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->

   ClientPid ! {deliver, Msg},

   hibernate(Session);

dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})

   when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->

   case check_inflight(Session) of

       true  ->

           noreply(deliver(Msg, Session));

       false ->

           hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})

   end.


9、-module(emqttd_client).

handle_info({deliver, Message}, State) ->

   with_proto_state(fun(ProtoState) ->

                      emqttd_protocol:send(Message, ProtoState)

                    end, State);


10、-module(emqttd_protocol).

send(Packet, State = #proto_state{sendfun = SendFun})

   when is_record(Packet, mqtt_packet) ->




相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
数据采集 传感器 监控
Modbus 与 MQTT 协议兼容:MyEMS 的泛在能源数据采集技术实现
MyEMS深度融合Modbus与MQTT协议,破解能源数据采集中协议碎片化、网络异构、数据孤岛等难题。通过Modbus接入95%以上工业设备,实现现场数据精准“拉取”;依托MQTT构建高效物联网传输通道,支持多源数据主动“推送”与云端集成。边缘侧采集规整,中心侧汇聚分析,形成统一、可靠、低延迟的数据流。该架构兼具高兼容性、强扩展性与低运维成本,广泛应用于工业园区、商业楼宇及集团型企业,支撑实时监控、AI分析与跨系统融合,打造泛在互联的能源数据底座,助力企业实现全面智慧能源管理。
482 6
|
消息中间件 存储 数据采集
4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)
本文介绍了基于状态机驱动的MQTT客户端快速接入OneNet平台的实现方法,通过4步完成模块设计。文章以开源项目`Sparrow`为基础,引入`OneNetMqtt`业务模块,采用事件驱动模型和双层状态机设计,实现设备状态管理、消息处理及定时任务等功能。模块分为三层:`OneNetManager`负责核心逻辑,`OneNetDevice`管理设备信息,`OneNetDriver`处理Socket与MQTT通信。验证结果显示设备连接、数据上报及下线功能正常,稳定性良好。该设计简化了复杂条件判断,增强了系统灵活性与可扩展性,适用于实际项目参考。文末提供源码获取方式,助力读者实践与学习。
717 112
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ、Apache Seata 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。在评审出的 10 个年度开源项目中,Apache RocketMQ、Apache Seata 成功入选。
522 118
|
数据可视化 关系型数据库 MySQL
嵌入式C++、STM32、MySQL、GPS、InfluxDB和MQTT协议数据可视化
通过本文的介绍,我们详细讲解了如何结合嵌入式C++、STM32、MySQL、GPS、InfluxDB和MQTT协议,实现数据的采集、传输、存储和可视化。这种架构在物联网项目中非常常见,可以有效地处理和展示实时数据。希望本文能帮助您更好地理解和应用这些技术,构建高效、可靠的数据处理和可视化系统。
781 82
|
10月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
2131 0
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
317 1
|
数据采集 传感器 监控
多协议网关BL110钡铼6路RS485转MQTT协议云网关
BL110钡铼6路RS485转MQTT协议云网关是一款高性能、易配置的工业级设备,适用于各种需要远程监控和数据采集的物联网应用场景。通过将传统RS485设备的数据转换为MQTT协议并上传至云平台,实现了设备的远程管理和智能控制,极大地提升了系统的管理效率和响应速度。
573 2
|
网络协议 物联网 网络性能优化
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
691 5
|
消息中间件 监控 物联网
MQTT协议对接及RabbitMQ的使用记录
通过合理对接MQTT协议并利用RabbitMQ的强大功能,可以构建一个高效、可靠的消息通信系统。无论是物联网设备间的通信还是微服务架构下的服务间消息传递,MQTT和RabbitMQ的组合都提供了一个强有力的解决方案。在实际应用中,应根据具体需求和环境进行适当的配置和优化,以发挥出这两个技术的最大效能。
1306 0
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
789 0
下一篇
开通oss服务