物联网MQTT协议报文解析(简单的c语音客户端实现)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 物联网MQTT协议报文解析(简单的c语音客户端实现)

MQTT(Message Queuing Telemetry Transport),是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。MQTT是专门针对物联网开发的轻量级传输协议。MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化,使得其能适应各种物联网应用场景。


如今很多第三方推送平台都采用了MQTT来实现,消息中间件ActiveMQ的订阅/发布模块也是基于MQTT实现。


以下为MQTT的 会话,订阅,发布的几个报文的解析:


先看下这张图,为整体的报文结构。大致分为 固定头(Control+PacketLength)+可变头+载体






PS E:\test\mqtt> .\test.exe


test socket:


->connect ok!


//会话,CONNECT


->send:


102900044D51545404C2003C000A0102303330343035303600067075626C696300097061677075626C6963


<-recv:


20020000


<-recv ok!len = 4


//订阅主题


->send:


821C000100176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C00


<-recv:


9003000100


<-recv ok!len = 5


//订阅主题 .


->send:


821D000200186A6B2F636F6D6D616E642F72656164706172616D6574657200


<-recv:


9003000200


//发布消息


->send:


302200166A6B2F72657475726E2F7265616C79636F6E74726F6C61626331323938370000


//收到订阅的主题


<-recv:


301C00176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C998866


<-recv ok!len = 30


请按任意键继续. . .


PS E:\test\mqtt>


报文解析:


//CONNECT  报文



->send:


102900044D51545404C2003C000A0102303330343035303600067075626C696300097061677075626C6963


解析:


//固定报文头


10 //固定报文头 byte1


29 //固定报文头之 byte2


//可变报文头


0004//长度


4D515454 //内容(MQTT)


04  //协议级别


C2//连接标志位


003C //保持连接时长(60秒)


//载体部分


000A//长度


01023033303430353036//用户名


0006


7075626C6963


0009


7061677075626C6963  //密码


<-recv:


20020000


解析:


20 //固定头


02//固定头 byte2 长度


00 //连接确认标志


00 //连接返回码



//订阅主题报文


->send:


821C000100176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C00


解析:


82  //固定报文头 byte1


1C //固定报文头 byte2 (剩余长度)


//可变报文头


00//消息标识符byte1


01//消息标识符byte2


//载荷


0017//主题长度


6A6B2F636F6D6D616E642F7265616C79636F6E74726F6C   // 内容为 : jk/command/realycontrol


00 //服务质量要求Qos


<-recv:


9003000100


应答解析:


90//固定报文头


03 //固定报文头 (长度)


0001 //报文标识符


00


<-recv ok!len = 5


//订阅主题报文


->send:


821D000200186A6B2F636F6D6D616E642F72656164706172616D6574657200


<-recv:


301C00176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C9988669003000200


<-recv ok!len = 35


//发送消息报文(固定头+可变头(主题长度+主题内容+标识符(可选)+载体内容))


->send:


302200166A6B2F72657475726E2F7265616C79636F6E74726F6C61626331323938370000


30  //固定头 (控制字)


22  //固定头(长度)


//可变头


0016


6A6B2F72657475726E2F7265616C79636F6E74726F6C   //主题名内容为: jk/return/realycontrol  


61626331323938370000  // 载体内容为: abc12987


只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中


