我的mqtt协议和emqttd开源项目个人理解(4) - 客户端CleanSession=0时,上线接收离线消息,源码分析

简介: 我的mqtt协议和emqttd开源项目个人理解(4) - 客户端CleanSession=0时,上线接收离线消息,源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


1、-module(emqttd_client).

[html] view plain copy

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->  

received(Bytes, State = #client_state{parser_fun  = ParserFun,  

                                     packet_opts = PacketOpts,  

                                     proto_state = ProtoState}) ->  


2、-module(emqttd_parser).

[html] view plain copy

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->  


3、-module(emqttd_protocol).

解析CONNECT消息

received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
process(Packet = ?CONNECT_PACKET(Var), State0) ->

4、-module(emqttd_sm).

start_session(CleanSess, ClientId) ->
handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) ->
%% Local node
resume_session(Session = #mqtt_session{client_id = ClientId,
                                       sess_pid  = SessPid}, ClientPid)
    when node(SessPid) =:= node() ->
%% Remote node
resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) ->

5、-module(emqttd_session).

dequeue(Session = #session{client_pid = undefined}) ->
    %% do nothing if client is disconnected
    Session;
dequeue(Session) ->
    case check_inflight(Session) of
        true  -> dequeue2(Session);
        false -> Session
    end.
dequeue2(Session = #session{message_queue = Q}) ->
    case emqttd_mqueue:out(Q) of
        {empty, _Q} ->
            Session;
        {{value, Msg}, Q1} ->
            %% dequeue more
            dequeue(deliver(Msg, Session#session{message_queue = Q1}))
    end.
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
    ClientPid ! {deliver, Msg}, Session; 
deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id = PktId,
                                                           client_pid = ClientPid,
                                                           inflight_queue = InflightQ})
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
    Msg1 = Msg#mqtt_message{pktid = PktId, dup = false},
    ClientPid ! {deliver, Msg1},
    await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})).

6、-module(emqttd_mqueue).

out(MQ = #mqueue{type = simple, len = 0}) ->
    {empty, MQ};
out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
    {R, Q2} = queue:out(Q),
    {R, MQ#mqueue{q = Q2, len = Len - 1}};
out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
    {R, Q2} = queue:out(Q),
    {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
out(MQ = #mqueue{type = priority, q = Q}) ->
    {R, Q2} = priority_queue:out(Q),
    {R, MQ#mqueue{q = Q2}}.

7、-module(emqttd_client).

handle_info({deliver, Message}, State) ->  
    with_proto_state(fun(ProtoState) ->  
                       emqttd_protocol:send(Message, ProtoState)  
                     end, State);  

8、-module(emqttd_protocol).

send(Packet, State = #proto_state{sendfun = SendFun})  
    when is_record(Packet, mqtt_packet) ->  
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
23
分享
相关文章
嵌入式C++、STM32、MySQL、GPS、InfluxDB和MQTT协议数据可视化
通过本文的介绍,我们详细讲解了如何结合嵌入式C++、STM32、MySQL、GPS、InfluxDB和MQTT协议,实现数据的采集、传输、存储和可视化。这种架构在物联网项目中非常常见,可以有效地处理和展示实时数据。希望本文能帮助您更好地理解和应用这些技术,构建高效、可靠的数据处理和可视化系统。
204 82
4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)
本文介绍了基于状态机驱动的MQTT客户端快速接入OneNet平台的实现方法,通过4步完成模块设计。文章以开源项目`Sparrow`为基础,引入`OneNetMqtt`业务模块,采用事件驱动模型和双层状态机设计,实现设备状态管理、消息处理及定时任务等功能。模块分为三层:`OneNetManager`负责核心逻辑,`OneNetDevice`管理设备信息,`OneNetDriver`处理Socket与MQTT通信。验证结果显示设备连接、数据上报及下线功能正常,稳定性良好。该设计简化了复杂条件判断,增强了系统灵活性与可扩展性,适用于实际项目参考。文末提供源码获取方式,助力读者实践与学习。
125 18
多协议网关BL110钡铼6路RS485转MQTT协议云网关
BL110钡铼6路RS485转MQTT协议云网关是一款高性能、易配置的工业级设备,适用于各种需要远程监控和数据采集的物联网应用场景。通过将传统RS485设备的数据转换为MQTT协议并上传至云平台,实现了设备的远程管理和智能控制,极大地提升了系统的管理效率和响应速度。
101 2
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
163 5
MQTT协议对接及RabbitMQ的使用记录
通过合理对接MQTT协议并利用RabbitMQ的强大功能,可以构建一个高效、可靠的消息通信系统。无论是物联网设备间的通信还是微服务架构下的服务间消息传递,MQTT和RabbitMQ的组合都提供了一个强有力的解决方案。在实际应用中,应根据具体需求和环境进行适当的配置和优化,以发挥出这两个技术的最大效能。
399 0
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
304 0
"掌握MQTT协议,开启物联网通信新篇章——揭秘轻量级消息传输背后的力量!"
【8月更文挑战第21天】MQTT是一种轻量级的消息传输协议,以其低功耗、低带宽的特点在物联网和移动应用领域广泛应用。基于发布/订阅模型,MQTT支持三种服务质量级别,非常适合受限网络环境。本文详细阐述了MQTT的工作原理及特点,并提供了使用Python `paho-mqtt`库实现的发布与订阅示例代码,帮助读者快速掌握MQTT的应用技巧。
146 0
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
557 21
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
162 14
下一篇
oss创建bucket
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等