mqtt消息推送

本文涉及的产品
.cn 域名,1个 12个月
密钥管理服务KMS,1000个密钥,100个凭据,1个月
简介: 简介近年来随着 Web 前端的快速发展,浏览器新特性层出不穷,越来越多的应用可以在浏览器端或通过浏览器渲染引擎实现,Web 应用的即时通信方式 WebSocket 得到了广泛的应用。WebSocket 是一种在单个 TCP 连接上进行全双工通讯的协议。

简介

近年来随着 Web 前端的快速发展,浏览器新特性层出不穷,越来越多的应用可以在浏览器端或通过浏览器渲染引擎实现,Web 应用的即时通信方式 WebSocket 得到了广泛的应用。

WebSocket 是一种在单个 TCP 连接上进行全双工通讯的协议。WebSocket 通信协议于2011年被 IETF 定为标准 RFC 6455,并由 RFC 7936 补充规范。WebSocket API 也被 W3C 定为标准。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。 —— 摘自 维基百科 WebSocket

MQTT 协议第 6 章详细约定了 MQTT 在 WebSocket [RFC6455] 连接上传输需要满足的条件,协议内容EMQ君不在此累述。由于协议实现细节较为复杂,本文选取两个常用的 JavaScript MQTT 客户端进行连接测试。

两款客户端比较

Paho.mqtt.js

Paho 是 Eclipse 的一个 MQTT 客户端项目,Paho JavaScript Client 是其中一个基于浏览器的库,它使用 WebSockets 连接到 MQTT 服务器。相较于另一个 JavaScript 连接库来说,其功能较少,不推荐使用。

MQTT.js

MQTT.js 一个 MQTT 协议的客户端库,用 JavaScript 编写,可用于 Node.js 和浏览器。在 Node.js 端可以通过全局安装使用命令行连接,同时还支持 MQTT ,MQTT TLS 证书连接;值得一提的是 MQTT.js 还对微信小程序有较好的支持。

EMQ 君将以 MQTT.js 库进行连接讲解。

安装 MQTT.js

如果读者机器上装有 Node.js 运行环境,可使用 npm 命令安装 MQTT.js

在当前目录安装

npm i mqtt

全局安装

将注册 mqtt mqtt_pub mqtt_sub 命令到当前用户,此处借助 iot.eclipse.org 讲解一下命令行的使用

    # 全局安装
    npm i mqtt -g
     
    # 使用命令行订阅
    $ mqtt sub -t 'hello' -h 'iot.eclipse.org' -v
    > hello 09860
     
    # 成功连接到服务器并订阅了主题 hello, 命令行将阻塞等待消息
     
     
    # 在另一个终端上使用命令行发布
    mqtt pub -t 'hello' -h 'iot.eclipse.org' -m 'from MQTT.js'
     
    # 命令行将进行 连接 -> 发布 -> 断开连接 操作,此时读者会到订阅命令行,应当收到来自 hello 主题的消息
     
    > hello from MQTT.js

npm 在当前目录安装仍然可以使用 ./node_module/.bin/mqtt 命令来执行以上操作。

CDN 引用

MQTT.js 包可以通过 http://unpkg.com 获得

    <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
     
    <script>
        // 将在全局初始化一个 mqtt 变量
        console.log(mqtt)
    </script>

连接至 MQTT 服务器

几个公共的用于 WebSocket 测试连接服务器:

  • test.mosquitto.org - 使用端口 8080 未加密,8081 用于 SSL 上的 WebSocket;

  • iot.eclipse.org - 使用端口 80 未加密,443 用于 SSL 上的 WebSocket;

  • broker.hivemq.com - 使用端口 8000 未加密,不支持 SSL 上的 WebSocket。

由于需要展示客户端认证部分内容,但上述服务器未提供客户端认证服务,笔者特通过 ActorCloud 平台注册了一个设备进行接入连接。

EMQ 使用 8083 端口用于普通连接,8084 用于 SSL 上的 WebSocket 连接。

    // <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
    // const mqtt = require('mqtt')
    import mqtt from 'mqtt'
     
    // 连接选项
    const options = {
          connectTimeout: 4000, // 超时时间
          // 认证信息
          clientId: 'emqx-connect-via-websocket',
          username: 'emqx-connect-via-websocket',
          password: 'emqx-connect-via-websocket',
    }
     
    const client = mqtt.connect('wss://iot.actorcloud.io:8084/mqtt', options)
     
    client.on('reconnect', (error) => {
        console.log('正在重连:', error)
    })
     
    client.on('error', (error) => {
        console.log('连接失败:', error)
    })
     

