MQTT源码分析

简介: MQTT源码分析

MQTT源码分析

分析源码:mqttclient\test\emqx\test.c

1. MQTT客户端功能

MQTT通信模型示意图如下:

以"记者-电视台-观众"的模式来理解,客户端具体的流程是这样的:


  • 客户端1:观众打电话到电视台:connect
  • 客户端1:观众向电视台订阅"财经新闻": Subscribe 某个 Topic
  • 客户端2:记者打电话到电视台:connect
  • 客户端2:记者向电视台发布"财经新闻":Public某个Topic的某个Playload
  • 服务器:电视台向"订阅了财经新闻的观众"发布"某条消息":Public某个Playload给Subscriber


整个过程中,电视台和记者、电视台和观众直接的电话要保存连接状态,还要时不时确认一下:


  • 记者要时不时给电视台喊一声"喂":确保电视台还正常
  • 观众要时不时给电视台喊一声"喂":确保电视台还正常

2. 客户端软件如何实现

  • 连接服务器
  • 订阅:
  • 发布订阅请求,等待回应
  • 循环:读取Publish信息(得到订阅的信息),处理
  • 发布
  • 发送数据包即可
  • PING
  • 循环:确保自己、对方还活着
  • mqtt_packet_handle > mqtt_keep_alive

需要一个循环!

3. 程序分层

至少可以分为3层:

  • 最上层:APP
  • 中间层:MQTT
  • 平台层:实现多线程、定时器、网卡收发数据

4. 情景分析

4.1 连接服务器

函数调用过程:

main
    client = mqtt_lease();
    mqtt_set_port(client, "1883");
    mqtt_set_host(client, "www.jiejie01.top");
    mqtt_connect(client);
        mqtt_connect_with_results(c);
            rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
            rc = network_connect(c->mqtt_network);
                    nettype_tcp_connect(n); 
                        platform_net_socket_connect

4.2 创建线程

调用过程:

main
    mqtt_connect(client);
        mqtt_connect_with_results(c);
            rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
            rc = network_connect(c->mqtt_network);
            /* send connect packet */
            if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
                goto exit;
            if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {
            }
            /* connect success, and need init mqtt thread */
            c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread,c, ...);

4.3 发布消息

调用过程:

main
    res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
                mqtt_publish_thread
                    mqtt_publish(client, "topic1", &msg);
// 1. 构造消息
mqtt_message_t msg;
memset(&msg, 0, sizeof(msg));
msg.payload = (void *) buf;
msg.payloadlen = xxx;
mqtt_publish(client, "topic1", &msg);
    // 1.1 根据MQTT协议构造数据包
    // 1.2 根据平台相关的函数发送数据包
    mqtt_send_packet
        network_write
            nettype_tcp_write
                platform_net_socket_write_timeout

4.4 最复杂:订阅消息

消息何时到来?不知道!

所以,必定是某个内核线程不断查询网卡:

  • 读网卡数据
  • 得到数据的话就判断、处理
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
消息中间件 负载均衡 中间件
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
152 1
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
|
9月前
|
消息中间件 中间件 Kafka
RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
**RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。
165 1
|
10月前
|
消息中间件 存储 负载均衡
RocketMQ 源码分析——NameServer
- 编写优雅、高效的代码。RocketMQ作为阿里双十一交易核心链路产品,支撑千万级并发、万亿级数据洪峰。读源码可以积累编写高效、优雅代码的经验。 - 提升微观的架构设计能力,重点在思维和理念。Apache RocketMQ作为Apache顶级项目,它的架构设计是值得大家借鉴的。 - 解决工作中、学习中的各种疑难杂症。在使用RocketMQ过程中遇到消费卡死、卡顿等问题可以通过阅读源码的方式找到问题并给予解决。 - 在BATJ一线互联网公司面试中展现优秀的自己。大厂面试中,尤其是阿里系的公司,你有RocketMQ源码体系化知识,必定是一个很大的加分项。
153 0
|
10月前
|
消息中间件 存储 Kafka
RocketMQ 源码分析——Broker
1. Broker启动流程分析 2. 消息存储设计 3. 消息写入流程 4. 亮点分析:NRS与NRC的功能号设计 5. 亮点分析:同步双写数倍性能提升的CompletableFuture 6. 亮点分析:Commitlog写入时使用可重入锁还是自旋锁? 7. 亮点分析:零拷贝技术之MMAP提升文件读写性能 8. 亮点分析:堆外内存机制
150 0
|
消息中间件 运维 监控
RocketMq-dashboard:topic 5min trend 原理和源码分析(一)
RocketMq-dashboard:topic 5min trend 原理和源码分析(一)
286 0
|
消息中间件 存储 负载均衡
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPushConsumer的实现原理及源码分析
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPushConsumer的实现原理及源码分析
169 1
|
消息中间件 存储 Java
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
210 0
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
|
消息中间件 存储 监控
搭建源码调试环境—RocketMQ源码分析(一)
在正式开始搭建调试环境之前,我们先了解一下RockeMQ源码的整体架构。 这是因为掌握了整体架构,可以让我们迅速了解各个方面的特性,并且可以方便我们后续快速定位功能模块对应的代码文件。话不多说,我们开始看RocketMQ目录结构。
297 0
搭建源码调试环境—RocketMQ源码分析(一)
|
安全 Java Android开发
Eclipse Paho MQTT客户端Java源码分析
Eclipse Paho MQTT客户端Java源码分析
392 0
Eclipse Paho MQTT客户端Java源码分析