我的mqtt协议和emqttd开源项目个人理解(3) - 客户端publish消息QoS==0的源码分析

简介: 我的mqtt协议和emqttd开源项目个人理解(3) - 客户端publish消息QoS==0的源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:

(注意:gen_server:call是同步调用,cast是异步。call对应的是handle_info,cast对应的是handle_cast。)


1、-module(emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun  = ParserFun,
                                      packet_opts = PacketOpts,
                                      proto_state = ProtoState}) ->

2、-module(emqttd_parser).

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
received(Packet = ?PACKET(_Type), State) ->
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
        #proto_state{client_id = ClientId, username = Username, session = Session}) ->

4、-module(emqttd_session).


5、-module(emqttd).

publish(Msg) when is_record(Msg, mqtt_message) ->


6、-module(emqttd_server).

publish(Msg = #mqtt_message{from = From}) ->


7、-module(emqttd_pubsub).

这里考虑了本机节点和集群节点的情况

publish(Topic, Msg) ->

   lists:foreach(

       fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->

           ?MODULE:dispatch(To, Msg);

          (#mqtt_route{topic = To, node = Node}) ->

           rpc:cast(Node, ?MODULE, dispatch, [To, Msg])

       end, emqttd_router:lookup(Topic)).



dispatch(Topic, Msg) ->

SubPid ! {dispatch, Topic, Msg};


8、-module(emqttd_session).

handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})

   when is_record(Msg, mqtt_message) ->

这里会区分Client是否在线离线,还有CleanSession=0/QoS=1,2的情况,需要存储离线消息。

%% Queue message if client disconnected

dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) ->

   hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});

%% Deliver qos0 message directly to client

dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->

   ClientPid ! {deliver, Msg},

   hibernate(Session);

dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})

   when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->

   case check_inflight(Session) of

       true  ->

           noreply(deliver(Msg, Session));

       false ->

           hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})

   end.


9、-module(emqttd_client).

handle_info({deliver, Message}, State) ->

   with_proto_state(fun(ProtoState) ->

                      emqttd_protocol:send(Message, ProtoState)

                    end, State);


10、-module(emqttd_protocol).

send(Packet, State = #proto_state{sendfun = SendFun})

   when is_record(Packet, mqtt_packet) ->




相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8天前
|
网络协议 物联网 网络性能优化
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
26 5
|
2月前
|
消息中间件 监控 物联网
MQTT协议对接及RabbitMQ的使用记录
通过合理对接MQTT协议并利用RabbitMQ的强大功能,可以构建一个高效、可靠的消息通信系统。无论是物联网设备间的通信还是微服务架构下的服务间消息传递,MQTT和RabbitMQ的组合都提供了一个强有力的解决方案。在实际应用中,应根据具体需求和环境进行适当的配置和优化,以发挥出这两个技术的最大效能。
146 0
|
3月前
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
88 0
|
3月前
|
物联网 网络性能优化 Python
"掌握MQTT协议,开启物联网通信新篇章——揭秘轻量级消息传输背后的力量!"
【8月更文挑战第21天】MQTT是一种轻量级的消息传输协议,以其低功耗、低带宽的特点在物联网和移动应用领域广泛应用。基于发布/订阅模型,MQTT支持三种服务质量级别,非常适合受限网络环境。本文详细阐述了MQTT的工作原理及特点,并提供了使用Python `paho-mqtt`库实现的发布与订阅示例代码,帮助读者快速掌握MQTT的应用技巧。
65 0
|
1天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
5天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
6天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
25 4
|
12天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
15天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
51 6
|
19天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
58 4

热门文章

最新文章