学习mqtt协议和emqttd开源项目http://emqtt.com/
emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113
源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:
(注意:gen_server:call是同步调用,cast是异步调用。call对应的是handle_call,cast对应的是handle_cast;handle_info处理的是其他普通消息,例如用 ! 发送的消息以及这里socket收到的{inet_async, ...}数据。)
1、-module(emqttd_client).
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun = ParserFun, packet_opts = PacketOpts, proto_state = ProtoState}) ->
2、-module(emqttd_parser).
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
3、-module(emqttd_protocol).
received(Packet = ?PACKET(_Type), State) ->
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), #proto_state{client_id = ClientId, username = Username, session = Session}) ->
4、-module(emqttd_session).
5、-module(emqttd).
publish(Msg) when is_record(Msg, mqtt_message) ->
6、-module(emqttd_server).
publish(Msg = #mqtt_message{from = From}) ->
7、-module(emqttd_pubsub).
这里考虑了本机节点和集群节点的情况
%% Publish Msg to every route that emqttd_router:lookup(Topic) returns.
%% A route whose node is the local node() is dispatched with a direct
%% ?MODULE:dispatch/2 call; any other route is forwarded to its node with
%% rpc:cast/4, which is asynchronous and does not wait for a result.
%% Returns ok (the result of lists:foreach/2).
publish(Topic, Msg) ->
lists:foreach(
%% Local route: dispatch in the calling process.
fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
?MODULE:dispatch(To, Msg);
%% Route owned by another node: run dispatch/2 on that node.
(#mqtt_route{topic = To, node = Node}) ->
rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
end, emqttd_router:lookup(Topic)).
dispatch(Topic, Msg) ->
SubPid ! {dispatch, Topic, Msg};
8、-module(emqttd_session).
handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
when is_record(Msg, mqtt_message) ->
这里会区分Client在线还是离线,以及CleanSession=0、QoS=1,2的情况,这些情况下需要存储离线消息。
%% dispatch(Msg, Session): route one message for this session.
%% Clauses are tried in order, so the "no client attached" case wins over
%% the QoS-specific clauses below.
%% NOTE(review): hibernate/1 and noreply/1 are defined elsewhere; presumably
%% they wrap the session in a gen_server {noreply, ...} return -- confirm.

%% Queue message if client disconnected
%% (client_pid = undefined: Msg is appended to message_queue via emqttd_mqueue:in/2.)
dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) ->
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});
%% Deliver qos0 message directly to client
%% (sent as a {deliver, Msg} message to ClientPid; the session is left unchanged.)
dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg},
hibernate(Session);
%% QoS1/QoS2 message with a client attached: deliver now when
%% check_inflight/1 returns true, otherwise append Msg to message_queue.
%% NOTE(review): check_inflight/1 presumably reports whether the inflight
%% window still has room -- verify against its definition.
dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
case check_inflight(Session) of
true ->
noreply(deliver(Msg, Session));
false ->
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
end.
9、-module(emqttd_client).
handle_info({deliver, Message}, State) ->
with_proto_state(fun(ProtoState) ->
emqttd_protocol:send(Message, ProtoState)
end, State);
10、-module(emqttd_protocol).
send(Packet, State = #proto_state{sendfun = SendFun})
when is_record(Packet, mqtt_packet) ->