/*******************************************************************************
函数名称:MqttConnectPacket
函数功能:按照MQTT协议发送建立连接数据包
输入参数:*packet,连接数据包缓存指针,*id:用户ID号指针,*username:用户名指针 *password:用户密码指针
输出参数:无
备注说明:用户ID是必需的,用户名和密码可以为空
********************************************************************************/
u16 MqttConnectPacket(u8 *mqtt_message,char *client_id,char *username,char *password)
{
  u16 client_id_length = strlen(client_id);
  u16 username_length = strlen(username);
  u16 password_length = strlen(password);
  u16 packetLen;
  u16 i,baseIndex;
  packetLen = 12 + 2 + client_id_length;
  if(username_length > 0)
    packetLen = packetLen + 2 + username_length;
  if(password_length > 0)
    packetLen = packetLen+ 2 + password_length;
  mqtt_message[0] = 16;       //0x10 // MQTT Message Type CONNECT
  mqtt_message[1] = packetLen - 2;  //剩余长度,不包括固定头
  baseIndex = 2;
  if(packetLen > 127)
  {
    mqtt_message[2] = 1;      //packetLen/127;    
    baseIndex = 3;
  }
  mqtt_message[baseIndex++] = 0;    // Protocol Name Length MSB    
  mqtt_message[baseIndex++] = 4;    // Protocol Name Length LSB    
  mqtt_message[baseIndex++] = 77;   // ASCII Code for M    
  mqtt_message[baseIndex++] = 81;   // ASCII Code for Q    
  mqtt_message[baseIndex++] = 84;   // ASCII Code for T    
  mqtt_message[baseIndex++] = 84;   // ASCII Code for T    
  mqtt_message[baseIndex++] = 4;    // MQTT Protocol version = 4    
  mqtt_message[baseIndex++] = 0xC2;   // conn flags 需要用户名和密码认证
  mqtt_message[baseIndex++] = 0;    // Keep-alive Time Length MSB    
  mqtt_message[baseIndex++] = 60;   // Keep-alive Time Length LSB    
  mqtt_message[baseIndex++] = (0xff00&client_id_length)>>8;// Client ID length MSB    
  mqtt_message[baseIndex++] = 0xff&client_id_length;  // Client ID length LSB    
  // Client ID
  for(i = 0; i < client_id_length; i++)
  {
    mqtt_message[baseIndex + i] = client_id[i];    
  }
  baseIndex = baseIndex+client_id_length;
  if(username_length > 0)
  {
    //username    
    mqtt_message[baseIndex++] = (0xff00&username_length)>>8;//username length MSB    
    mqtt_message[baseIndex++] = 0xff&username_length; //username length LSB    
    for(i = 0; i < username_length ; i++)
    {
      mqtt_message[baseIndex + i] = username[i];    
    }
    baseIndex = baseIndex + username_length;
  }
  if(password_length > 0)
  {
    //password    
    mqtt_message[baseIndex++] = (0xff00&password_length)>>8;//password length MSB    
    mqtt_message[baseIndex++] = 0xff&password_length; //password length LSB    
    for(i = 0; i < password_length ; i++)
    {
      mqtt_message[baseIndex + i] = password[i];    
    }
    baseIndex += password_length; 
  } 
  return baseIndex;    
}
/*******************************************************************************
函数名称:MqttPublishPacket
函数功能:按照MQTT协议构建MQTT发布消息包
输入参数:*mqtt_message,连接数据包缓存指针,*topic;消息主题  message:消息内容  message_ln:消息内容长度 qos:服务质量0、1、2
输出参数:无
备注说明:
********************************************************************************/
u16 MqttPublishPacket(u8 *mqtt_message, char * topic,char * message,u16 message_ln, u8 qos)
{  
  u16 topic_length = strlen(topic);    
  u16 message_length = message_ln;//strlen(message);  
  u16 i,index=0;  
  u16 ln = 0;
  static u16 id=0;
  mqtt_message[index++] = 48; //0x30 // MQTT Message Type PUBLISH    
//20180927/// 
//  if(qos)
//    mqtt_message[index++] = 2 + topic_length + 2 + message_length;
//  else
//    mqtt_message[index++] = 2 + topic_length + message_length;   // Remaining length   
  ///以上是老代码,以下是新代码。老代码只能发送小于127字节的数据长度
  //新代码可以发送的长度为16383,即15K//
  ln = 2 + topic_length + message_length;
  if(ln <128)
    mqtt_message[index++] = ln;
  else if(ln < 16384)
  {
      mqtt_message[index++] = (0x80 |(ln%128)); 
    mqtt_message[index++] = ln/128;
  }
//20180927///  
  mqtt_message[index++] = (0xff00&topic_length)>>8;
  mqtt_message[index++] = 0xff&topic_length;
  // Topic    
  for(i = 0; i < topic_length; i++)
  {
    mqtt_message[index + i] = topic[i];
  }
  index += topic_length;
  if(qos)
  {
    mqtt_message[index++] = (0xff00&id)>>8;
    mqtt_message[index++] = 0xff&id;
    id++;
  }
  // Message
  for(i = 0; i < message_length; i++)
  {
    mqtt_message[index + i] = message[i];
  }
  index += message_length;
  return index;
}
/*******************************************************************************
函数名称:MqttPublishAckPacket
函数功能:按照MQTT协议构建MQTT发布消息确认包
输入参数:*mqtt_message,连接数据包缓存指针,*topic  message   qos:服务质量0、1、2
输出参数:无
备注说明://对QoS级别1的 PUBLISH 消息的回应当服务器发送 PUBLISH 消息给订阅者客户端,客户端回复 PUBACK 消息
********************************************************************************/
u8 MqttPublishAckPacket(u8 *mqtt_message)
{
  static u16 id=0;
  mqtt_message[0] = 64;       //0x40 //消息类型和标志 PUBACK
  mqtt_message[1] = 2;        //剩余长度(不包括固定头部)
  mqtt_message[2] = (0xff00&id)>>8; //消息标识符
  mqtt_message[3] = 0xff&id;      //消息标识符
  id++;
  return 4;
}
/*******************************************************************************
函数名称:MqtSubscribePacket
函数功能:按照MQTT协议构建MQTT订阅消息包
输入参数:*mqtt_message,连接数据包缓存指针,*topic;消息主题  message:消息内容   qos:服务质量0、1、2
输出参数:无
备注说明://whether=1,订阅; whether=0,取消订阅
********************************************************************************/ 
u16 MqtSubscribePacket(u8 *mqtt_message,char *topic,u8 qos,u8 whether)
{    
  u16 topic_len = strlen(topic);
  u16 i,index = 0;
  static u16 id=0;
  id++;
  if(whether)
    mqtt_message[index++] = 130;        //0x82 //消息类型和标志 SUBSCRIBE 订阅
  else
    mqtt_message[index++] = 162;        //0xA2 取消订阅
  mqtt_message[index++] = topic_len + 5;      //剩余长度(不包括固定头部)
  mqtt_message[index++] = (0xff00&id)>>8;     //消息标识符
  mqtt_message[index++] = 0xff&id;        //消息标识符
  mqtt_message[index++] = (0xff00&topic_len)>>8;  //主题长度(高位在前,低位在后)
  mqtt_message[index++] = 0xff&topic_len;     //主题长度 
  for (i = 0;i < topic_len; i++)
  {
    mqtt_message[index + i] = topic[i];
  }
  index += topic_len;
  if(whether)
  {
    mqtt_message[index] = qos;//QoS级别
    index++;
  }
  return index;
}
//构建MQTT PING请求包
u8 MqttPingPacket(u8 *mqtt_message)
{ 
  mqtt_message[0] = 192;  //0xC0 //消息类型和标志 PING
  mqtt_message[1] = 0;  //剩余长度(不包括固定头部)
  return 2;
}
//构建MQTT断开连接包
u8 MqttDisconnectPacket(u8 *mqtt_message)
{ 
  mqtt_message[0] = 224;  //0xE0 //消息类型和标志 DISCONNECT
  mqtt_message[1] = 0;  //剩余长度(不包括固定头部)
  return 2;
} 


