物联网的消息中间件有很多,如ActiveMq、RabbitMq、Emq、以及自己实现的netty-borker,这里为什么要选择EMQ呢,首先,在使用emqx之前我用过ActiveMq由于是国外开发的,对国内产品的支持不够好,文档和社区也远没有EmqX那样详细与活跃。当然我也考虑过自己实现,但是出去效率性,就暂时先选择EMQX作为我的消息中间件。而且,EMQX丰富的中文文档、和完善的功能几乎能满足我们90%的需求,而且支持多种协议MQTT、MQTT-SN、CoAP、LwM2M、WebSocket。对于设备连接量,EMQX免费版支持50万的设备连接量。企业版可以使用集群进行扩充连接量。话不多说,我们开始搭建我们的EMQX吧!
传送门 EMQX网站https://docs.emqx.io/broker/latest/cn/
一、简介
- 1883:MQTT 协议端口
- 8883:MQTT/SSL 端口
- 8083:MQTT/WebSocket 端口
- 8080:HTTP API 端口
- 18083:Dashboard 管理控制台端口
EMQX管理页面为http://127.0.0.1:18083。把IP替换成你的服务器IP,默认用户名密码为admin/public
#启动服务
systemctl start emqttd
#停止服务
systemctl stop emqttd
#重启服务
systemctl restart emqttd
二、安装EMQX
1. Docker安装
docker pull emqx/emqx
docker run -d --name emqx --privileged=true --restart=always -p 18083:18083 -p 1883:1883 -p 8085:8085 -p 8083:8083 -p 8084:8084 -p 8883:8883 -v /web/EMQX/connf/emqx_auth_http.conf:/opt/emqx/etc/plugins/emqx_auth_http.conf emqx/emqx:latest
2.Linnx安装
sudo yum install emqx-4.0.0
三、HTTP认证
1. 关闭匿名认证
进入emqx安装目录下/etc,打开emqx.conf文件,搜索anonymous,找到这一行allow_anonymous = true,把true改为false。
2. 配置HTTP认证插件
打开EMQ的配置文件
1. cd /etc/emqx/plugins/ 2. vim emqx_auth_http.conf
修改认证的接口为服务端提供的认证接口,这样设备连接EMQ的时候会先走我们的服务端去校验用户名、密码、客户端ID是否合法。
-##-------------------------------------------------------------------- ## HTTP Auth/ACL Plugin ##-------------------------------------------------------------------- ##-------------------------------------------------------------------- ## Authentication request. ## ## Variables: ## - %u: username ## - %c: clientid ## - %a: ipaddress ## - %P: password ## ## Value: URL auth.http.auth_req = http://172.95.177.11:8899/scc/mqtt/auth ## Value: post | get | put auth.http.auth_req.method = post ## Value: Params auth.http.auth_req.params = clientid=%c,username=%u,password=%P ##-------------------------------------------------------------------- ## Superuser request. ## ## Variables: ## - %u: username ## - %c: clientid ## - %a: ipaddress ## ## Value: URL auth.http.super_req = http://172.95.177.11:8899/scc/mqtt/superuser ## Value: post | get | put auth.http.super_req.method = post ## Value: Params auth.http.super_req.params = clientid=%c,username=%u ##-------------------------------------------------------------------- ## ACL request. ## ## Variables: ## - %A: 1 | 2, 1 = sub, 2 = pub ## - %u: username ## - %c: clientid ## - %a: ipaddress ## - %t: topic ## ## Value: URL auth.http.acl_req = http://172.95.177.11:8899/scc/mqtt/acl ## Value: post | get | put auth.http.acl_req.method = get ## Value: Params auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t
3.开启插件,重启EMQX
打开http://127.0.0.1:18083/EMQ的网页管理页面,找到http认证的插件,开启它
systemctl restart emqttd重启EMQ
4.服务端Springboot接口
@ApiOperation(value = "客户端连接授权" ,notes = "客户端连接授权" ) @ApiImplicitParams({ @ApiImplicitParam(name = "clientid" ,value = "客户端clientId" , required = false, dataType = "String"), @ApiImplicitParam(name = "username" ,value = "客户端username" , required = false, dataType = "String"), @ApiImplicitParam(name = "password" ,value = "客户端password" , required = false, dataType = "String") }) @RequestMapping(value = "/auth", method = RequestMethod.POST) public void checkUser(String clientid, String username, String password, HttpServletResponse response) { logger.info("普通用户;clientid:" + clientid + ";username:" + username + ";password:" + password); //计算用户的剩余设备 if(productService.countDevNumByPid(username)<=0){ response.setStatus(402); return; } if (!"".equals(clientid) && productService.findByIdAndToken(username, password).size()>0&&productService.getDevStatusBySn(clientid)!=1) { System.out.println("clientid:"+clientid+",username:"+username+",password:"+password+"允许通过"); response.setStatus(200); } else { System.out.println("clientid:"+clientid+",username:"+username+",password:"+password+"禁止通过"); response.setStatus(401); } } @RequestMapping("/superuser") public void mqttSuperuser(String clientid, String username, HttpServletResponse response) { //auth.http.super_req.params = clientid=%c,username=%u if(clientid.startsWith("server_client_")|| clientid.startsWith("web_client_")||clientid.startsWith("wxapp_client_")){ response.setStatus(200); return; } logger.info("超级用户;clientid:" + clientid + ";username:" + username); System.out.println("超级用户;clientid:" + clientid + ";username:" + username); response.setStatus(200); } @RequestMapping("/acl") public void mqttAcl(String access, String username, String clientid, String ipaddr, String topic, HttpServletResponse response) { //auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t logger.info("access: " + access + ";username: " + username + ";clientid: " + clientid + "; ipaddr: " + ipaddr + ";topic: " + topic); System.out.println("access: " + access + ";username: " + username + ";clientid: " + clientid + "; ipaddr: " + ipaddr + ";topic: " + topic); response.setStatus(200); }
四、上线下线接口
如何做到设备上线下线实时显示呢,这里我们可以接用EMQ提供的系统主题即$SYS/,在我们的 服务端订阅这个主题,当设备连接到EMQ上时,EMQ服务器就会发送一条系统主题如:
$SYS/brokers/emqx@127.0.0.1/clients/sn0054556/disconnected
$SYS/brokers/emqx@127.0.0.1/clients/sn0054556/connected
我们通过订阅系统主题就可以实现实时获得设备状态的效果。
找到EMQ安装位置下的acl.conf文件。vi这个文件。替换为下边的配置方式
允许客户端订阅"SYS/brokers/+/clients/#"主题,但是禁止订阅"SYS/brokers/+/clients/#"主题,但是禁止订阅"SYS/brokers/+/clients/#"主题,但是禁止订阅"SYS/#"主题。这样系统的安全性会提升
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}. {allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}. {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}. {allow, all}.
五、HTTP API
这里为什么要调用EMQ的HTTPAPI呢,主要是为了使服务端和Broker分离,各司其职。我的服务端不再存储任何设备状态信息。关于设备的缓存,在线状态全部交由EMQ来管控。利用API我们可以随时知道当前EMQ连接了那些设备。设备的详细信息。因为我们订阅上线、下线消息只能满足与用户刚好登录了页面。在连接上EMQ的这段时间内所得到的设备上线下线情况。我这里是前端单独是MQTT客户端、服务端也是一个MQTT客户端。所以通过服务端调用HTTPAPI可以随时获取当前设备在线情况,而不是自己在服务端自己维护一套设备的session。
在服务端service层添加如下代码,然后调用这个service即可调用EMQ的各种API了,非常好用。
如获得在线设备/api/v4/clients具体参考https://docs.emqx.io/broker/latest/cn/advanced/http-api.html#endpoint-nodes
这里还遇到一个坑。就是数据流读取的问题,在服务器上读取的流经常会断掉,导致json数据异常。而本地运行完全没有这个问题,后来经师傅提点才改进了流读取方式,下边的是非常完好的流读取方式。
//用户名 private static String username = "admin"; //登录密码 private static String password = "public"; //服务器地址 private static String serverPath = "http://127.9.0.1:18083"; //当前页 private static int pageIndex = 1; //页大小 private static int pageSize = 100; @Override public String query(String queryPathUrl, int pageIndex, int pageSize) throws Exception { //拼接查询参数 if(pageIndex>0&&pageSize>0){ queryPathUrl = queryPathUrl +"?" + "_page=" + pageIndex + "&" + "_limit=" + pageSize; } URL url = new URL(serverPath+queryPathUrl); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); String authorization = getBase64(username, password); //连接认证信息放在头里,注意,base64可以反编码,有安全隐患 conn.setRequestProperty("authorization", "Basic "+authorization); conn.setRequestMethod("GET"); // 开始连接 conn.connect(); String resule = null ; if (conn.getResponseCode() == 200) { // 请求返回的数据 InputStream inputStream = conn.getInputStream(); byte[] readBuffer = new byte[1024]; int numBytes = -1; ByteArrayOutputStream resultB = new ByteArrayOutputStream(); // try { while (inputStream.available() > 0) { numBytes = inputStream.read(readBuffer); if (numBytes >= 0) { resultB.write(readBuffer, 0, numBytes); readBuffer = new byte[1024]; Thread.sleep(500); } } resule = new String(resultB.toByteArray(), "UTF-8"); inputStream.close(); } return resule; }
六、调优
关于调优部分,需要设置许多liunx的参数,改配置文件。这里就不介绍了,因为随着版本的升级,官网都会实时更新。官网的步骤也是极其详细。