我的mqtt协议和emqttd开源项目个人理解(4) - 客户端CleanSession=0时,上线接收离线消息,源码分析

简介: 我的mqtt协议和emqttd开源项目个人理解(4) - 客户端CleanSession=0时,上线接收离线消息,源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


1、-module(emqttd_client).

[html] view plain copy

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->  

received(Bytes, State = #client_state{parser_fun  = ParserFun,  

                                     packet_opts = PacketOpts,  

                                     proto_state = ProtoState}) ->  


2、-module(emqttd_parser).

[html] view plain copy

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->  


3、-module(emqttd_protocol).

解析CONNECT消息

received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
process(Packet = ?CONNECT_PACKET(Var), State0) ->

4、-module(emqttd_sm).

start_session(CleanSess, ClientId) ->
handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) ->
%% Local node
resume_session(Session = #mqtt_session{client_id = ClientId,
                                       sess_pid  = SessPid}, ClientPid)
    when node(SessPid) =:= node() ->
%% Remote node
resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) ->

5、-module(emqttd_session).

dequeue(Session = #session{client_pid = undefined}) ->
    %% do nothing if client is disconnected
    Session;
dequeue(Session) ->
    case check_inflight(Session) of
        true  -> dequeue2(Session);
        false -> Session
    end.
dequeue2(Session = #session{message_queue = Q}) ->
    case emqttd_mqueue:out(Q) of
        {empty, _Q} ->
            Session;
        {{value, Msg}, Q1} ->
            %% dequeue more
            dequeue(deliver(Msg, Session#session{message_queue = Q1}))
    end.
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
    ClientPid ! {deliver, Msg}, Session; 
deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id = PktId,
                                                           client_pid = ClientPid,
                                                           inflight_queue = InflightQ})
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
    Msg1 = Msg#mqtt_message{pktid = PktId, dup = false},
    ClientPid ! {deliver, Msg1},
    await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})).

6、-module(emqttd_mqueue).

out(MQ = #mqueue{type = simple, len = 0}) ->
    {empty, MQ};
out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
    {R, Q2} = queue:out(Q),
    {R, MQ#mqueue{q = Q2, len = Len - 1}};
out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
    {R, Q2} = queue:out(Q),
    {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
out(MQ = #mqueue{type = priority, q = Q}) ->
    {R, Q2} = priority_queue:out(Q),
    {R, MQ#mqueue{q = Q2}}.

7、-module(emqttd_client).

handle_info({deliver, Message}, State) ->  
    with_proto_state(fun(ProtoState) ->  
                       emqttd_protocol:send(Message, ProtoState)  
                     end, State);  

8、-module(emqttd_protocol).

send(Packet, State = #proto_state{sendfun = SendFun})  
    when is_record(Packet, mqtt_packet) ->  
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
数据采集 传感器 监控
Modbus 与 MQTT 协议兼容:MyEMS 的泛在能源数据采集技术实现
MyEMS深度融合Modbus与MQTT协议,破解能源数据采集中协议碎片化、网络异构、数据孤岛等难题。通过Modbus接入95%以上工业设备,实现现场数据精准“拉取”;依托MQTT构建高效物联网传输通道,支持多源数据主动“推送”与云端集成。边缘侧采集规整,中心侧汇聚分析,形成统一、可靠、低延迟的数据流。该架构兼具高兼容性、强扩展性与低运维成本,广泛应用于工业园区、商业楼宇及集团型企业,支撑实时监控、AI分析与跨系统融合,打造泛在互联的能源数据底座,助力企业实现全面智慧能源管理。
293 6
|
10月前
|
数据可视化 关系型数据库 MySQL
嵌入式C++、STM32、MySQL、GPS、InfluxDB和MQTT协议数据可视化
通过本文的介绍,我们详细讲解了如何结合嵌入式C++、STM32、MySQL、GPS、InfluxDB和MQTT协议,实现数据的采集、传输、存储和可视化。这种架构在物联网项目中非常常见,可以有效地处理和展示实时数据。希望本文能帮助您更好地理解和应用这些技术,构建高效、可靠的数据处理和可视化系统。
584 82
|
11月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ、Apache Seata 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。在评审出的 10 个年度开源项目中,Apache RocketMQ、Apache Seata 成功入选。
408 101
|
6月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
1106 0
|
9月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
230 1
|
数据采集 传感器 监控
多协议网关BL110钡铼6路RS485转MQTT协议云网关
BL110钡铼6路RS485转MQTT协议云网关是一款高性能、易配置的工业级设备,适用于各种需要远程监控和数据采集的物联网应用场景。通过将传统RS485设备的数据转换为MQTT协议并上传至云平台,实现了设备的远程管理和智能控制,极大地提升了系统的管理效率和响应速度。
437 2
|
5月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
3月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
241 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
884 93
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
395 93