从零开始搭建物联网平台(四)EMQ-X消息中间件

简介: 从零开始搭建物联网平台(四)EMQ-X消息中间件

      物联网的消息中间件有很多,如ActiveMq、RabbitMq、Emq、以及自己实现的netty-borker,这里为什么要选择EMQ呢,首先,在使用emqx之前我用过ActiveMq由于是国外开发的,对国内产品的支持不够好,文档和社区也远没有EmqX那样详细与活跃。当然我也考虑过自己实现,但是出去效率性,就暂时先选择EMQX作为我的消息中间件。而且,EMQX丰富的中文文档、和完善的功能几乎能满足我们90%的需求,而且支持多种协议MQTT、MQTT-SN、CoAP、LwM2M、WebSocket。对于设备连接量,EMQX免费版支持50万的设备连接量。企业版可以使用集群进行扩充连接量。话不多说,我们开始搭建我们的EMQX吧!


传送门 EMQX网站https://docs.emqx.io/broker/latest/cn/


一、简介

  • 1883:MQTT 协议端口
  • 8883:MQTT/SSL 端口
  • 8083:MQTT/WebSocket 端口
  • 8080:HTTP API 端口
  • 18083:Dashboard 管理控制台端口


EMQX管理页面为http://127.0.0.1:18083。把IP替换成你的服务器IP,默认用户名密码为admin/public

#启动服务

systemctl start emqttd

#停止服务

systemctl stop emqttd

#重启服务

systemctl restart emqttd


二、安装EMQX

1. Docker安装

docker pull emqx/emqx


docker run -d --name emqx --privileged=true --restart=always -p 18083:18083 -p 1883:1883 -p 8085:8085 -p 8083:8083 -p 8084:8084 -p 8883:8883 -v /web/EMQX/connf/emqx_auth_http.conf:/opt/emqx/etc/plugins/emqx_auth_http.conf emqx/emqx:latest

2.Linnx安装

sudo yum install emqx-4.0.0

三、HTTP认证

1. 关闭匿名认证

进入emqx安装目录下/etc,打开emqx.conf文件,搜索anonymous,找到这一行allow_anonymous = true,把true改为false。

2. 配置HTTP认证插件

打开EMQ的配置文件

1. cd /etc/emqx/plugins/
2. vim emqx_auth_http.conf


修改认证的接口为服务端提供的认证接口,这样设备连接EMQ的时候会先走我们的服务端去校验用户名、密码、客户端ID是否合法。

-##--------------------------------------------------------------------
## HTTP Auth/ACL Plugin
##--------------------------------------------------------------------
##--------------------------------------------------------------------
## Authentication request.
##
## Variables:
##  - %u: username
##  - %c: clientid
##  - %a: ipaddress
##  - %P: password
##
## Value: URL
auth.http.auth_req = http://172.95.177.11:8899/scc/mqtt/auth
## Value: post | get | put
auth.http.auth_req.method = post
## Value: Params
auth.http.auth_req.params = clientid=%c,username=%u,password=%P
##--------------------------------------------------------------------
## Superuser request.
##
## Variables:
##  - %u: username
##  - %c: clientid
##  - %a: ipaddress
##
## Value: URL
auth.http.super_req = http://172.95.177.11:8899/scc/mqtt/superuser
## Value: post | get | put
auth.http.super_req.method = post
## Value: Params
auth.http.super_req.params = clientid=%c,username=%u
##--------------------------------------------------------------------
## ACL request.
##
## Variables:
##  - %A: 1 | 2, 1 = sub, 2 = pub
##  - %u: username
##  - %c: clientid
##  - %a: ipaddress
##  - %t: topic
##
## Value: URL
auth.http.acl_req = http://172.95.177.11:8899/scc/mqtt/acl
## Value: post | get | put
auth.http.acl_req.method = get
## Value: Params
auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t

3.开启插件,重启EMQX

打开http://127.0.0.1:18083/EMQ的网页管理页面,找到http认证的插件,开启它

systemctl restart emqttd重启EMQ

4.服务端Springboot接口

