我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析

简介: 我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:

(注意:gen_server:call是同步调用,cast是异步。call对应的是handle_info,cast对应的是handle_cast。)


1、-module(emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun  = ParserFun,
                                      packet_opts = PacketOpts,
                                      proto_state = ProtoState}) ->


2、-module(emqttd_parser).

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->


3、-module(emqttd_protocol).

解析SUBSCRIBE消息

process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
    Client = client(State),
    AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
    case lists:member(deny, AllowDenies) of
        true ->
            ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
            send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
        false ->
            emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State}
    end;


4、-module(emqttd_session).

subscribe(SessPid, PacketId, TopicTable) ->
    From   = self(),
    AckFun = fun(GrantedQos) ->
               From ! {suback, PacketId, GrantedQos}
             end,
    gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,
                                                                 subscriptions = Subscriptions}) ->



5、-module(emqttd).

subscribe(ClientId, Topic, Qos) ->

   emqttd_server:subscribe(ClientId, Topic, Qos).


6、-module(emqttd_server).

subscribe(ClientId, Topic, Qos) ->
    From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).
handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
    pubsub_subscribe_(SubPid, Topic),
    if_subsciption(State, fun() ->
        add_subscription_(ClientId, Topic, Qos),
        set_subscription_stats()
    end),
pubsub_subscribe_(SubPid, Topic) ->
    case ets:match(subscribed, {SubPid, Topic}) of
        [] ->
            emqttd_pubsub:async_subscribe(Topic, SubPid),
            ets:insert(subscribed, {SubPid, Topic});
        [_] ->
            false
    end.
add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
    mnesia:dirty_write(subscription, Subscription).


7、-module(emqttd_pubsub).

把订阅的主题和节点名称存储在Mnesia数据库,这里考虑了集群的情况;

同时也把订阅主题和进程id存储在ETS表。

async_subscribe(Topic, SubPid) when is_binary(Topic) ->
    cast(pick(Topic), {subscribe, Topic, SubPid}).
handle_cast({subscribe, Topic, SubPid}, State) ->
    add_subscriber_(Topic, SubPid),
  {noreply, setstats(State)};
add_subscriber_(Topic, SubPid) ->
    case ets:member(subscriber, Topic) of
        false ->
            mnesia:transaction(fun add_topic_route_/2, [Topic, node()]),
            setstats(topic);
        true ->
            ok
    end,
    ets:insert(subscriber, {Topic, SubPid}).
add_topic_route_(Topic, Node) ->
    add_topic_(Topic), emqttd_router:add_route(Topic, Node).
add_topic_(Topic) ->
    add_topic_(Topic, []).
add_topic_(Topic, Flags) ->
    Record = #mqtt_topic{topic = Topic, flags = Flags},
    case mnesia:wread({topic, Topic}) of
        []  -> mnesia:write(topic, Record, write);
        [_] -> ok
    end.


8、-module(emqttd_router).

add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
    add_route(#mqtt_route{topic = Topic, node = Node}).
add_route_(Route = #mqtt_route{topic = Topic}) ->
    case mnesia:wread({route, Topic}) of
        [] ->
            case emqttd_topic:wildcard(Topic) of
                true  -> emqttd_trie:insert(Topic);
                false -> ok
            end,
            mnesia:write(route, Route, write);
        Records ->
            case lists:member(Route, Records) of
                true  -> ok;
                false -> mnesia:write(route, Route, write)
            end
    end.



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 运维 Serverless
商业版vs开源版:一图看懂云消息队列 RocketMQ 版核心优势
自建开源 RocketMQ 集群,为保证业务稳定性,往往需要按照业务请求的峰值去配置集群资源。云消息队列 RocketMQ 版 Serverless 实例通过资源快速伸缩,实现资源使用量与实际业务负载贴近,并按实际使用量计费,有效降低企业的运维压力和使用成本。
125 12
|
14天前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
53 6
|
14天前
|
消息中间件 存储 中间件
说说MQ在你项目中的应用(二)商品支付
本文总结了消息队列(MQ)在支付订单业务中的应用,重点分析了RabbitMQ的优势。通过异步处理、系统解耦和流量削峰等功能,RabbitMQ确保了支付流程的高效与稳定。具体场景包括用户下单、支付请求、商品生产和物流配送等环节。相比Kafka,RabbitMQ在低吞吐量、高实时性需求下表现更优,提供了更低延迟和更高的可靠性。
29 0
|
2月前
|
消息中间件 弹性计算 运维
一图看懂云消息队列 RabbitMQ 版对比开源优势
一张图带您快速了解云消息队列 RabbitMQ 版对比开源版本的显著优势。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
存储 算法 安全
FreeMQTT:一款Python语言实现的开源MQTT Server
FreeMQTT 是一款用 Python 语言并基于 Tornado 开发的开源 MQTT 服务器,支持 MQTT3.1.1 和 MQTT5.0 协议,提供多租户安全隔离、高效 Topic 匹配算法及实时上下线通知等功能,适用于 IoT 场景。快速启动仅需克隆仓库、安装依赖并运行服务。
|
5月前
|
消息中间件 存储 传感器
RabbitMQ 在物联网 (IoT) 项目中的应用案例
【8月更文第28天】随着物联网技术的发展,越来越多的设备被连接到互联网上以收集和传输数据。这些设备可以是传感器、执行器或其他类型的硬件。为了有效地管理这些设备并处理它们产生的大量数据,需要一个可靠的消息传递系统。RabbitMQ 是一个流行的开源消息中间件,它提供了一种灵活的方式来处理和转发消息,非常适合用于物联网环境。
220 1
|
5月前
|
安全 网络性能优化
MQTT 客户端 MQTT.fx 使用说明
MQTT 客户端 MQTT.fx 使用说明
459 0
|
6月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
136 6