连接地址

上文示范的连接地址可以拆分为: wss: // iot . actorcloud.io : 8084 /mqtt

即 协议 // 主机名 . 域名 : 端口 / 路径

初学者容易出现以下几个错误:

  • 连接地址没有指明协议:WebSocket 作为一种通信协议,其使用 ws(非加密)、wss(SSL 加密) 作为协议标识。MQTT.js 客户端支持多种协议,连接地址需指明协议类型;
  • 连接地址没有指明端口:MQTT 并未对 WebSocket 接入端口做出规定,EMQ 上默认使用 8083 8084 分别作为非加密连接、加密连接端口。而 WebSocket 协议默认端口同 HTTP 保持一致 (80/443),不填写端口则表明使用 WebSocket 的默认端口连接;而使用标准 MQTT 连接时则无需指定端口,如 MQTT.js 在 Node.js 端可以使用 mqtt://localhost 连接至标准 MQTT 8083 端口,当连接地址是 mqtts://localhost 则连接到 8884 端口;
  • 连接地址无路径:MQTT-WebSoket 统一使用 /path 作为连接路径,连接时需指明;
  • 协议与端口不符:使用了 wss 连接却连接到 8083 端口;
  • 在 HTTPS 下使用非加密的 WebSocket 连接: Google 等机构在推进 HTTPS 的同时也通过浏览器约束进行了安全限定,即 HTTPS 连接下浏览器会自动禁止使用非加密的 ws 协议发起连接请求;
  • 证书与连接地址不符: 篇幅较长,详见下文 EMQ 启用 SSL/TLS 加密连接。

连接选项

上面代码中, options 是客户端连接选项,以下是主要参数说明,其余参数详见https://www.npmjs.com/package/mqtt#connect

  • keepalive:心跳时间,默认 60秒,设置 0 为禁用;
  • clientId: 客户端 ID ,默认通过 'mqttjs_' + Math.random().toString(16).substr(2, 8) 随机生成;
  • username:连接用户名(如果有);
  • password:连接密码(如果有);
  • clean:true,设置为 false 以在离线时接收 QoS 1 和 2 消息;
  • reconnectPeriod:默认 1000 毫秒,两次重新连接之间的间隔,客户端 ID 重复、认证失败等客户端会重新连接;
  • connectTimeout:默认 30 * 1000毫秒,收到 CONNACK 之前等待的时间,即连接超时时间。

订阅/取消订阅

连接成功之后才能订阅,且订阅的主题必须符合 MQTT 订阅主题规则;

注意 JavaScript 异步非阻塞特性,只有在 connect 事件后才能确保客户端已成功连接,或通过 client.connected 判断是否连接成功:

    // 错误示例
    client.on('connect', handleConnect)
    client.subscribe('hello')
    client.publish('hello', 'Hello EMQ')
     
    // 正确示例
     
    client.on('connect', (e) => {
        console.log('成功连接服务器')
        
        // 订阅一个主题
        client.subscribe('hello', { qos: 1 }, (error) => {
            if (!error) {
                cosnole.log('订阅成功')
                client.publish('hello', 'Hello EMQ', { qos: 1, rein: false }, (error) => {
                    cosnole.log(error || '发布成功')
                })
            }
        })
        
        // 订阅多个主题
        client.subscribe(['hello', 'one/two/three/#', '#'], { qos: 1 },  onSubscribeSuccess)
        
        // 订阅不同 qos 的不同主题
        client.subscribe(
            [
                { hello: 1 }, 
                { 'one/two/three': 2 }, 
                { '#': 0 }
            ], 
            onSubscribeSuccess,
        )
    })
     
    // 取消订阅
    client.unubscribe(
        // topic, topic Array, topic Array-Onject
        'hello',
        onUnubscribeSuccess,
    )

发布/接收消息

发布消息到某主题,发布的主题必须符合 MQTT 发布主题规则,否则将断开连接。发布之前无需订阅该主题,但要确保客户端已成功连接:

    // 监听接收消息事件
    client.on('message', (topic, message) => {
        console.log('收到来自', topic, '的消息', message.toString())
    })
     
    // 发布消息
    if (!client.connected) {
        console.log('客户端未连接')
        return
    }
     
    client.publish('hello', 'hello EMQ', (error) => {
        console.log(error || '消息发布成功')
    })

微信小程序