@ApiOperation(value = "客户端连接授权" ,notes = "客户端连接授权" )
  @ApiImplicitParams({
      @ApiImplicitParam(name = "clientid" ,value = "客户端clientId" , required = false, dataType = "String"),
      @ApiImplicitParam(name = "username" ,value = "客户端username" , required = false, dataType = "String"),
      @ApiImplicitParam(name = "password" ,value = "客户端password" , required = false, dataType = "String")
  })
  @RequestMapping(value = "/auth", method = RequestMethod.POST)
  public void checkUser(String clientid, String username, String password, HttpServletResponse response) {
    logger.info("普通用户;clientid:" + clientid + ";username:" + username + ";password:" + password);
    //计算用户的剩余设备
    if(productService.countDevNumByPid(username)<=0){
      response.setStatus(402);
      return;
    }
    if (!"".equals(clientid) && productService.findByIdAndToken(username, password).size()>0&&productService.getDevStatusBySn(clientid)!=1) {
      System.out.println("clientid:"+clientid+",username:"+username+",password:"+password+"允许通过");
      response.setStatus(200);
    } else {
      System.out.println("clientid:"+clientid+",username:"+username+",password:"+password+"禁止通过");
      response.setStatus(401);
    }
  }
  @RequestMapping("/superuser")
  public void mqttSuperuser(String clientid, String username, HttpServletResponse response) {
    //auth.http.super_req.params = clientid=%c,username=%u
    if(clientid.startsWith("server_client_")|| clientid.startsWith("web_client_")||clientid.startsWith("wxapp_client_")){
      response.setStatus(200);
      return;
    }
    logger.info("超级用户;clientid:" + clientid + ";username:" + username);
    System.out.println("超级用户;clientid:" + clientid + ";username:" + username);
    response.setStatus(200);
  }
  @RequestMapping("/acl")
  public void mqttAcl(String access, String username, String clientid, String ipaddr, String topic, HttpServletResponse response) {
    //auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t
    logger.info("access: " + access + ";username: " + username + ";clientid: " + clientid + "; ipaddr: " + ipaddr + ";topic: " + topic);
    System.out.println("access: " + access + ";username: " + username + ";clientid: " + clientid + "; ipaddr: " + ipaddr + ";topic: " + topic);
    response.setStatus(200);
  }

四、上线下线接口

如何做到设备上线下线实时显示呢,这里我们可以接用EMQ提供的系统主题即$SYS/,在我们的 服务端订阅这个主题,当设备连接到EMQ上时,EMQ服务器就会发送一条系统主题如:


$SYS/brokers/emqx@127.0.0.1/clients/sn0054556/disconnected


$SYS/brokers/emqx@127.0.0.1/clients/sn0054556/connected


我们通过订阅系统主题就可以实现实时获得设备状态的效果。


找到EMQ安装位置下的acl.conf文件。vi这个文件。替换为下边的配置方式


允许客户端订阅"SYS/brokers/+/clients/#"主题,但是禁止订阅"SYS/brokers/+/clients/#"主题,但是禁止订阅"SYS/brokers/+/clients/#"主题,但是禁止订阅"SYS/#"主题。这样系统的安全性会提升

