我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析

简介: 我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:

(注意:gen_server:call是同步调用,cast是异步。call对应的是handle_info,cast对应的是handle_cast。)


1、-module(emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun  = ParserFun,
                                      packet_opts = PacketOpts,
                                      proto_state = ProtoState}) ->


2、-module(emqttd_parser).

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->


3、-module(emqttd_protocol).

解析SUBSCRIBE消息

process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
    Client = client(State),
    AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
    case lists:member(deny, AllowDenies) of
        true ->
            ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
            send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
        false ->
            emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State}
    end;


4、-module(emqttd_session).

subscribe(SessPid, PacketId, TopicTable) ->
    From   = self(),
    AckFun = fun(GrantedQos) ->
               From ! {suback, PacketId, GrantedQos}
             end,
    gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,
                                                                 subscriptions = Subscriptions}) ->



5、-module(emqttd).

subscribe(ClientId, Topic, Qos) ->

   emqttd_server:subscribe(ClientId, Topic, Qos).


6、-module(emqttd_server).

subscribe(ClientId, Topic, Qos) ->
    From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).
handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
    pubsub_subscribe_(SubPid, Topic),
    if_subsciption(State, fun() ->
        add_subscription_(ClientId, Topic, Qos),
        set_subscription_stats()
    end),
pubsub_subscribe_(SubPid, Topic) ->
    case ets:match(subscribed, {SubPid, Topic}) of
        [] ->
            emqttd_pubsub:async_subscribe(Topic, SubPid),
            ets:insert(subscribed, {SubPid, Topic});
        [_] ->
            false
    end.
add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
    mnesia:dirty_write(subscription, Subscription).


7、-module(emqttd_pubsub).

把订阅的主题和节点名称存储在Mnesia数据库,这里考虑了集群的情况;

同时也把订阅主题和进程id存储在ETS表。

async_subscribe(Topic, SubPid) when is_binary(Topic) ->
    cast(pick(Topic), {subscribe, Topic, SubPid}).
handle_cast({subscribe, Topic, SubPid}, State) ->
    add_subscriber_(Topic, SubPid),
  {noreply, setstats(State)};
add_subscriber_(Topic, SubPid) ->
    case ets:member(subscriber, Topic) of
        false ->
            mnesia:transaction(fun add_topic_route_/2, [Topic, node()]),
            setstats(topic);
        true ->
            ok
    end,
    ets:insert(subscriber, {Topic, SubPid}).
add_topic_route_(Topic, Node) ->
    add_topic_(Topic), emqttd_router:add_route(Topic, Node).
add_topic_(Topic) ->
    add_topic_(Topic, []).
add_topic_(Topic, Flags) ->
    Record = #mqtt_topic{topic = Topic, flags = Flags},
    case mnesia:wread({topic, Topic}) of
        []  -> mnesia:write(topic, Record, write);
        [_] -> ok
    end.


8、-module(emqttd_router).

add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
    add_route(#mqtt_route{topic = Topic, node = Node}).
add_route_(Route = #mqtt_route{topic = Topic}) ->
    case mnesia:wread({route, Topic}) of
        [] ->
            case emqttd_topic:wildcard(Topic) of
                true  -> emqttd_trie:insert(Topic);
                false -> ok
            end,
            mnesia:write(route, Route, write);
        Records ->
            case lists:member(Route, Records) of
                true  -> ok;
                false -> mnesia:write(route, Route, write)
            end
    end.



相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
监控 网络性能优化 网络安全
【MODBUS】Modbus主站为边缘设备通过MQTT协议上云
【MODBUS】Modbus主站为边缘设备通过MQTT协议上云
32 1
|
1月前
|
Java Maven
【开源视频联动物联网平台】vertx写一个mqtt客户端
【开源视频联动物联网平台】vertx写一个mqtt客户端
32 1
|
2月前
|
物联网 Linux 开发工具
MQTT协议接入问题之连接失败如何解决
MQTT接入是指将设备或应用通过MQTT协议接入到消息服务器,以实现数据的发布和订阅;本合集着眼于MQTT接入的流程、配置指导以及常见接入问题的解决方法,帮助用户实现稳定可靠的消息交换。
122 2
|
2月前
|
JSON 物联网 开发工具
MQTT协议问题之如何搭建物联网空调的服务器
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
77 1
|
2月前
|
JSON 网络协议 物联网
MQTT协议问题之消息类型分类如何解决
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
48 3
|
2月前
|
消息中间件 网络协议 物联网
MQTT协议问题之阿里云物联网服务器断开如何解决
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
125 1
|
1月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
4月前
|
消息中间件 NoSQL 数据库
一文讲透消息队列RocketMQ实现消费幂等
这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等。
一文讲透消息队列RocketMQ实现消费幂等
|
28天前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
73 0
|
3月前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
44 0