基于MQTT.fx的ESP8266主题发布订阅

简介: 本篇文章主要以ESP8266-12E作为开发板,带你了解MQTT发布、订阅、取消订阅的基础知识。

发布、订阅和取消订阅

理论知识

==PUBLISH – 发布消息==

  • 这里主要介绍客户端PUBLISH报文中的详细信息:

    名称 内容 说明
    packetId 4314 报文标识符。报文的ID编号
    topicName ”weather“ 报文所属主题
    qos 1 服务质量等级。有0 1 2三个等级
    retainFlag false 保留标志。决定了当有新的客户端订阅该主题时,能否收到该主题中之前保留的内容
    payload "sunny" 有效载荷。即该客户端向服务器端发送的报文内容
    dupFlag false 重发标志。若接收方没接到报文,且dupFlag==true时,该报文将被重发

    注意

    1. 报文标识符packetId与QoS级别有密不可分的关系。只有QoS级别大于0时,报文标识符才是非零数值。如果QoS等于0,报文标识符为0。
    2. 重发标志dupFlag只在QoS级别大于0时使用

==SUBSCRIBE – 订阅主题==

  • 客户端订阅主题时,一个报文中可以包含有单个或者多个订阅主题名
  • 客户端在订阅主题时也可以明确QoS

==UNSUBSCRIBE – 取消订阅==

  • 客户端取消订阅时,一个报文中可以包含有单个或者多个订阅主题名

==SUBACK – 订阅确认==

  • SUBACK报文是服务器端向客户端发送的报文
  • SUBACK报文包括两个信息

    名称 内容 说明
    packetId 4314 报文标识符。报文的ID编号报文标识符
    returnCode 0 订阅成功 – QoS 0
    1 订阅成功 – QoS 1
    2 订阅成功 – QoS 2
    128 订阅失败
    订阅返回码。
程序
开发板发布MQTT主题

