MQTT(Message Queuing Telemetry Transport),是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。MQTT是专门针对物联网开发的轻量级传输协议。MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化,使得其能适应各种物联网应用场景。
如今很多第三方推送平台都采用了MQTT来实现,消息中间件ActiveMQ的订阅/发布模块也是基于MQTT实现。
以下为MQTT的 会话,订阅,发布的几个报文的解析:
先看下这张图,为整体的报文结构。大致分为 固定头(Control+PacketLength)+可变头+载体
PS E:\test\mqtt> .\test.exe
test socket:
->connect ok!
//会话,CONNECT
->send:
102900044D51545404C2003C000A0102303330343035303600067075626C696300097061677075626C6963
<-recv:
20020000
<-recv ok!len = 4
//订阅主题
->send:
821C000100176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C00
<-recv:
9003000100
<-recv ok!len = 5
//订阅主题 .
->send:
821D000200186A6B2F636F6D6D616E642F72656164706172616D6574657200
<-recv:
9003000200
//发布消息
->send:
302200166A6B2F72657475726E2F7265616C79636F6E74726F6C61626331323938370000
//收到订阅的主题
<-recv:
301C00176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C998866
<-recv ok!len = 30
请按任意键继续. . .
PS E:\test\mqtt>
报文解析:
//CONNECT 报文
->send:
102900044D51545404C2003C000A0102303330343035303600067075626C696300097061677075626C6963
解析:
//固定报文头
10 //固定报文头 byte1
29 //固定报文头之 byte2
//可变报文头
0004//长度
4D515454 //内容(MQTT)
04 //协议级别
C2//连接标志位
003C //保持连接时长(60秒)
//载体部分
000A//长度
01023033303430353036//用户名
0006
7075626C6963
0009
7061677075626C6963 //密码
<-recv:
20020000
解析:
20 //固定头
02//固定头 byte2 长度
00 //连接确认标志
00 //连接返回码
//订阅主题报文
->send:
821C000100176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C00
解析:
82 //固定报文头 byte1
1C //固定报文头 byte2 (剩余长度)
//可变报文头
00//消息标识符byte1
01//消息标识符byte2
//载荷
0017//主题长度
6A6B2F636F6D6D616E642F7265616C79636F6E74726F6C // 内容为 : jk/command/realycontrol
00 //服务质量要求Qos
<-recv:
9003000100
应答解析:
90//固定报文头
03 //固定报文头 (长度)
0001 //报文标识符
00
<-recv ok!len = 5
//订阅主题报文
->send:
821D000200186A6B2F636F6D6D616E642F72656164706172616D6574657200
<-recv:
301C00176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C9988669003000200
<-recv ok!len = 35
//发送消息报文(固定头+可变头(主题长度+主题内容+标识符(可选)+载体内容))
->send:
302200166A6B2F72657475726E2F7265616C79636F6E74726F6C61626331323938370000
30 //固定头 (控制字)
22 //固定头(长度)
//可变头
0016
6A6B2F72657475726E2F7265616C79636F6E74726F6C //主题名内容为: jk/return/realycontrol
61626331323938370000 // 载体内容为: abc12987
只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中
/******************************************************************************* 函数名称:MqttConnectPacket 函数功能:按照MQTT协议发送建立连接数据包 输入参数:*packet,连接数据包缓存指针,*id:用户ID号指针,*username:用户名指针 *password:用户密码指针 输出参数:无 备注说明:用户ID是必需的,用户名和密码可以为空 ********************************************************************************/ u16 MqttConnectPacket(u8 *mqtt_message,char *client_id,char *username,char *password) { u16 client_id_length = strlen(client_id); u16 username_length = strlen(username); u16 password_length = strlen(password); u16 packetLen; u16 i,baseIndex; packetLen = 12 + 2 + client_id_length; if(username_length > 0) packetLen = packetLen + 2 + username_length; if(password_length > 0) packetLen = packetLen+ 2 + password_length; mqtt_message[0] = 16; //0x10 // MQTT Message Type CONNECT mqtt_message[1] = packetLen - 2; //剩余长度,不包括固定头 baseIndex = 2; if(packetLen > 127) { mqtt_message[2] = 1; //packetLen/127; baseIndex = 3; } mqtt_message[baseIndex++] = 0; // Protocol Name Length MSB mqtt_message[baseIndex++] = 4; // Protocol Name Length LSB mqtt_message[baseIndex++] = 77; // ASCII Code for M mqtt_message[baseIndex++] = 81; // ASCII Code for Q mqtt_message[baseIndex++] = 84; // ASCII Code for T mqtt_message[baseIndex++] = 84; // ASCII Code for T mqtt_message[baseIndex++] = 4; // MQTT Protocol version = 4 mqtt_message[baseIndex++] = 0xC2; // conn flags 需要用户名和密码认证 mqtt_message[baseIndex++] = 0; // Keep-alive Time Length MSB mqtt_message[baseIndex++] = 60; // Keep-alive Time Length LSB mqtt_message[baseIndex++] = (0xff00&client_id_length)>>8;// Client ID length MSB mqtt_message[baseIndex++] = 0xff&client_id_length; // Client ID length LSB // Client ID for(i = 0; i < client_id_length; i++) { mqtt_message[baseIndex + i] = client_id[i]; } baseIndex = baseIndex+client_id_length; if(username_length > 0) { //username mqtt_message[baseIndex++] = (0xff00&username_length)>>8;//username length MSB mqtt_message[baseIndex++] = 0xff&username_length; //username length LSB for(i = 0; i < username_length ; i++) { mqtt_message[baseIndex + i] = username[i]; } baseIndex = baseIndex + username_length; } if(password_length > 0) { //password mqtt_message[baseIndex++] = (0xff00&password_length)>>8;//password length MSB mqtt_message[baseIndex++] = 0xff&password_length; //password length LSB for(i = 0; i < password_length ; i++) { mqtt_message[baseIndex + i] = password[i]; } baseIndex += password_length; } return baseIndex; } /******************************************************************************* 函数名称:MqttPublishPacket 函数功能:按照MQTT协议构建MQTT发布消息包 输入参数:*mqtt_message,连接数据包缓存指针,*topic;消息主题 message:消息内容 message_ln:消息内容长度 qos:服务质量0、1、2 输出参数:无 备注说明: ********************************************************************************/ u16 MqttPublishPacket(u8 *mqtt_message, char * topic,char * message,u16 message_ln, u8 qos) { u16 topic_length = strlen(topic); u16 message_length = message_ln;//strlen(message); u16 i,index=0; u16 ln = 0; static u16 id=0; mqtt_message[index++] = 48; //0x30 // MQTT Message Type PUBLISH //20180927/// // if(qos) // mqtt_message[index++] = 2 + topic_length + 2 + message_length; // else // mqtt_message[index++] = 2 + topic_length + message_length; // Remaining length ///以上是老代码,以下是新代码。老代码只能发送小于127字节的数据长度 //新代码可以发送的长度为16383,即15K// ln = 2 + topic_length + message_length; if(ln <128) mqtt_message[index++] = ln; else if(ln < 16384) { mqtt_message[index++] = (0x80 |(ln%128)); mqtt_message[index++] = ln/128; } //20180927/// mqtt_message[index++] = (0xff00&topic_length)>>8; mqtt_message[index++] = 0xff&topic_length; // Topic for(i = 0; i < topic_length; i++) { mqtt_message[index + i] = topic[i]; } index += topic_length; if(qos) { mqtt_message[index++] = (0xff00&id)>>8; mqtt_message[index++] = 0xff&id; id++; } // Message for(i = 0; i < message_length; i++) { mqtt_message[index + i] = message[i]; } index += message_length; return index; } /******************************************************************************* 函数名称:MqttPublishAckPacket 函数功能:按照MQTT协议构建MQTT发布消息确认包 输入参数:*mqtt_message,连接数据包缓存指针,*topic message qos:服务质量0、1、2 输出参数:无 备注说明://对QoS级别1的 PUBLISH 消息的回应当服务器发送 PUBLISH 消息给订阅者客户端,客户端回复 PUBACK 消息 ********************************************************************************/ u8 MqttPublishAckPacket(u8 *mqtt_message) { static u16 id=0; mqtt_message[0] = 64; //0x40 //消息类型和标志 PUBACK mqtt_message[1] = 2; //剩余长度(不包括固定头部) mqtt_message[2] = (0xff00&id)>>8; //消息标识符 mqtt_message[3] = 0xff&id; //消息标识符 id++; return 4; } /******************************************************************************* 函数名称:MqtSubscribePacket 函数功能:按照MQTT协议构建MQTT订阅消息包 输入参数:*mqtt_message,连接数据包缓存指针,*topic;消息主题 message:消息内容 qos:服务质量0、1、2 输出参数:无 备注说明://whether=1,订阅; whether=0,取消订阅 ********************************************************************************/ u16 MqtSubscribePacket(u8 *mqtt_message,char *topic,u8 qos,u8 whether) { u16 topic_len = strlen(topic); u16 i,index = 0; static u16 id=0; id++; if(whether) mqtt_message[index++] = 130; //0x82 //消息类型和标志 SUBSCRIBE 订阅 else mqtt_message[index++] = 162; //0xA2 取消订阅 mqtt_message[index++] = topic_len + 5; //剩余长度(不包括固定头部) mqtt_message[index++] = (0xff00&id)>>8; //消息标识符 mqtt_message[index++] = 0xff&id; //消息标识符 mqtt_message[index++] = (0xff00&topic_len)>>8; //主题长度(高位在前,低位在后) mqtt_message[index++] = 0xff&topic_len; //主题长度 for (i = 0;i < topic_len; i++) { mqtt_message[index + i] = topic[i]; } index += topic_len; if(whether) { mqtt_message[index] = qos;//QoS级别 index++; } return index; } //构建MQTT PING请求包 u8 MqttPingPacket(u8 *mqtt_message) { mqtt_message[0] = 192; //0xC0 //消息类型和标志 PING mqtt_message[1] = 0; //剩余长度(不包括固定头部) return 2; } //构建MQTT断开连接包 u8 MqttDisconnectPacket(u8 *mqtt_message) { mqtt_message[0] = 224; //0xE0 //消息类型和标志 DISCONNECT mqtt_message[1] = 0; //剩余长度(不包括固定头部) return 2; }
测试demo:
int main() { int i,rcode,num = 0; U32 rxlen=0; U08 ip[8]= {47,95,221,93}; U16 port = 1883; U08 txbuf[1024]; U08 rxbuf[1024]; u8 *packet; u16 ln = 0; char str[100]; u8 deviceID[6] = {1,2,3,4,5,6}; u8 userID[6] = {0x31,0x32,0x33,0x34,0x35,0x36}; u8 companyID[6] ={0x31,0x32,0x33,0x34,0x35,0x36}; u8 txbuf1[20]={'a','b','c',0x31,0x32,0x39,0x38,0x37}; memset(txbuf,0,1024); memset(rxbuf,0,1024); printf("test socket:\r\n"); rcode = Com_Dev_Connect(ip,port,0,0); if(rcode == 0) { printf("->connect ok!\r\n"); } RefreshTopicParam(Mqtt.TopicParam,(char*)&userID,(char*)&companyID,(char*)&deviceID); packet = &Mqtt.TxBuf[0]; DeiveidTostr((char*)deviceID,str); ln = MqttConnectPacket(packet,str,"publ","pag"); rcode = Com_Dev_TxData(packet,ln,0,0); sleep(1); rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0); if(rcode == 0) { printf("<-recv ok!len = %d\r\n",rxlen); } system("pause"); //订阅各种消息 strcpy((char*)str,"jk/command/realycontrol");//订阅继电器控制命令主题 //strcat((char *)str,(char const *)Mqtt.TopicParam); packet = &Mqtt.TxBuf[0]; ln = MqtSubscribePacket(packet,(char*)&str,0,1); rcode = Com_Dev_TxData(packet,ln,0,0); sleep(1); rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0); if(rcode == 0) { printf("<-recv ok!len = %d\r\n",rxlen); } system("pause"); strcpy((char*)str,"jk/command/readparameter");//订阅读参数主题 //strcat((char *)str,(char const *)Mqtt.TopicParam); packet = &Mqtt.TxBuf[0]; ln = MqtSubscribePacket(packet,(char*)&str,0,1); rcode = Com_Dev_TxData(packet,ln,0,0); sleep(1); rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0); if(rcode == 0) { printf("<-recv ok!len = %d\r\n",rxlen); } system("pause"); //发布消息 strcpy((char*)str,"jk/return/realycontrol");// //strcat((char *)str,(char const *)Mqtt.TopicParam); num = 10; ln = MqttPublishPacket((u8*)&Mqtt.TxBuf[0],(char*)&str,(char*)&txbuf1,num,0);//继电器应答帧长度为10 rcode = Com_Dev_TxData(Mqtt.TxBuf,ln,0,0); sleep(1); rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0); if(rcode == 0) { printf("<-recv ok!len = %d\r\n",rxlen); } system("pause"); return 0; }
附带makefile:
######################################## #makefile ######################################## #编译主程序 BINARY := test OBJ_DIR := ./ CC= gcc LD= ld CFLAGS= -std=c99 -Wall -g LDSCRIPT= -lws2_32 LDFLAGS= SRC = $(wildcard *.c) DIR = $(notdir $(SRC)) OBJS = $(patsubst %.c,$(OBJ_DIR)%.o,$(DIR)) .PHONY: clean all: prebuild $(BINARY).exe prebuild: @echo Building app... $(BINARY).exe : $(OBJS) @echo Generating ... $(CC) -o $(BINARY).exe $(OBJS) $(LDFLAGS) $(LDSCRIPT) @echo OK! $(OBJ_DIR)%.o : %.c $(CC) -c $(CFLAGS) $< -o $@ clean: rm -f $(OBJ_DIR)*.o @echo Removed!