学习mqtt协议和emqttd开源项目http://emqtt.com/
emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113
源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:
(注意:gen_server:call是同步调用,cast是异步。call对应的是handle_info,cast对应的是handle_cast。)
1、-module(emqttd_client).
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> received(Bytes, State = #client_state{parser_fun = ParserFun, packet_opts = PacketOpts, proto_state = ProtoState}) ->
2、-module(emqttd_parser).
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
3、-module(emqttd_protocol).
解析SUBSCRIBE消息
process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) -> Client = client(State), AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable], case lists:member(deny, AllowDenies) of true -> ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State), send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State); false -> emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State} end;
4、-module(emqttd_session).
subscribe(SessPid, PacketId, TopicTable) -> From = self(), AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end, gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}). handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) ->
5、-module(emqttd).
subscribe(ClientId, Topic, Qos) ->
emqttd_server:subscribe(ClientId, Topic, Qos).
6、-module(emqttd_server).
subscribe(ClientId, Topic, Qos) -> From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}). handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> pubsub_subscribe_(SubPid, Topic), if_subsciption(State, fun() -> add_subscription_(ClientId, Topic, Qos), set_subscription_stats() end), pubsub_subscribe_(SubPid, Topic) -> case ets:match(subscribed, {SubPid, Topic}) of [] -> emqttd_pubsub:async_subscribe(Topic, SubPid), ets:insert(subscribed, {SubPid, Topic}); [_] -> false end. add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) -> mnesia:dirty_write(subscription, Subscription).
7、-module(emqttd_pubsub).
把订阅的主题和节点名称存储在Mnesia数据库,这里考虑了集群的情况;
同时也把订阅主题和进程id存储在ETS表。
async_subscribe(Topic, SubPid) when is_binary(Topic) -> cast(pick(Topic), {subscribe, Topic, SubPid}). handle_cast({subscribe, Topic, SubPid}, State) -> add_subscriber_(Topic, SubPid), {noreply, setstats(State)}; add_subscriber_(Topic, SubPid) -> case ets:member(subscriber, Topic) of false -> mnesia:transaction(fun add_topic_route_/2, [Topic, node()]), setstats(topic); true -> ok end, ets:insert(subscriber, {Topic, SubPid}). add_topic_route_(Topic, Node) -> add_topic_(Topic), emqttd_router:add_route(Topic, Node). add_topic_(Topic) -> add_topic_(Topic, []). add_topic_(Topic, Flags) -> Record = #mqtt_topic{topic = Topic, flags = Flags}, case mnesia:wread({topic, Topic}) of [] -> mnesia:write(topic, Record, write); [_] -> ok end.
8、-module(emqttd_router).
add_route(Topic, Node) when is_binary(Topic), is_atom(Node) -> add_route(#mqtt_route{topic = Topic, node = Node}). add_route_(Route = #mqtt_route{topic = Topic}) -> case mnesia:wread({route, Topic}) of [] -> case emqttd_topic:wildcard(Topic) of true -> emqttd_trie:insert(Topic); false -> ok end, mnesia:write(route, Route, write); Records -> case lists:member(Route, Records) of true -> ok; false -> mnesia:write(route, Route, write) end end.