MQTT.fx软件页面介绍

  • 这里以MQTT.fx1.7.1版本作为示例进行介绍
  • ==Publish页面==

    image-20221224173621939
    至于如何建立这个profile请参考[这个教程]()

  • ==Subscribe页面==
    image-20221224174014661
  • 开发板发布MQTT主题

    /*
    # 程序目的:实现ESP8266向 然也 物联网服务器发布订阅
    # 创建时间:2022-12-25
    # 函数:
    ## wifi对象:WiFi.SSID();WiFi.localIP(); WiFi.macAddress(); WiFi.mode(WIFI_STA);
    ## mqttClient对象:mqttClient.setServer(网站, 端口号); mqttClient.connected(); mqttClient.loop();                             mqttClient.connect(订阅者的ID.c_str()); mqttClient.state();mqttClient.publish(主题名数组,内容                    数组)
    ## Ticker对象:ticker.attach(即使周期(s),执行函数名);
    # 程序思路:串口初始化->连接wifi->设置MQTT服务器和端口号->连接MQTT服务器(自建函数)->开启定时->loop函数中检查是否连接->             发布信息->保持客户端心跳
    # 难点:变量类型间的转换
    # 注意:需要把WiFi_Connect()函数中的WiFi名称和密码换成自己的。否则连接不上
    */    
    #include <ESP8266WiFi.h>
    #include <ESP8266WiFiMulti.h>
    #include <PubSubClient.h>
    #include <Ticker.h>
    
    //创建对象
    ESP8266WiFiMulti My_WifiMulti;
    WiFiClient My_WiFiClient;
    PubSubClient mqttClient(My_WiFiClient);
    Ticker My_ticker;
    
    //定义常量
    const char* mqttServerSite = "test.ranye-iot.net";
    
    //定义变量
    int count = 0;  //计时
    
    //函数申明
    int WiFi_Connect();
    void timeCounter();
    void My_connectMQTTServer();
    void pubMQTTmsg();
    
    void setup() {
      //串口初始化
      Serial.begin(9600);
    
      //WiFi连接
      WiFi_Connect();
    
      //设置MQTT服务器网站地址和端口号
      mqttClient.setServer(mqttServerSite, 1883);
    
      //连接MQTT服务器
      My_connectMQTTServer();
    
      //开启定时
      // ticker.attach(即使周期(单位:s),执行函数);
      My_ticker.attach(1,timeCounter); //注意:这里不是调用计数函数,只要写函数名即可
    
    }
    
    void loop()
    {
      if(mqttClient.connected())//如果连接成功且间隔3秒:发布信息 
      {
        if(count>=3)
        {
          pubMQTTmsg();
          count = 0;
        }
        mqttClient.loop();
      }
      else //不成功重新尝试连接
      {
        My_connectMQTTServer();
      }
    }
    
    /*
    wifi连接函数
    需引 ESP8266WiFiMulti.h 库  并建立ESP8266WiFiMulti对象
    */
    int WiFi_Connect() {
      My_WifiMulti.addAP("TPLINK2.4G", "@@@@@@@@");  // Wifi1
      My_WifiMulti.addAP("username2", "password");   // Wifi2
      My_WifiMulti.addAP("username3", "password");   // Wifi3
    
      int i = 0;
      Serial.print("\n-------------Connected Time:-------------\n");
      while (My_WifiMulti.run() != WL_CONNECTED) {
        i += 1;
        Serial.print(i);
        Serial.println("->");
        delay(1000);
        if (i > 15) {
          Serial.print("\n-------------WIFI connected failed!-------------\n");
          return 0;
        }
      }
    
      Serial.println("\n-------------WIFI connected successful!-------------\n");
      Serial.println("\n-------------WIFI Name:-------------");
      Serial.println(WiFi.SSID());
      Serial.println("\n-------------ESP8266 IP address:-------------");
      Serial.println(WiFi.localIP());
      Serial.println("\n-------------------------------------------");
      return 1;
    }
    
    /*
    客户端连接服务器端函数
    注意:需要提前建立mqttClient对象
    */
    void My_connectMQTTServer()
    {
      // 根据ESP8266的MAC地址生成客户端ID(避免与其它ESP8266的客户端ID重名)
      String clientID="ESP8266-" + WiFi.macAddress();//获取ESP8266的MAC地址
      //连接MQTT服务器
      if(mqttClient.connect(clientID.c_str()))//注意:这里的String.c_str是对字符串的处理
      {
        Serial.println("MQTT Server Connected.");
        Serial.print("Server Address: "); Serial.println(mqttServerSite);
        Serial.print("ClientId:"); Serial.println(clientID);
        Serial.print("Client State:"); Serial.println(mqttClient.state());
      }
      else
      {
        Serial.print("MQTT Server Connect Failed. Client State:");
        Serial.println(mqttClient.state());
        delay(3000);
      }
    }
    
    //发布信息
    void pubMQTTmsg()
    {
    /* —----------------------主题名称编辑操作------------------------ */
      //发布主题的名称 字符串型
      String topicString = "PUBLIC-topicName-" + WiFi.macAddress();
    
      //转换成char数组类型(mqttClient.public传参需要)
      //创建数组
      char topicArr[topicString.length() + 1];
      //将信息拷贝到数组内
      strcpy(topicArr,topicString.c_str()); // strcpy(目标变量,拷贝源);
    
    /* —----------------------发布内容编辑操作------------------------ */
      static int value = 0; //一定要设成静态的
      String messageString = "PUBLIC-messageame-" + String(value++);
    
      //转换成char数组类型(mqttClient.public传参需要)
      //创建数组
      char messageArr[messageString.length() + 1];
      //将信息拷贝到数组内
      strcpy(messageArr,messageString.c_str()); // strcpy(目标变量,拷贝源);
    
    /* —----------------------向主题发布信息------------------------ */
      if(mqttClient.publish(topicArr,messageArr)) //mqttClient.publish(主题名数组,内容数组)
      {
        Serial.print("Topic:");Serial.println(topicArr);
        Serial.print("Message:");Serial.println(messageArr);
      }
      else
      {
        Serial.println("Public Failed!");
      }
    }
    
    //计时函数
    void timeCounter(){
      count++;
    }
  • 若程序正确,且成功上传开发板之后。可以通过如下操作验证已发布订阅消息:

    1. 打开ArduinoIDE的串口监视器,找到自己发布的主题名称,并复制
      image-20221224231749009
    2. 打开MQTT.fx软件,进入Subscribe页面,输入自己的订阅名称,点击订阅

      image-20221224232455000

    3. 如果这里出现不断滚动的消息的话。那么恭喜你,实验成功!
开发板订阅主题

程序

/*
# 程序目的:实现ESP8266订阅 然也 物联网服务器的主题。服务端发送1,灯亮;发送0,灯灭
# 创建时间:2022-12-26
# 函数:
## wifi对象: WiFi.SSID();WiFi.localIP(); WiFi.macAddress(); WiFi.mode(WIFI_STA);
## mqttClient对象:mqttClient.setServer(网站, 端口号); mqttClient.connected(); mqttClient.loop(); mqttClient.connect(订阅者的ID.c_str()); mqttClient.state();mqttClient.subscribe(主题名数组); mqttClient.setCallback(回调函数名称)
## Ticker对象:ticker.attach(即使周期(s),执行函数名);
# 程序思路:串口初始化->配置引脚高低电平 -> 连接wifi->设置MQTT服务器和端口号->重写接收回调函数 -> 连接MQTT服务器(自建函数)->开启定时->loop函数中检查是否连接->订阅信息->保持客户端心跳
# 难点:变量类型间的转换、回调函数的书写
# 注意:需要把WiFi_Connect()函数中的WiFi名称和密码换成自己的。否则连接不上
*/    
#include <ESP8266WiFi.h>
#include <ESP8266WiFiMulti.h>
#include <PubSubClient.h>
#include <Ticker.h>

//创建对象
ESP8266WiFiMulti My_WifiMulti;
WiFiClient My_WifiClient;
PubSubClient mqttClient(My_WifiClient);
Ticker My_ticker;

//定义常量
const char* mqttServerSite = "test.ranye-iot.net";

//定义变量
unsigned int worktime_s;

//函数申明
int WiFi_Connect();
void My_connectMQTTServer();
void My_receiveCallback(char* topic, uint8_t* payload, unsigned int length);//注意:回调函数的参数需参考PubSubClient.头文件
void My_subscribeTopic();
void calcTheWorkTime();

void setup()
{
  //串口
  Serial.begin(9600);

  //引脚配置
  pinMode(LED_BUILTIN,OUTPUT); // 设置板上LED引脚为输出模式
  digitalWrite(LED_BUILTIN, HIGH); // 启动后关闭板上LED

  //连接WiFi
  WiFi_Connect();

  //记录工作时间
  My_ticker.attach(1,calcTheWorkTime);

  //建立服务
  mqttClient.setServer(mqttServerSite,1883);

  //重写回调函数(注意:回调函数的重写要在连接之前)
  mqttClient.setCallback(My_receiveCallback);

  //连接mqtt服务器
  My_connectMQTTServer();

  //订阅指定主题
  My_subscribeTopic();

}

void loop()
{
  if (mqttClient.connected()) // 连接成功
  {
    mqttClient.loop(); //保持客户端心跳
  }
  else //失败——重新尝试连接
  {
    My_connectMQTTServer();
  }
}

/*
wifi连接函数
需引 ESP8266WiFiMulti.h 库  并建立ESP8266WiFiMulti对象
*/
int WiFi_Connect() {
  My_WifiMulti.addAP("TPLINK2.4G", "@@@@@@@@");  // Wifi1
  My_WifiMulti.addAP("username2", "password");   // Wifi2
  My_WifiMulti.addAP("username3", "password");   // Wifi3

  int i = 0;
  Serial.print("\n-------------Connected Time:-------------\n");
  while (My_WifiMulti.run() != WL_CONNECTED) {
    i += 1;
    Serial.print(i);
    Serial.print("->");
    delay(1000);
    if (i > 15) {
      Serial.print("\n-------------WIFI connected failed!-------------\n");
      return 0;
    }
  }

  Serial.println("\n-------------WIFI connected successful!-------------\n");
  Serial.println("\n-------------WIFI Name:-------------");
  Serial.println(WiFi.SSID());
  Serial.println("\n-------------ESP8266 IP address:-------------");
  Serial.println(WiFi.localIP());
  Serial.println("\n-------------------------------------------");
  return 1;
}


/*
客户端连接服务器端函数
注意:需要提前建立mqttClient对象
*/
void My_connectMQTTServer()
{
  // 根据ESP8266的MAC地址生成客户端ID(避免与其它ESP8266的客户端ID重名)
  String clientId = "ESP8266_clientId_" + String(WiFi.macAddress()); //注意:这里的String.c_str是对字符串的处理
  //连接MQTT服务器
  if (mqttClient.connect(clientId.c_str()))
  {
    Serial.print("Connect to "); Serial.print(mqttServerSite); Serial.println(" successful!");
    Serial.print("clientId = ");Serial.println(clientId);
    Serial.print("Server status: ");Serial.println(mqttClient.state());
  }
  else
  {
    Serial.print("Connect to "); Serial.print(mqttServerSite); Serial.println(" failed!");
    Serial.print("Server status: ");Serial.println(mqttClient.state());
    delay(3000);
  }
}


/*
重写收到订阅消息的回调函数(难点)
注意:参数需参考PubSubClient.头文件  而且即使有多个主题,也只能有一个回调函数
*/
void My_receiveCallback(char* topic, uint8_t* payload, unsigned int length) //((char*)主题,()有效载荷,(uint)长度)
{
  static int number = 0;
  Serial.println(" ");
  Serial.print("This is the ["); Serial.print(number++); Serial.println("] message"); 
  Serial.print("WorkTime: ["); Serial.print(worktime_s); Serial.println("s]");

  /* —----------------------打印主题名------------------------ */
  Serial.print("Message is from the topic of: [");
  Serial.print(topic);
  Serial.println("] ");

/* —----------------------打印主题长度------------------------ */
  Serial.print("Message Length: ["); Serial.print(length); Serial.println("(Bytes) ] ");

/* —----------------------内容及逻辑控制------------------------ */
  Serial.print("Payload: [");
  for (int i = 0; i < length; i++ )
  {
    Serial.print((char)payload[i]); //强制类型转换语法:(类型)表达式
  }
  Serial.println("] ");

  if( (char)payload[0] == '1')
  {
    digitalWrite(LED_BUILTIN,LOW);
  }
  else
  {
    digitalWrite(LED_BUILTIN,HIGH);
  }
}

/*
主题订阅函数
注意:与主题发布函数差不多
*/
void My_subscribeTopic()
{
/* —----------------------第一个主题变量加工------------------------ */
  //订阅主题的名称 字符串型
  String topicString_1 = "WuKaiXiong's topic_1";

  //转换成char数组类型(mqttClient.public传参需要)
  //创建数组
  char topicArr_1[topicString_1.length() + 1];
  //将信息拷贝到数组内
  strcpy(topicArr_1,topicString_1.c_str());// strcpy(目标变量,拷贝源);

/* —----------------------第二个主题变量加工------------------------ */
  //订阅主题的名称 字符串型
  String topicString_2 = "WuKaiXiong_topic_2";

  //转换成char数组类型(mqttClient.public传参需要)
  //创建数组
  char topicArr_2[topicString_2.length() + 1];
  //将信息拷贝到数组内
  strcpy(topicArr_2,topicString_2.c_str());// strcpy(目标变量,拷贝源);

/* —----------------------向服务器发布订阅请求------------------------ */
  if(mqttClient.subscribe(topicArr_1) && mqttClient.subscribe(topicArr_2))
  {
    Serial.print("topic: "); Serial.print(topicArr_1); Serial.print(" and "); Serial.print(topicArr_2); Serial.println(" subscribe success!");
  }
  else 
  {
    Serial.print("topic: "); Serial.print(topicArr_1); Serial.print(" and "); Serial.print(topicArr_2); Serial.println("failed!...");
  }
}

void calcTheWorkTime()
{
  worktime_s ++;
}

若程序正确,且成功上传开发板之后。可以通过如下操作验证已发布订阅消息:

  1. 打开ArduinoIDE的串口监视器,找到自己订阅的主题名称,并复制
    image-20221227195927750
  2. 打开MQTT.fx软件,进入publish页面,输入自己的订阅名称,点击发布
    注意:这一步粘贴过去之后,主题名后面会自动带一个空格,务必把它删掉(踩坑半小时)

    image-20221227200109069

  3. 如果输入1,开发板灯亮的话。那么恭喜你,实验成功!

    image-20221227200319069
    串口输出内容

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件
RabbitMQ消息模型之发布订阅Publish-Subscribe
RabbitMQ消息模型之发布订阅Publish-Subscribe
77 0
RabbitMQ消息模型之发布订阅Publish-Subscribe
|
数据采集 传感器 存储
ESP32+MQTT+MySQL实现发布订阅【气味数据收集】
ESP32+MQTT+MySQL实现发布订阅【气味数据收集】
ESP32+MQTT+MySQL实现发布订阅【气味数据收集】
|
存储 网络协议 物联网
NB-IoT 通信之 MQTT 发布订阅 | 学习笔记
快速学习 NB-IoT 通信之 MQTT 发布订阅
788 0
NB-IoT 通信之 MQTT 发布订阅 | 学习笔记
|
网络协议 安全 物联网
MQTT- 基于 mosquitto 开源 SDK 实现发布订阅 | 学习笔记
快速学习 MQTT- 基于 mosquitto 开源 SDK 实现发布订阅
611 0
MQTT- 基于 mosquitto 开源 SDK 实现发布订阅 | 学习笔记
|
消息中间件 Java PHP
RabbitMQ发布订阅实战-实现延时重试队列
本文将会讲解如何使用RabbitMQ实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。
2681 0
|
消息中间件 C# 存储
[译]RabbitMQ教程C#版 - 发布订阅
先决条件 本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们。
1202 0
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
5天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
29 4
|
9天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
43 4