基于开源JAVA MQTT Client连接阿里云IoT

简介: 在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。

作者:俏巴

概述

在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布。

操作步骤

1、创建产品和设备

参考:阿里云物联网平台Qucik Start 创建产品和设备部分。

2、pom.xml

<dependencies>
       <dependency>
           <groupId>org.eclipse.paho</groupId>
           <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
           <version>1.1.0</version>
       </dependency>
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>23.0</version>
       </dependency>
   </dependencies>

3、工具类 AliyunIoTSignUtil

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**
 * AliyunIoTSignUtil
 */

public class AliyunIoTSignUtil {
    public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
        //将参数Key按字典顺序排序
        String[] sortedKeys = params.keySet().toArray(new String[] {});
        Arrays.sort(sortedKeys);

        //生成规范化请求字符串
        StringBuilder canonicalizedQueryString = new StringBuilder();
        for (String key : sortedKeys) {
            if ("sign".equalsIgnoreCase(key)) {
                continue;
            }
            canonicalizedQueryString.append(key).append(params.get(key));
        }

        try {
            String key = deviceSecret;
            return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * HMACSHA1加密
     *
     */
    public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
        SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
        Mac mac = Mac.getInstance(secretKey.getAlgorithm());
        mac.init(secretKey);
        byte[] data = mac.doFinal(content.getBytes("utf-8"));
        return bytesToHexString(data);
    }

    public static final String bytesToHexString(byte[] bArray) {

        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }
}

4、main方法

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTDemoPubSubDemo {

    public static String productKey = "********";
    public static String deviceName = "OpenMQTTDevice";
    public static String deviceSecret = "********";
    public static String regionId = "cn-shanghai";

    // 物模型-属性上报topic
    private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post";
    // 自定义topic,在产品Topic列表位置定义
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/newdatademo";

    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
        // 汇报属性
        postDeviceProperties();
        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println("Sub message");
                System.out.println("Topic : " + s);
                System.out.println(new String(mqttMessage.getPayload())); //打印输出消息payLoad
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties() {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值");
            String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(1);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

5、运行测试情况
image.png

image.png

image.png

参考链接

基于开源MQTT自主接入阿里云IoT平台(Java)
MQTT-TCP连接通信

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
设计模式 敏捷开发 Java
全网首发!Java界的四大名著之一:Java编程思想最新中文版已开源
老版《Java编程思想》(原书名《Thinking in Java》)得益作者开放深度研讨的创作方式,受到了全世界读者的追捧,被译为了十几种语言。但遗憾的是,在经历了 4 个版本的更新后,其最后一版发布于 2007 年,之后再无更新。
|
11天前
|
存储 安全 物联网
使用 Java 进行物联网(IoT)应用开发
【4月更文挑战第19天】Java,凭借其跨平台特性和丰富库,成为物联网开发热门选择。开发者利用Java进行物联网应用开发,可实现设备连接、数据处理、数据库管理及安全保障。熟悉Java基础、物联网知识、数据通信和数据库管理是必备技能。利用MQTT客户端、数据存储框架和可视化工具能提升开发效率。随着物联网发展,Java在该领域的影响力将持续增强。
|
20小时前
|
Arthas 监控 IDE
去哪儿网开源的一个对应用透明,无侵入的Java应用诊断工具
今天 V 哥给大家带来一款开源工具Bistoury,Bistoury 是去哪儿网开源的一个对应用透明,无侵入的java应用诊断工具,用于提升开发人员的诊断效率和能力。
5 0
|
1月前
|
Java Maven Android开发
java如何连接mqtt
java如何连接mqtt
51 0
|
3月前
|
设计模式 Java 程序员
感动哭了!Java界的四大名著之一:Java编程思想最新中文版已开源
还记得这本书吗?是不是已经在你的桌上铺满厚厚的一层灰了?随着 Java 8 的出现,这门语言在许多地方发生了翻天覆地的变化。最新版已经出来了,在新的版本中,代码的运用和实现上与以往不尽相同。
|
3月前
|
算法 Java 程序员
阿里P8大佬终于把春招面试必备的神级Java面试手册给开源了!
先说说Java Java 作为国人编程开发语言中的 NO.1,已经占比半壁江山,选择入行做 IT 做编程开发的人,基本都把它作为首选语言,进大厂拿高薪也是大多数小伙伴们的梦想。 以前Java 岗位人才的空缺,而需求量又大,所以这种人才供不应求的现状,就是 Java 工程师的薪资待遇相对优厚的原因所在。 但是随着这个从事行业的人数逐渐增多,行业竞争也越来越大,招聘的企业和程序员们都想招聘到自己需要的人才/找到自己理想的岗位,国内大厂尤其是阿里招聘Java岗位居多,导致现在 Java 面试越来越难,内卷早就是大势所趋,万物皆可卷,卷的我们都见怪不怪了。 那么,阿里Java面试难度大吗?
|
3月前
|
JSON fastjson 数据库
字符编码导致Rapidjson(腾讯开源的json解析库)到Fastjson(阿里开发的Java json解析库)转换失败的原因分析
最近在客户端的开发的过程中,使用到了RapidJson,公司的开发是客户端和数据库端都由不同的人进行开发,我负责的客户端的逻辑开发(使用c++),开发工具同时使用了VS2017和QT的编译环境,使用QT主要是为了客户端界面开发方便,而使用了VS环境主要是维护公司开发的数据库接口库,这个库的唯一作用就是作为一个中间桥梁,使用Rapidjson将数据库接口的json数据格式解析为结构体数据,从而在客户端界面进行展示,或者接收客户端的数据,使用Rapidjson将其转换为json数据,发送给数据库接口以保存数据使用 。不太明白的可以参考我上一篇文章说明Rapidjson的使用过程-Parse解析数组
54 0
|
3月前
|
设计模式 算法 NoSQL
阿里巴巴官方上线!号称国内Java八股文天花板(终极版)首次开源
真正有意义的就业与跳槽,是要进入到一个有绝对潜力的行业或者薪资能实现爆炸式增长的。这件事不容易,但也没有想象的遥不可及,现在大环境不好,机会也不如以前多,除了让自身技术能力过硬,面试更是要好好准备! 如何准备? 除了平时的技术积累与沉淀之外,剩下的就只能背八股了(虽然工作用不到,但面试就是要问,不背是不行的)。
83 0
|
7月前
|
编解码 小程序 JavaScript
阿里云IoT小程序应用开发和组件实践
通过实验,了解阿里云IoT小程序的应用开发的方法,了解其内置的基础组件使用,以及基于Vue.js实现可复用的自定义组件的方法。
341 1
|
7月前
|
运维 安全 物联网
使用阿里云 IoT 安全中心保护智慧遥控器
在物联网领域中,我们的 TO B 智慧设备,在发货之后,出现了不少困扰我们的安全问题,比如会被恶意安装应用,访问非法网站等,增加厂家的运维成本。 同时设备上的一些技术机密也容易被好事之人破解,对厂商构成商业损失,直到我们发现了阿里云物联网的一款安全防护产品 -- IoT 安全中心。它主打的 ID² 和安全运营有效的解决了我们的痛点。
395 3