测试demo:


int main()
{
      int i,rcode,num = 0;
      U32 rxlen=0;
        U08 ip[8]= {47,95,221,93};
        U16 port = 1883;
        U08 txbuf[1024];
        U08 rxbuf[1024];
    u8  *packet;
    u16 ln = 0;
    char  str[100];
    u8 deviceID[6] = {1,2,3,4,5,6};
    u8 userID[6] = {0x31,0x32,0x33,0x34,0x35,0x36};
    u8 companyID[6] ={0x31,0x32,0x33,0x34,0x35,0x36};
    u8 txbuf1[20]={'a','b','c',0x31,0x32,0x39,0x38,0x37};
    memset(txbuf,0,1024);
        memset(rxbuf,0,1024);
        printf("test socket:\r\n");
        rcode = Com_Dev_Connect(ip,port,0,0);
        if(rcode == 0)
        {
          printf("->connect ok!\r\n");  
        }
    RefreshTopicParam(Mqtt.TopicParam,(char*)&userID,(char*)&companyID,(char*)&deviceID);
    packet = &Mqtt.TxBuf[0];     
    DeiveidTostr((char*)deviceID,str);
    ln = MqttConnectPacket(packet,str,"publ","pag");
        rcode = Com_Dev_TxData(packet,ln,0,0);
    sleep(1); 
    rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0);
    if(rcode == 0)
    {
       printf("<-recv ok!len = %d\r\n",rxlen);  
    }
    system("pause");
    //订阅各种消息
    strcpy((char*)str,"jk/command/realycontrol");//订阅继电器控制命令主题
    //strcat((char *)str,(char const *)Mqtt.TopicParam);  
    packet = &Mqtt.TxBuf[0];
    ln = MqtSubscribePacket(packet,(char*)&str,0,1);
    rcode = Com_Dev_TxData(packet,ln,0,0);
    sleep(1); 
    rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0);
    if(rcode == 0)
    {
       printf("<-recv ok!len = %d\r\n",rxlen);  
    }
    system("pause");
    strcpy((char*)str,"jk/command/readparameter");//订阅读参数主题
    //strcat((char *)str,(char const *)Mqtt.TopicParam);  
    packet = &Mqtt.TxBuf[0];
    ln = MqtSubscribePacket(packet,(char*)&str,0,1);
    rcode = Com_Dev_TxData(packet,ln,0,0);
    sleep(1); 
    rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0);
    if(rcode == 0)
    {
       printf("<-recv ok!len = %d\r\n",rxlen);  
    }
    system("pause");
    //发布消息
    strcpy((char*)str,"jk/return/realycontrol");//
    //strcat((char *)str,(char const *)Mqtt.TopicParam);    
    num = 10;
    ln = MqttPublishPacket((u8*)&Mqtt.TxBuf[0],(char*)&str,(char*)&txbuf1,num,0);//继电器应答帧长度为10
    rcode = Com_Dev_TxData(Mqtt.TxBuf,ln,0,0);
    sleep(1); 
    rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0);
    if(rcode == 0)
    {
       printf("<-recv ok!len = %d\r\n",rxlen);  
    }
    system("pause");
  return 0;
}


附带makefile:



########################################
#makefile
########################################
#编译主程序
BINARY  := test
OBJ_DIR := ./
CC= gcc
LD= ld
CFLAGS= -std=c99 -Wall -g
LDSCRIPT= -lws2_32
LDFLAGS= 
SRC  = $(wildcard *.c)
DIR  = $(notdir $(SRC))
OBJS = $(patsubst %.c,$(OBJ_DIR)%.o,$(DIR))
.PHONY: clean 
all:  prebuild  $(BINARY).exe
prebuild:
  @echo Building app...
$(BINARY).exe : $(OBJS)
  @echo Generating ...
  $(CC) -o $(BINARY).exe $(OBJS) $(LDFLAGS) $(LDSCRIPT) 
  @echo OK!
$(OBJ_DIR)%.o : %.c
  $(CC) -c $(CFLAGS) $< -o  $@  
clean:
  rm -f $(OBJ_DIR)*.o
  @echo Removed!
相关文章
|
3月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
92 4
|
1月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
41 3
|
1月前
|
传感器 消息中间件 物联网
常用的物联网协议
常用的物联网协议包括:MQTT(消息队列遥测传输)、CoAP(受限应用协议)、HTTP/HTTPS、LWM2M(轻量级机器对机器)和Zigbee等。这些协议在不同的应用场景中发挥着重要作用,如数据传输、设备管理等。
|
2月前
|
网络协议 物联网 网络性能优化
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
72 5
|
1月前
|
消息中间件 监控 物联网
物联网8大协议介绍及对比
根据具体的应用需求,选择合适的协议可以大幅提升系统的性能和可靠性。希望本文能为您在物联网协议的选择和应用中提供有价值的参考。
279 0
|
2月前
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
|
3月前
|
网络协议 物联网 网络性能优化
物联网江湖风云变幻!MQTT CoAP RESTful/HTTP XMPP四大门派谁主沉浮?
【9月更文挑战第3天】物联网(IoT)的兴起催生了多种通信协议,如MQTT、CoAP、RESTful/HTTP和XMPP,各自适用于不同场景。本文将对比这些协议的特点、优缺点,并提供示例代码。MQTT轻量级且支持QoS,适合大规模部署;CoAP基于UDP,适用于低功耗网络;RESTful/HTTP易于集成但不适合资源受限设备;XMPP支持双向通信,适合复杂交互应用。通过本文,开发者可更好地选择合适的物联网通信协议。
47 2
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
83 2
|
4月前
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
152 0
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
73 0

推荐镜像

更多