我的mqtt协议和emqttd开源项目个人理解(3) - 客户端publish消息QoS==0的源码分析

简介: 我的mqtt协议和emqttd开源项目个人理解(3) - 客户端publish消息QoS==0的源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:

(注意:gen_server:call是同步调用,cast是异步。call对应的是handle_info,cast对应的是handle_cast。)


1、-module(emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun  = ParserFun,
                                      packet_opts = PacketOpts,
                                      proto_state = ProtoState}) ->

2、-module(emqttd_parser).

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
received(Packet = ?PACKET(_Type), State) ->
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
        #proto_state{client_id = ClientId, username = Username, session = Session}) ->

4、-module(emqttd_session).


5、-module(emqttd).

publish(Msg) when is_record(Msg, mqtt_message) ->


6、-module(emqttd_server).

publish(Msg = #mqtt_message{from = From}) ->


7、-module(emqttd_pubsub).

这里考虑了本机节点和集群节点的情况

publish(Topic, Msg) ->

   lists:foreach(

       fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->

           ?MODULE:dispatch(To, Msg);

          (#mqtt_route{topic = To, node = Node}) ->

           rpc:cast(Node, ?MODULE, dispatch, [To, Msg])

       end, emqttd_router:lookup(Topic)).



dispatch(Topic, Msg) ->

SubPid ! {dispatch, Topic, Msg};


8、-module(emqttd_session).

handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})

   when is_record(Msg, mqtt_message) ->

这里会区分Client是否在线离线,还有CleanSession=0/QoS=1,2的情况,需要存储离线消息。

%% Queue message if client disconnected

dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) ->

   hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});

%% Deliver qos0 message directly to client

dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->

   ClientPid ! {deliver, Msg},

   hibernate(Session);

dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})

   when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->

   case check_inflight(Session) of

       true  ->

           noreply(deliver(Msg, Session));

       false ->

           hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})

   end.


9、-module(emqttd_client).

handle_info({deliver, Message}, State) ->

   with_proto_state(fun(ProtoState) ->

                      emqttd_protocol:send(Message, ProtoState)

                    end, State);


10、-module(emqttd_protocol).

send(Packet, State = #proto_state{sendfun = SendFun})

   when is_record(Packet, mqtt_packet) ->




相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3天前
|
消息中间件 物联网 网络性能优化
MQTT常见问题之MQTT不支持5.0的协议如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
1天前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
2天前
|
传感器 网络协议 Ubuntu
MQTT协议与EMQ
MQTT协议与EMQ
|
2天前
|
消息中间件 负载均衡 应用服务中间件
MQ产品使用合集之使用的RocketMQ5.1.3时,grpc客户端没有产生消息轨迹如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
19 3
EMQ
|
3天前
|
JSON Linux 网络性能优化
MQTT 5.0 报文解析 02:PUBLISH 与 PUBACK
本文将介绍在 MQTT 中用于传递应用消息的 PUBLISH 报文以及它的响应报文。不管是客户端向服务端发布消息,还是服务端向订阅端转发消息,都需要使用 PUBLISH 报文。决定消息流向的主题、消息的实际内容和 QoS 等级,都包含在 PUBLISH 报文中。
EMQ
86 1
MQTT 5.0 报文解析 02:PUBLISH 与 PUBACK
|
3天前
|
监控 网络性能优化 网络安全
【MODBUS】Modbus主站为边缘设备通过MQTT协议上云
【MODBUS】Modbus主站为边缘设备通过MQTT协议上云
38 1
|
3天前
|
Java Maven
【开源视频联动物联网平台】vertx写一个mqtt客户端
【开源视频联动物联网平台】vertx写一个mqtt客户端
50 1
|
3天前
|
物联网 Linux 开发工具
MQTT协议接入问题之连接失败如何解决
MQTT接入是指将设备或应用通过MQTT协议接入到消息服务器,以实现数据的发布和订阅;本合集着眼于MQTT接入的流程、配置指导以及常见接入问题的解决方法,帮助用户实现稳定可靠的消息交换。
160 2
|
3天前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总: