MQTT简介

简介: 早在1999年,IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士发明了MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议。

早在1999年,IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士发明了MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议。

通过MQTT协议,目前已经扩展出了数十个MQTT服务器端程序,可以通过PHP,JAVA,Python,C,C#等系统语言来向MQTT发送相关消息。

此外,国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。其中Sohu,Cmstop手机客户端中均有使用到MQTT作为消息推送消息。随着移动互联网的发展,MQTT由于开放源代码,耗电量小等特点,将会在移动消息推送领域会有更多的贡献,在物联网领域,传感器与服务器的通信,信息的收集,MQTT都可以作为考虑的方案之一。在未来MQTT会进入到我们生活的各各方面。

如果需要下载MQTT服务器端,可以直接去MQTT官方网站点击software进行下载MQTT协议衍生出来的各个不同版本。

mqtt-and-iot

MQTT特点

MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

  1. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
  2. 对负载内容屏蔽的消息传输;
  3. 使用 TCP/IP 提供网络连接;
  4. 有三种消息发布服务质量:
    • “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    • “至少一次”,确保消息到达,但消息重复可能会发生。
    • “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

消息与主题 Topic

MQTT发布者与订阅者之间通过”主题”(Topic)进行消息推送,主题(Topic)格式类似URL或文件路径,通过”/“分隔层级,
例如:

sensor/1/temperature
chat/room/subject
presence/user/feng
sensor/1/#
sensor/+/temperature
uber/drivers/joe/inbox

MQTT主题(Topic)支持’+’, ‘#’的通配符,’+’通配一个层级,’#’通配多个层级(必须在末尾)。
MQTT消息发布者(Publisher)只能向特定’名称主题’(不支持通配符)发布消息,订阅者(Subscriber)
通过订阅’过滤主题’(支持通配符)来匹配消息。

注意:必须要转义’+’, ‘#’。

MQTT支持推送消息的QoS(服务质量)。在MQTT 中有三个等级的 QoS:

  • QoS 0: 该等级表示“最多一次”交付(最佳状况)。 消息不会得到确认,因而,这是一种一劳永逸的方法。
  • QoS 1: 该等级表示“至少一次”交付。 用户可能不止一次获得消息,但是允许收到的人确认已经收到。
  • QoS 2: 该等级表示“刚好一次”交付。最慢但是最有保障的服务质量等级。确保用户只收到一次消息,
    并包含四个阶段的交付握手。该等级最慢,但是最安全。

MQTT遗愿消息(Last Will)

MQTT客户端向服务器端CONNECT请求时,可以设置是否发送遗愿消息(Will Message)标志,和遗愿消息主题(Topic)与内容(Payload)。

MQTT客户端异常下线时(客户端断开前未向服务器发送DISCONNECT消息),MQTT消息服务器会发布遗愿消息。

MQTT保留消息(Retained Message)

MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。

保留消息(Retained Message)有两种清除方式:

  • 客户端向有保留消息的主题发布一个空消息。
  • 消息服务器设置保留消息的超期时间。

共享订阅(Shared Subscription)

共享订阅是非标准MQTT约定,不过大多数开源MQTT服务器已经实现。
共享订阅(Shared Subscription)支持在多订阅者间采用分组负载平衡方式派发消息:
也就是订阅相同共享订阅(Shared Subscription)的客户根据负载平衡方式只有1人收到。
这个场景可以用于:集群执行分布式任务(1个人接了活,其他人就不要接了)。

                            ---------
                            |       | --Msg1--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->|  EMQ  | --Msg2--> Subscriber2
                            |       | --Msg3--> Subscriber3
                            ---------

共享订阅支持两种使用方式:

订阅前缀 使用示例
$queue/ mosquitto_sub -t ‘$queue/topic’
$share/<group>/ mosquitto_sub -t ‘$share/group/topic’

本地订阅(Local Subscription)

本地订阅也是非标准MQTT约定。

本地订阅(Local Subscription)只在本节点创建订阅与路由表,不会在集群节点间广播全局路由,非常适合物联网数据采集应用:

mosquitto_sub -t '$local/topic'
mosquitto_pub -t 'topic'

使用方式: 订阅者在主题(Topic)前增加’$local/’前缀。

MQTT会话(Clean Session)

MQTT支持Session(会话),当MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。

‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动清除销毁。
MQTT 方法定义很简单:

  • 连接 - 建立与 MQTT 经纪人(Broker)之间的连接。
  • 断开连接 - 断开与 MQTT 经纪人(Broker)之间的连接。
  • 发布 - 在 MQTT 经纪人(Broker)上发布主题。
  • 订阅 - 从 MQTT 经纪人(Broker)上订阅主题。
  • 退订 - 从 MQTT 经纪人(Broker)上退订主题。

MQTT 服务

要尝试使用MQTT那么首先是需要一台MQTT服务器,两个途径:

  1. 自建MQTT服务,可以在自己的Web服务器上部署一个Eclipse Mosquitto™。我在国内找到一个叫EMQ的服务器貌似讲得很强大也不妨下载试试。再或者去GitHub搜索一下也会找到一大堆的MQTT服务器的源代码。
  2. 使用现成的MQTT服务,我个人比较推荐采取此种方式,入手快而且省不少时间商用时也只是给点钱就完事了。国内在百度云的IoT接入产品中就有。如果只是想测试一下MQTT的话推荐使用 Eclipse 的 MQTT 沙盒。

ESP8266 的MQTT客户端实现

在开始之前你要先在Arduino的库管理中先安装Adafruit MQTT Library

Adafruit MQTT Library

当然不一定非要使用这个库,包括亚马逊和微软等的大公司都提供了一些现成的库直接访问他们的MQTT云服务。

#include <ESP8266WiFi.h>
#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_Client.h"

/************************* WiFi Access Point *********************************/

#define WLAN_SSID       "...your SSID..."
#define WLAN_PASS       "...your password..."

/************************* Adafruit.io Setup *********************************/

#define AIO_SERVER      "io.adafruit.com"
#define AIO_SERVERPORT  1883                   // use 8883 for SSL
#define AIO_USERNAME    "...your AIO username (see https://accounts.adafruit.com)..."
#define AIO_KEY         "...your AIO key..."

/************ Global State (you don't need to change this!) ******************/

// Create an ESP8266 WiFiClient class to connect to the MQTT server.
WiFiClient client;
// or... use WiFiFlientSecure for SSL
//WiFiClientSecure client;

// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details.
Adafruit_MQTT_Client mqtt(&client, AIO_SERVER, AIO_SERVERPORT, AIO_USERNAME, AIO_KEY);

/****************************** Feeds ***************************************/

// Setup a feed called 'photocell' for publishing.
// Notice MQTT paths for AIO follow the form: <username>/feeds/<feedname>
Adafruit_MQTT_Publish photocell = Adafruit_MQTT_Publish(&mqtt, AIO_USERNAME "/feeds/photocell");

// Setup a feed called 'onoff' for subscribing to changes.
Adafruit_MQTT_Subscribe onoffbutton = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/onoff");

/*************************** Sketch Code ************************************/

// Bug workaround for Arduino 1.6.6, it seems to need a function declaration
// for some reason (only affects ESP8266, likely an arduino-builder bug).
void MQTT_connect();

void setup() {
  Serial.begin(115200);
  delay(10);

  Serial.println(F("Adafruit MQTT demo"));

  // Connect to WiFi access point.
  Serial.println(); Serial.println();
  Serial.print("Connecting to ");
  Serial.println(WLAN_SSID);

  WiFi.begin(WLAN_SSID, WLAN_PASS);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println();

  Serial.println("WiFi connected");
  Serial.println("IP address: "); Serial.println(WiFi.localIP());

  // Setup MQTT subscription for onoff feed.
  mqtt.subscribe(&onoffbutton);
}

uint32_t x=0;

void loop() {
  // Ensure the connection to the MQTT server is alive (this will make the first
  // connection and automatically reconnect when disconnected).  See the MQTT_connect
  // function definition further below.
  MQTT_connect();

  // this is our 'wait for incoming subscription packets' busy subloop
  // try to spend your time here

  Adafruit_MQTT_Subscribe *subscription;
  while ((subscription = mqtt.readSubscription(5000))) {
    if (subscription == &onoffbutton) {
      Serial.print(F("Got: "));
      Serial.println((char *)onoffbutton.lastread);
    }
  }

  // Now we can publish stuff!
  Serial.print(F("\nSending photocell val "));
  Serial.print(x);
  Serial.print("...");
  if (! photocell.publish(x++)) {
    Serial.println(F("Failed"));
  } else {
    Serial.println(F("OK!"));
  }

  // ping the server to keep the mqtt connection alive
  // NOT required if you are publishing once every KEEPALIVE seconds
  /*
  if(! mqtt.ping()) {
    mqtt.disconnect();
  }
  */
}

// Function to connect and reconnect as necessary to the MQTT server.
// Should be called in the loop function and it will take care if connecting.
void MQTT_connect() {
  int8_t ret;

  // Stop if already connected.
  if (mqtt.connected()) {
    return;
  }

  Serial.print("Connecting to MQTT... ");

  uint8_t retries = 3;
  while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected
       Serial.println(mqtt.connectErrorString(ret));
       Serial.println("Retrying MQTT connection in 5 seconds...");
       mqtt.disconnect();
       delay(5000);  // wait 5 seconds
       retries--;
       if (retries == 0) {
         // basically die and wait for WDT to reset me
         while (1);
       }
  }
  Serial.println("MQTT Connected!");
}

接下来的这个示例是订阅MQTT上的指定信道上的新消息并执行ESP8266内的回调函数:

#include <ESP8266WiFi.h>
#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_Client.h"

/************************* WiFi Access Point *********************************/

#define WLAN_SSID       "network"
#define WLAN_PASS       "password"

/************************* Adafruit.io Setup *********************************/

#define AIO_SERVER      "io.adafruit.com"
#define AIO_SERVERPORT  1883
#define AIO_USERNAME    "user"
#define AIO_KEY         "key"

/************ Global State (you don't need to change this!) ******************/

// Create an ESP8266 WiFiClient class to connect to the MQTT server.
WiFiClient client;

// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details.
Adafruit_MQTT_Client mqtt(&client, AIO_SERVER, AIO_SERVERPORT, AIO_USERNAME, AIO_USERNAME, AIO_KEY);

/****************************** Feeds ***************************************/

// Setup a feed called 'time' for subscribing to current time
Adafruit_MQTT_Subscribe timefeed = Adafruit_MQTT_Subscribe(&mqtt, "time/seconds");

// Setup a feed called 'slider' for subscribing to changes on the slider
Adafruit_MQTT_Subscribe slider = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/slider", MQTT_QOS_1);

// Setup a feed called 'onoff' for subscribing to changes to the button
Adafruit_MQTT_Subscribe onoffbutton = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/onoff", MQTT_QOS_1);

/*************************** Sketch Code ************************************/

int sec;
int min;
int hour;

int timeZone = -4; // utc-4 eastern daylight time (nyc)

void timecallback(uint32_t current) {

  // adjust to local time zone
  current += (timeZone * 60 * 60);

  // calculate current time
  sec = current % 60;
  current /= 60;
  min = current % 60;
  current /= 60;
  hour = current % 24;

  // print hour
  if(hour == 0 || hour == 12)
    Serial.print("12");
  if(hour < 12)
    Serial.print(hour);
  else
    Serial.print(hour - 12);

  // print mins
  Serial.print(":");
  if(min < 10) Serial.print("0");
  Serial.print(min);

  // print seconds
  Serial.print(":");
  if(sec < 10) Serial.print("0");
  Serial.print(sec);

  if(hour < 12)
    Serial.println(" am");
  else
    Serial.println(" pm");

}

void slidercallback(double x) {
  Serial.print("Hey we're in a slider callback, the slider value is: ");
  Serial.println(x);
}

void onoffcallback(char *data, uint16_t len) {
  Serial.print("Hey we're in a onoff callback, the button value is: ");
  Serial.println(data);
}


void setup() {
  Serial.begin(115200);
  delay(10);

  Serial.println(F("Adafruit MQTT demo"));

  // Connect to WiFi access point.
  Serial.println(); Serial.println();
  Serial.print("Connecting to ");
  Serial.println(WLAN_SSID);

  WiFi.begin(WLAN_SSID, WLAN_PASS);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println();

  Serial.println("WiFi connected");
  Serial.println("IP address: "); Serial.println(WiFi.localIP());

  timefeed.setCallback(timecallback);
  slider.setCallback(slidercallback);
  onoffbutton.setCallback(onoffcallback);
  
  // Setup MQTT subscription for time feed.
  mqtt.subscribe(&timefeed);
  mqtt.subscribe(&slider);
  mqtt.subscribe(&onoffbutton);

}

uint32_t x=0;

void loop() {
  // Ensure the connection to the MQTT server is alive (this will make the first
  // connection and automatically reconnect when disconnected).  See the MQTT_connect
  // function definition further below.
  MQTT_connect();

  // this is our 'wait for incoming subscription packets and callback em' busy subloop
  // try to spend your time here:
  mqtt.processPackets(10000);
  
  // ping the server to keep the mqtt connection alive
  // NOT required if you are publishing once every KEEPALIVE seconds
  
  if(! mqtt.ping()) {
    mqtt.disconnect();
  }
}

// Function to connect and reconnect as necessary to the MQTT server.
// Should be called in the loop function and it will take care if connecting.
void MQTT_connect() {
  int8_t ret;

  // Stop if already connected.
  if (mqtt.connected()) {
    return;
  }

  Serial.print("Connecting to MQTT... ");

  uint8_t retries = 3;
  while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected
       Serial.println(mqtt.connectErrorString(ret));
       Serial.println("Retrying MQTT connection in 10 seconds...");
       mqtt.disconnect();
       delay(10000);  // wait 10 seconds
       retries--;
       if (retries == 0) {
         // basically die and wait for WDT to reset me
         while (1);
       }
  }
  Serial.println("MQTT Connected!");
}

附: MQTT协议中文版

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
3月前
|
消息中间件 安全 物联网
RabbitMQ的人生简介
8月更文挑战第26天
|
6月前
|
消息中间件 Cloud Native 自动驾驶
RocketMQ实战教程之MQ简介
Apache RocketMQ 是一个云原生的消息流平台,支持消息、事件和流处理,适用于云边端一体化场景。官网提供详细文档和下载资源:[RocketMQ官网](https://rocketmq.apache.org/zh/)。示例中提到了RocketMQ在物联网(如小米台灯)和自动驾驶等领域的应用。要开始使用,可从[下载页面](https://rocketmq.apache.org/zh/download)获取软件。
|
消息中间件 Linux 虚拟化
消息中间件系列教程(04) -RabbitMQ -简介&安装
消息中间件系列教程(04) -RabbitMQ -简介&安装
69 0
|
6月前
|
监控 安全 物联网
阿里云mqtt简介和使用流程
本文介绍了阿里云MQTT的准备工作、简介和使用流程。首先,用户需要注册阿里云账号并完成实名认证。接着,通过阿里云物联网平台创建产品和设备,获取连接所需的Broker Address、Port、Username和Password。然后,使用MQTT客户端(如MQTTX)配置这些信息进行连接,并激活设备。最后,创建并订阅/发布自定义Topic,实现设备间的通信。阿里云MQTT是一个适用于物联网设备的轻量级通信协议,提供高并发、高可靠性的服务,广泛应用于各种物联网场景。
阿里云mqtt简介和使用流程
|
6月前
|
消息中间件 传感器 网络协议
阿里云MQTT简介和使用流程
以下是内容的摘要: 该文主要介绍了在阿里云上搭建 MQTT 服务器的步骤。首先,需要注册阿里云账号并进行实名认证。然后,购买阿里云 MQTT 实例,选择合适的类型、地域、连接和消息限制。接着,创建产品和设备,命名并上线,获取 MQTT 连接的相关信息,包括 ProductKey、DeviceName 和 DeviceSecret。通过提供的 MQTT.fx 工具,设置 MQTT 客户端连接参数,包括 Broker 地址、端口、用户名和密码。最后,使用 MQTT.fx 测试连接,实现数据的上报和接收,验证 MQTT 服务器的配置是否成功。
|
6月前
|
消息中间件 存储 中间件
RabbitMq简介
RabbitMq简介
|
传感器 负载均衡 网络协议
01 MQTT简介
01 MQTT简介
76 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
120 0
|
存储 安全 数据可视化
阿里云mqtt简介和优惠购买流程
MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,它可以在不同的设备和系统之间传递信息。阿里云是中国市场主流的云计算服务提供商,它提供了MQTT服务来支持IoT(Internet of Things)设备的通信。