JAVA MQTT Client如何连接阿里云IoT?

简介: 在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布。

概述



在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布



操作步骤


1、创建产品和设备



参考:阿里云物联网平台Qucik Start 创建产品和设备部分。



2、pom.xml


   <dependencies>
    &lt;dependency&gt;
        &lt;groupId&gt;org.eclipse.paho&lt;/groupId&gt;
        &lt;artifactId&gt;org.eclipse.paho.client.mqttv3&lt;/artifactId&gt;
        &lt;version&gt;1.1.0&lt;/version&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;com.google.guava&lt;/groupId&gt;
        &lt;artifactId&gt;guava&lt;/artifactId&gt;
        &lt;version&gt;23.0&lt;/version&gt;
    &lt;/dependency&gt;
&lt;/dependencies&gt;</code></pre> 

3、工具类 AliyunIoTSignUtil


import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**

AliyunIoTSignUtil
*/

public class AliyunIoTSignUtil {

public static String sign(Map&lt;String, String&gt; params, String deviceSecret, String signMethod) {
    //将参数Key按字典顺序排序
    String[] sortedKeys &#61; params.keySet().toArray(new String[] {});
    Arrays.sort(sortedKeys);

    //生成规范化请求字符串
    StringBuilder canonicalizedQueryString &#61; new StringBuilder();
    for (String key : sortedKeys) {
        if (&#34;sign&#34;.equalsIgnoreCase(key)) {
            continue;
        }
        canonicalizedQueryString.append(key).append(params.get(key));
    }

    try {
        String key &#61; deviceSecret;
        return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

/**
 * HMACSHA1加密
 *
 */
public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
    SecretKey secretKey &#61; new SecretKeySpec(key.getBytes(&#34;utf-8&#34;), signMethod);
    Mac mac &#61; Mac.getInstance(secretKey.getAlgorithm());
    mac.init(secretKey);
    byte[] data &#61; mac.doFinal(content.getBytes(&#34;utf-8&#34;));
    return bytesToHexString(data);
}

public static final String bytesToHexString(byte[] bArray) {

    StringBuffer sb &#61; new StringBuffer(bArray.length);
    String sTemp;
    for (int i &#61; 0; i &lt; bArray.length; i&#43;&#43;) {
        sTemp &#61; Integer.toHexString(0xFF &amp; bArray[i]);
        if (sTemp.length() &lt; 2) {
            sb.append(0);
        }
        sb.append(sTemp.toUpperCase());
    }
    return sb.toString();
}

}


4、main方法


import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTDemoPubSubDemo {

public static String productKey &#61; &#34;********&#34;;
public static String deviceName &#61; &#34;OpenMQTTDevice&#34;;
public static String deviceSecret &#61; &#34;********&#34;;
public static String regionId &#61; &#34;cn-shanghai&#34;;

// 物模型-属性上报topic
private static String pubTopic &#61; &#34;/sys/&#34; &#43; productKey &#43; &#34;/&#34; &#43; deviceName &#43; &#34;/thing/event/property/post&#34;;
// 自定义topic&#xff0c;在产品Topic列表位置定义
private static String subTopic &#61; &#34;/&#34;&#43;productKey &#43; &#34;/&#34; &#43; deviceName&#43;&#34;/user/newdatademo&#34;;

private static MqttClient mqttClient;

public static void main(String [] args){

    initAliyunIoTClient();

// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);

    // 汇报属性
    postDeviceProperties();
    try {
        mqttClient.subscribe(subTopic); // 订阅Topic
    } catch (MqttException e) {
        System.out.println(&#34;error:&#34; &#43; e.getMessage());
        e.printStackTrace();
    }

    // 设置订阅监听
    mqttClient.setCallback(new MqttCallback() {
        &#64;Override
        public void connectionLost(Throwable throwable) {
            System.out.println(&#34;connection Lost&#34;);

        }

        &#64;Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            System.out.println(&#34;Sub message&#34;);
            System.out.println(&#34;Topic : &#34; &#43; s);
            System.out.println(new String(mqttMessage.getPayload())); //打印输出消息payLoad
        }

        &#64;Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    });

}

/**
 * 初始化 Client 对象
 */
private static void initAliyunIoTClient() {

    try {
        // 构造连接需要的参数
        String clientId &#61; &#34;java&#34; &#43; System.currentTimeMillis();
        Map&lt;String, String&gt; params &#61; new HashMap&lt;&gt;(16);
        params.put(&#34;productKey&#34;, productKey);
        params.put(&#34;deviceName&#34;, deviceName);
        params.put(&#34;clientId&#34;, clientId);
        String timestamp &#61; String.valueOf(System.currentTimeMillis());
        params.put(&#34;timestamp&#34;, timestamp);
        // cn-shanghai
        String targetServer &#61; &#34;tcp://&#34; &#43; productKey &#43; &#34;.iot-as-mqtt.&#34;&#43;regionId&#43;&#34;.aliyuncs.com:1883&#34;;

        String mqttclientId &#61; clientId &#43; &#34;|securemode&#61;3,signmethod&#61;hmacsha1,timestamp&#61;&#34; &#43; timestamp &#43; &#34;|&#34;;
        String mqttUsername &#61; deviceName &#43; &#34;&amp;&#34; &#43; productKey;
        String mqttPassword &#61; AliyunIoTSignUtil.sign(params, deviceSecret, &#34;hmacsha1&#34;);

        connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

    } catch (Exception e) {
        System.out.println(&#34;initAliyunIoTClient error &#34; &#43; e.getMessage());
    }
}

public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

    MemoryPersistence persistence &#61; new MemoryPersistence();
    mqttClient &#61; new MqttClient(url, clientId, persistence);
    MqttConnectOptions connOpts &#61; new MqttConnectOptions();
    // MQTT 3.1.1
    connOpts.setMqttVersion(4);
    connOpts.setAutomaticReconnect(false);
    connOpts.setCleanSession(true);

    connOpts.setUserName(mqttUsername);
    connOpts.setPassword(mqttPassword.toCharArray());
    connOpts.setKeepAliveInterval(60);

    mqttClient.connect(connOpts);
}

/**
 * 汇报属性
 */
private static void postDeviceProperties() {

    try {
        //上报数据
        //高级版 物模型-属性上报payload
        System.out.println(&#34;上报属性值&#34;);
        String payloadJson &#61; &#34;{\&#34;params\&#34;:{\&#34;Status\&#34;:0,\&#34;Data\&#34;:\&#34;15\&#34;}}&#34;;
        MqttMessage message &#61; new MqttMessage(payloadJson.getBytes(&#34;utf-8&#34;));
        message.setQos(1);
        mqttClient.publish(pubTopic, message);
    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
}

}


5、运行测试情况







参考链接


基于开源MQTT自主接入阿里云IoT平台(Java)

MQTT-TCP连接通信


云服务器ECS地址:阿里云·云小站

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
23天前
|
存储 Java 关系型数据库
高效连接之道:Java连接池原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。频繁创建和关闭连接会消耗大量资源,导致性能瓶颈。为此,Java连接池技术通过复用连接,实现高效、稳定的数据库连接管理。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接池的基本操作、配置和使用方法,以及在电商应用中的具体应用示例。
41 5
|
1月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
20天前
|
SQL Java 数据库连接
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率。本文介绍了连接池的工作原理、优势及实现方法,并提供了HikariCP的示例代码。
34 3
|
20天前
|
Java 数据库连接 数据库
深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能
在Java应用开发中,数据库操作常成为性能瓶颈。本文通过问题解答形式,深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能。文章介绍了连接池的优势、选择和使用方法,以及优化配置的技巧。
22 1
|
20天前
|
Java 数据库连接 数据库
Java连接池在数据库性能优化中的重要作用。连接池通过预先创建和管理数据库连接,避免了频繁创建和关闭连接的开销
本文深入探讨了Java连接池在数据库性能优化中的重要作用。连接池通过预先创建和管理数据库连接,避免了频繁创建和关闭连接的开销,显著提升了系统的响应速度和吞吐量。文章介绍了连接池的工作原理,并以HikariCP为例,展示了如何在Java应用中使用连接池。通过合理配置和优化,连接池技术能够有效提升应用性能。
34 1
|
26天前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
1月前
|
SQL 存储 Java
Java中使用ClickHouseDriver连接和基本操作
通过上述步骤,你可以轻松地在Java应用中集成ClickHouse数据库,执行基本的CRUD操作。需要注意的是,实际开发中应当根据实际情况调整数据库连接配置(如URL中的主机、端口、数据库名等),并根据应用需求选择合适的异常处理策略,确保代码的健壮性和资源的有效管理。此外,对于复杂查询和大批量数据处理,建议充分利用ClickHouse的特性(如分布式处理、列式存储优化等),以进一步提升性能。
97 2
|
1月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
25 0
rabbitmq基础教程(ui,java,springamqp)
|
1月前
|
Java
用java实现Client和Server之间的互相通信
本文介绍了如何使用Java实现客户端和服务器之间的通信,包括服务器端创建ServerSocket、接受客户端连接、读取和发送消息,以及客户端创建Socket连接、发送和接收消息的完整过程。
32 0
用java实现Client和Server之间的互相通信
|
1月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
98 0
下一篇
无影云桌面