{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.

五、HTTP API

这里为什么要调用EMQ的HTTPAPI呢,主要是为了使服务端和Broker分离,各司其职。我的服务端不再存储任何设备状态信息。关于设备的缓存,在线状态全部交由EMQ来管控。利用API我们可以随时知道当前EMQ连接了那些设备。设备的详细信息。因为我们订阅上线、下线消息只能满足与用户刚好登录了页面。在连接上EMQ的这段时间内所得到的设备上线下线情况。我这里是前端单独是MQTT客户端、服务端也是一个MQTT客户端。所以通过服务端调用HTTPAPI可以随时获取当前设备在线情况,而不是自己在服务端自己维护一套设备的session。


在服务端service层添加如下代码,然后调用这个service即可调用EMQ的各种API了,非常好用。


如获得在线设备/api/v4/clients具体参考https://docs.emqx.io/broker/latest/cn/advanced/http-api.html#endpoint-nodes


这里还遇到一个坑。就是数据流读取的问题,在服务器上读取的流经常会断掉,导致json数据异常。而本地运行完全没有这个问题,后来经师傅提点才改进了流读取方式,下边的是非常完好的流读取方式。

//用户名
  private static String username = "admin";
  //登录密码
  private static String password = "public";
  //服务器地址
  private static String serverPath = "http://127.9.0.1:18083";
  //当前页
  private static int pageIndex = 1;
  //页大小
  private static int pageSize = 100;
  @Override
  public  String query(String queryPathUrl, int pageIndex, int pageSize) throws Exception {
    //拼接查询参数
    if(pageIndex>0&&pageSize>0){
      queryPathUrl = queryPathUrl +"?" + "_page=" + pageIndex + "&" + "_limit=" + pageSize;
    }
    URL url = new URL(serverPath+queryPathUrl);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    String authorization = getBase64(username, password);
    //连接认证信息放在头里,注意,base64可以反编码,有安全隐患
    conn.setRequestProperty("authorization", "Basic "+authorization);
    conn.setRequestMethod("GET");
    // 开始连接
    conn.connect();
    String resule = null ;
    if (conn.getResponseCode() == 200) {
      // 请求返回的数据
      InputStream inputStream = conn.getInputStream();
      byte[] readBuffer = new byte[1024];
      int numBytes = -1;
      ByteArrayOutputStream resultB = new ByteArrayOutputStream();
//      try {
      while (inputStream.available() > 0) {
        numBytes = inputStream.read(readBuffer);
        if (numBytes >= 0) {
          resultB.write(readBuffer, 0, numBytes);
          readBuffer = new byte[1024];
          Thread.sleep(500);
        }
      }
      resule = new String(resultB.toByteArray(), "UTF-8");
      inputStream.close();
    }
    return resule;
  }

六、调优

关于调优部分,需要设置许多liunx的参数,改配置文件。这里就不介绍了,因为随着版本的升级,官网都会实时更新。官网的步骤也是极其详细。

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
消息中间件 网络协议 算法
亿级万物互联新时代的物联网消息中间件 EMQX 调研
我们身边越来越多的硬件设备正在被嵌入芯片、注入软件,从而实现各种各样的新应用、新功能,比如智能门锁,智能音箱等,前几年炒的火热的智能家居,物联网万物互联等概念,现在正在潜移默化的影响着所有人,了解一些物联网知识对我们了解这个新时代有所帮助。
647 30
亿级万物互联新时代的物联网消息中间件 EMQX 调研
|
消息中间件 负载均衡 物联网
在Linux服务器上安装EMQX平台:构建高性能的开源物联网消息中间件
EMQX是一个开源的物联网消息中间件平台,提供高性能、高可用性的MQTT和CoAP协议支持,适用于大规模物联网应用场景。本文将详细介绍在Linux服务器上安装EMQ X平台的步骤,帮助开发者快速搭建功能强大的物联网消息中间件。
3567 1
|
消息中间件 Kubernetes 网络协议
亿级万物互联新时代的物联网消息中间件EMQX调研
EMQ 创始人兼 CEO 李枫表示:「EMQX 5.0 是 MQTT 领域的一个里程碑式的成果。它不仅是全球首个单集群支持 1 亿连接的分布式 MQTT 消息服务器,也是首个将 QUIC 引入 MQTT 的开创性产品。
324 7
亿级万物互联新时代的物联网消息中间件EMQX调研
|
物联网 数据库 Windows
【物联网中间件平台-02】YFIOs技术白皮书(V1.1)
在工控领域,组态软件司空见惯,国外的iFix、InTouch、WinCC,国内的组态王、力控、MSCG等等。组态软件的出现彻底解决了软件重复开发的问题,实现模块级复用,好处不仅仅是提高了开发效率,降低了开发周期,更大的优势的是成熟模块的复用,大大提高了系统稳定性和可靠性。
1345 12
|
开发工具 物联网
【物联网中间件平台-03】YFIOs安装指南
YFIOs就是YFSoft I/OServer的简称,在物联网、云计算时代,一切以数据为中心,不同的传感器通过不同的方式接入网络,通过云计算的方式为不同的终端用户提供服务。
1012 8
|
物联网
【物联网中间件平台-05】YFIOs策略开发指南
YFIOs就是YFSoft I/OServer的简称,在物联网、云计算时代,一切以数据为中心,不同的传感器通过不同的方式接入网络,通过云计算的方式为不同的终端用户提供服务。
697 5
|
物联网 传感器
【物联网中间件平台-04】YFIOs驱动开发指南
YFIOs就是YFSoft I/OServer的简称,在物联网、云计算时代,一切以数据为中心,不同的传感器通过不同的方式接入网络,通过云计算的方式为不同的终端用户提供服务。
704 5
|
物联网
【物联网中间件平台-06】RFID刷卡拍照
RFID刷卡拍照,如果是传统意义上的开发,我们一般需要先知道是什么型号的RFID刷卡器,自己动手编写一套读写RFID的代码,用来读取RFID标签。摄像头驱动开发也是这样的,先要知道摄像头驱动的协议,然后自己编写读取图像并显示的代码。
911 3