MQTT.js 库对微信小程序特殊处理,使用 wxs 协议标识符。注意小程序开发规范中要求必须使用加密连接,连接地址应类似为wxs://iot.actorcloud.io:8084/mqtt。

EMQ 启用 SSL/TLS 加密连接

EMQ 内置自签名证书,默认已经启动了加密的 WebSocket 连接,但大部分浏览器会报证书无效错误如net::ERR_CERT_COMMON_NAME_INVALID (Chrome、360 等 webkit 内核浏览器在开发者模式下, Console 选项卡 可以查看大部分连接错误)。

准备工作

这篇文章 https流程和原理 中对证书认证进行了详细的阐述,EMQ 君总结启用 SSL/TLS 证书需要具备的条件是:

  • 将域名绑定到 EMQ 服务器公网地址:CA 机构签发的证书签名是针对域名的;
  • 申请证书:向 CA 机构申请所用域名的证书,注意选择一个可靠的 CA 机构且证书要区分泛域名与主机名;
  • 使用加密连接的时候选择 wss 协议,并使用域名连接:绑定域名-证书之后,必须使用域名而非 IP 地址进行连接,这样浏览器才会根据域名去校验证书以在通过校验后建立连接。

在 EMQ 上配置

打开 etc/emqx.conf 配置文件,修改以下配置

    # wss 监听地址
    listener.wss.external = 8084
     
    # 修改密钥文件地址
    listener.wss.external.keyfile = etc/certs/cert.key
     
    # 修改证书文件地址
    listener.wss.external.certfile = etc/certs/cert.pem

重启 EMQ 即可。

可以使用你的证书与密钥文件直接替换到 etc/certs/ 下。

在 nginx 上配置反向代理与证书

使用 nginx 来反向代理并加密 WebSocket 可以减轻 EMQ 服务器计算压力,同时实现域名复用,同时通过 nginx 的负载均衡可以分配多个后端服务实体。

   # 建议 WebSocket 也绑定到 443 端口
   listen 443, 8084;
   server_name example.com;
    
   ssl on;
    
   ssl_certificate /etc/cert.crt;  # 证书路径
   ssl_certificate_key /etc/cert.key; # 密钥路径
    
    
   # upstream 服务器列表
   upstream emq_server {
       server 10.10.1.1:8883 weight=1;
       server 10.10.1.2:8883 weight=1;
       server 10.10.1.3:8883 weight=1;
   }
    
   # 普通网站应用
   location / {
       root www;
       index index.html;
   }
    
   # 反向代理到 EMQ 非加密 WebSocket
   location / {
       proxy_redirect off;
       # upstream
       proxy_pass http://emq_server;
       
       proxy_set_header Host $host;
       # 反向代理保留客户端地址
       proxy_set_header X-Real_IP $remote_addr;
       proxy_set_header X-Forwarded-For $remote_addr:$remote_port;
       # WebSocket 额外请求头
       proxy_http_version 1.1;
       proxy_set_header Upgrade $http_upgrade;
       proxy_set_header Connection “upgrade”;
   }

其他资源

MQTT.js 官方例子给出了详细的连接与使用操作实例代码,读者可前往查看;

EMQ Dashboard 中的 WebSocket 工具、ActorCloud 测试工具 -> MQTT 客户端 (需到 ActorCloud 商城开通),均使用 MQTT.js 构建,读者可体验参考。

作者:EMQ

链接:https://www.jianshu.com/p/4fd95cae1a9c
来源:简书

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
网络协议 算法 网络性能优化
我的mqtt协议和emqttd开源项目个人理解(15) - MQTT消息推送协议应用数据包超时是否需要重发?
我的mqtt协议和emqttd开源项目个人理解(15) - MQTT消息推送协议应用数据包超时是否需要重发?
369 0
|
传感器 机器学习/深度学习 网络协议
MQTT C Client实现消息推送(入门指南)
MQTT C Client实现消息推送(入门指南) MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,通过协议,目前已经扩展出了数十个MQTT服务器端程序,可以通过PHP,JAVA,Python,C,C#等系统语言来向MQTT发送相关消息。
8013 0
|
Android开发
采用MQTT协议实现Android消息推送
[原]采用MQTT协议实现Android消息推送 对于消息推送,一开始还真不知道什么方式比较好,一头雾水,现在回顾总结下资料。 http://zheye.org/asks/4d99a1aafd503c41d700000a 通过上面者也里面的回复,得到一些信息。
1749 0
|
5月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
106 6
|
2月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
97 10
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
2月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
2月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
2月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
76 4