MQTT源码分析
分析源码:mqttclient\test\emqx\test.c
1. MQTT客户端功能
MQTT通信模型示意图如下:
以"记者-电视台-观众"的模式来理解,客户端具体的流程是这样的:
- 客户端1:观众打电话到电视台:connect
- 客户端1:观众向电视台订阅"财经新闻": Subscribe 某个 Topic
- 客户端2:记者打电话到电视台:connect
- 客户端2:记者向电视台发布"财经新闻":Public某个Topic的某个Playload
- 服务器:电视台向"订阅了财经新闻的观众"发布"某条消息":Public某个Playload给Subscriber
整个过程中,电视台和记者、电视台和观众直接的电话要保存连接状态,还要时不时确认一下:
- 记者要时不时给电视台喊一声"喂":确保电视台还正常
- 观众要时不时给电视台喊一声"喂":确保电视台还正常
2. 客户端软件如何实现
- 连接服务器
- 订阅:
- 发布订阅请求,等待回应
- 循环:读取Publish信息(得到订阅的信息),处理
- 发布
- 发送数据包即可
- PING
- 循环:确保自己、对方还活着
- mqtt_packet_handle > mqtt_keep_alive
需要一个循环!
3. 程序分层
至少可以分为3层:
- 最上层:APP
- 中间层:MQTT
- 平台层:实现多线程、定时器、网卡收发数据
4. 情景分析
4.1 连接服务器
函数调用过程:
main client = mqtt_lease(); mqtt_set_port(client, "1883"); mqtt_set_host(client, "www.jiejie01.top"); mqtt_connect(client); mqtt_connect_with_results(c); rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL); rc = network_connect(c->mqtt_network); nettype_tcp_connect(n); platform_net_socket_connect
4.2 创建线程
调用过程:
main mqtt_connect(client); mqtt_connect_with_results(c); rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL); rc = network_connect(c->mqtt_network); /* send connect packet */ if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR) goto exit; if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) { } /* connect success, and need init mqtt thread */ c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread,c, ...);
4.3 发布消息
调用过程:
main res = pthread_create(&thread1, NULL, mqtt_publish_thread, client); mqtt_publish_thread mqtt_publish(client, "topic1", &msg); // 1. 构造消息 mqtt_message_t msg; memset(&msg, 0, sizeof(msg)); msg.payload = (void *) buf; msg.payloadlen = xxx; mqtt_publish(client, "topic1", &msg); // 1.1 根据MQTT协议构造数据包 // 1.2 根据平台相关的函数发送数据包 mqtt_send_packet network_write nettype_tcp_write platform_net_socket_write_timeout
4.4 最复杂:订阅消息
消息何时到来?不知道!
所以,必定是某个内核线程不断查询网卡:
- 读网卡数据
- 得到数据的话就判断、处理