我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析

简介: 我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:

(注意:gen_server:call是同步调用,cast是异步。call对应的是handle_info,cast对应的是handle_cast。)


1、-module(emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun  = ParserFun,
                                      packet_opts = PacketOpts,
                                      proto_state = ProtoState}) ->


2、-module(emqttd_parser).

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->


3、-module(emqttd_protocol).

解析SUBSCRIBE消息

process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
    Client = client(State),
    AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
    case lists:member(deny, AllowDenies) of
        true ->
            ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
            send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
        false ->
            emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State}
    end;


4、-module(emqttd_session).

subscribe(SessPid, PacketId, TopicTable) ->
    From   = self(),
    AckFun = fun(GrantedQos) ->
               From ! {suback, PacketId, GrantedQos}
             end,
    gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,
                                                                 subscriptions = Subscriptions}) ->



5、-module(emqttd).

subscribe(ClientId, Topic, Qos) ->

   emqttd_server:subscribe(ClientId, Topic, Qos).


6、-module(emqttd_server).

subscribe(ClientId, Topic, Qos) ->
    From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).
handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
    pubsub_subscribe_(SubPid, Topic),
    if_subsciption(State, fun() ->
        add_subscription_(ClientId, Topic, Qos),
        set_subscription_stats()
    end),
pubsub_subscribe_(SubPid, Topic) ->
    case ets:match(subscribed, {SubPid, Topic}) of
        [] ->
            emqttd_pubsub:async_subscribe(Topic, SubPid),
            ets:insert(subscribed, {SubPid, Topic});
        [_] ->
            false
    end.
add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
    mnesia:dirty_write(subscription, Subscription).


7、-module(emqttd_pubsub).

把订阅的主题和节点名称存储在Mnesia数据库,这里考虑了集群的情况;

同时也把订阅主题和进程id存储在ETS表。

async_subscribe(Topic, SubPid) when is_binary(Topic) ->
    cast(pick(Topic), {subscribe, Topic, SubPid}).
handle_cast({subscribe, Topic, SubPid}, State) ->
    add_subscriber_(Topic, SubPid),
  {noreply, setstats(State)};
add_subscriber_(Topic, SubPid) ->
    case ets:member(subscriber, Topic) of
        false ->
            mnesia:transaction(fun add_topic_route_/2, [Topic, node()]),
            setstats(topic);
        true ->
            ok
    end,
    ets:insert(subscriber, {Topic, SubPid}).
add_topic_route_(Topic, Node) ->
    add_topic_(Topic), emqttd_router:add_route(Topic, Node).
add_topic_(Topic) ->
    add_topic_(Topic, []).
add_topic_(Topic, Flags) ->
    Record = #mqtt_topic{topic = Topic, flags = Flags},
    case mnesia:wread({topic, Topic}) of
        []  -> mnesia:write(topic, Record, write);
        [_] -> ok
    end.


8、-module(emqttd_router).

add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
    add_route(#mqtt_route{topic = Topic, node = Node}).
add_route_(Route = #mqtt_route{topic = Topic}) ->
    case mnesia:wread({route, Topic}) of
        [] ->
            case emqttd_topic:wildcard(Topic) of
                true  -> emqttd_trie:insert(Topic);
                false -> ok
            end,
            mnesia:write(route, Route, write);
        Records ->
            case lists:member(Route, Records) of
                true  -> ok;
                false -> mnesia:write(route, Route, write)
            end
    end.



相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 人工智能 Apache
2025 OSCAR丨与创新者同频!Apache RocketMQ 邀您共赴开源之约
10 月 28 日,阿里云高级技术专家周礼分享如何基于 Apache RocketMQ 新特性构建异步化 Multi-Agent 系统。
202 40
|
5月前
|
数据采集 传感器 监控
Modbus 与 MQTT 协议兼容:MyEMS 的泛在能源数据采集技术实现
MyEMS深度融合Modbus与MQTT协议,破解能源数据采集中协议碎片化、网络异构、数据孤岛等难题。通过Modbus接入95%以上工业设备,实现现场数据精准“拉取”;依托MQTT构建高效物联网传输通道,支持多源数据主动“推送”与云端集成。边缘侧采集规整,中心侧汇聚分析,形成统一、可靠、低延迟的数据流。该架构兼具高兼容性、强扩展性与低运维成本,广泛应用于工业园区、商业楼宇及集团型企业,支撑实时监控、AI分析与跨系统融合,打造泛在互联的能源数据底座,助力企业实现全面智慧能源管理。
413 6
|
7月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
1816 5
|
8月前
|
消息中间件 Apache 双11
Apache RocketMQ + “太乙” = 开源贡献新体验
Apache RocketMQ 是 Apache 顶级项目,源于阿里巴巴,历经多年双十一考验。RocketMQ 联合“太乙”平台启动开源竞赛,提供贡献价值评价与奖金激励(最高 5000 元),助力开发者成为社区核心成员。竞赛包含详尽教程与自动搭建环境,促进技术生态繁荣,推动分布式消息处理技术发展。欢迎加入,共创开源未来!
315 1
|
8月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
1678 0
|
11月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
282 1
|
7月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
5月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
380 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
965 88
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
448 88