JAVA MQTT Client如何连接阿里云IoT?

简介: 在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布。

概述



在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布



操作步骤


1、创建产品和设备



参考:阿里云物联网平台Qucik Start 创建产品和设备部分。



2、pom.xml


   <dependencies>
    &lt;dependency&gt;
        &lt;groupId&gt;org.eclipse.paho&lt;/groupId&gt;
        &lt;artifactId&gt;org.eclipse.paho.client.mqttv3&lt;/artifactId&gt;
        &lt;version&gt;1.1.0&lt;/version&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;com.google.guava&lt;/groupId&gt;
        &lt;artifactId&gt;guava&lt;/artifactId&gt;
        &lt;version&gt;23.0&lt;/version&gt;
    &lt;/dependency&gt;
&lt;/dependencies&gt;</code></pre> 

3、工具类 AliyunIoTSignUtil


import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**

AliyunIoTSignUtil
*/

public class AliyunIoTSignUtil {

public static String sign(Map&lt;String, String&gt; params, String deviceSecret, String signMethod) {
    //将参数Key按字典顺序排序
    String[] sortedKeys &#61; params.keySet().toArray(new String[] {});
    Arrays.sort(sortedKeys);

    //生成规范化请求字符串
    StringBuilder canonicalizedQueryString &#61; new StringBuilder();
    for (String key : sortedKeys) {
        if (&#34;sign&#34;.equalsIgnoreCase(key)) {
            continue;
        }
        canonicalizedQueryString.append(key).append(params.get(key));
    }

    try {
        String key &#61; deviceSecret;
        return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

/**
 * HMACSHA1加密
 *
 */
public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
    SecretKey secretKey &#61; new SecretKeySpec(key.getBytes(&#34;utf-8&#34;), signMethod);
    Mac mac &#61; Mac.getInstance(secretKey.getAlgorithm());
    mac.init(secretKey);
    byte[] data &#61; mac.doFinal(content.getBytes(&#34;utf-8&#34;));
    return bytesToHexString(data);
}

public static final String bytesToHexString(byte[] bArray) {

    StringBuffer sb &#61; new StringBuffer(bArray.length);
    String sTemp;
    for (int i &#61; 0; i &lt; bArray.length; i&#43;&#43;) {
        sTemp &#61; Integer.toHexString(0xFF &amp; bArray[i]);
        if (sTemp.length() &lt; 2) {
            sb.append(0);
        }
        sb.append(sTemp.toUpperCase());
    }
    return sb.toString();
}

}


4、main方法


import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTDemoPubSubDemo {

public static String productKey &#61; &#34;********&#34;;
public static String deviceName &#61; &#34;OpenMQTTDevice&#34;;
public static String deviceSecret &#61; &#34;********&#34;;
public static String regionId &#61; &#34;cn-shanghai&#34;;

// 物模型-属性上报topic
private static String pubTopic &#61; &#34;/sys/&#34; &#43; productKey &#43; &#34;/&#34; &#43; deviceName &#43; &#34;/thing/event/property/post&#34;;
// 自定义topic&#xff0c;在产品Topic列表位置定义
private static String subTopic &#61; &#34;/&#34;&#43;productKey &#43; &#34;/&#34; &#43; deviceName&#43;&#34;/user/newdatademo&#34;;

private static MqttClient mqttClient;

public static void main(String [] args){

    initAliyunIoTClient();

// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);

    // 汇报属性
    postDeviceProperties();
    try {
        mqttClient.subscribe(subTopic); // 订阅Topic
    } catch (MqttException e) {
        System.out.println(&#34;error:&#34; &#43; e.getMessage());
        e.printStackTrace();
    }

    // 设置订阅监听
    mqttClient.setCallback(new MqttCallback() {
        &#64;Override
        public void connectionLost(Throwable throwable) {
            System.out.println(&#34;connection Lost&#34;);

        }

        &#64;Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            System.out.println(&#34;Sub message&#34;);
            System.out.println(&#34;Topic : &#34; &#43; s);
            System.out.println(new String(mqttMessage.getPayload())); //打印输出消息payLoad
        }

        &#64;Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    });

}

/**
 * 初始化 Client 对象
 */
private static void initAliyunIoTClient() {

    try {
        // 构造连接需要的参数
        String clientId &#61; &#34;java&#34; &#43; System.currentTimeMillis();
        Map&lt;String, String&gt; params &#61; new HashMap&lt;&gt;(16);
        params.put(&#34;productKey&#34;, productKey);
        params.put(&#34;deviceName&#34;, deviceName);
        params.put(&#34;clientId&#34;, clientId);
        String timestamp &#61; String.valueOf(System.currentTimeMillis());
        params.put(&#34;timestamp&#34;, timestamp);
        // cn-shanghai
        String targetServer &#61; &#34;tcp://&#34; &#43; productKey &#43; &#34;.iot-as-mqtt.&#34;&#43;regionId&#43;&#34;.aliyuncs.com:1883&#34;;

        String mqttclientId &#61; clientId &#43; &#34;|securemode&#61;3,signmethod&#61;hmacsha1,timestamp&#61;&#34; &#43; timestamp &#43; &#34;|&#34;;
        String mqttUsername &#61; deviceName &#43; &#34;&amp;&#34; &#43; productKey;
        String mqttPassword &#61; AliyunIoTSignUtil.sign(params, deviceSecret, &#34;hmacsha1&#34;);

        connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

    } catch (Exception e) {
        System.out.println(&#34;initAliyunIoTClient error &#34; &#43; e.getMessage());
    }
}

public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

    MemoryPersistence persistence &#61; new MemoryPersistence();
    mqttClient &#61; new MqttClient(url, clientId, persistence);
    MqttConnectOptions connOpts &#61; new MqttConnectOptions();
    // MQTT 3.1.1
    connOpts.setMqttVersion(4);
    connOpts.setAutomaticReconnect(false);
    connOpts.setCleanSession(true);

    connOpts.setUserName(mqttUsername);
    connOpts.setPassword(mqttPassword.toCharArray());
    connOpts.setKeepAliveInterval(60);

    mqttClient.connect(connOpts);
}

/**
 * 汇报属性
 */
private static void postDeviceProperties() {

    try {
        //上报数据
        //高级版 物模型-属性上报payload
        System.out.println(&#34;上报属性值&#34;);
        String payloadJson &#61; &#34;{\&#34;params\&#34;:{\&#34;Status\&#34;:0,\&#34;Data\&#34;:\&#34;15\&#34;}}&#34;;
        MqttMessage message &#61; new MqttMessage(payloadJson.getBytes(&#34;utf-8&#34;));
        message.setQos(1);
        mqttClient.publish(pubTopic, message);
    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
}

}


5、运行测试情况







参考链接


基于开源MQTT自主接入阿里云IoT平台(Java)

MQTT-TCP连接通信


云服务器ECS地址:阿里云·云小站

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
16天前
|
物联网
一个绿色的免费MQTT调试工具,实测连接移动OneNET
一个绿色的免费MQTT调试工具,实测连接移动OneNET,主题发布 ,主题订阅。
108 16
|
3月前
|
Arthas 监控 Java
拥抱 OpenTelemetry:阿里云 Java Agent 演进实践
本文介绍了阿里云 Java Agent 4.x 版本在基于 OTel Java Agent 二次开发过程中的实践与思考,并重点从功能、性能、稳定性、兼容性四个方面介绍了所做的工作。同时也介绍了阿里云可观测团队积极参与开源建设取得的丰厚成果。
392 9
拥抱 OpenTelemetry:阿里云 Java Agent 演进实践
|
8月前
|
消息中间件 存储 开发工具
消息队列 MQ产品使用合集之C++如何使用Paho MQTT库进行连接、发布和订阅消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
Java
用java实现Client和Server之间的互相通信
本文介绍了如何使用Java实现客户端和服务器之间的通信,包括服务器端创建ServerSocket、接受客户端连接、读取和发送消息,以及客户端创建Socket连接、发送和接收消息的完整过程。
120 0
用java实现Client和Server之间的互相通信
|
5月前
|
Java API Nacos
Caused by: java.lang.IllegalStateException: No Feign Client for loadBalancing defined. Did you forge
Caused by: java.lang.IllegalStateException: No Feign Client for loadBalancing defined. Did you forge
301 2
|
5月前
|
存储 SQL 分布式计算
Java连接阿里云MaxCompute例
要使用Java连接阿里云MaxCompute数据库,首先需在项目中添加MaxCompute JDBC驱动依赖,推荐通过Maven管理。避免在代码中直接写入AccessKey,应使用环境变量或配置文件安全存储。示例代码展示了如何注册驱动、建立连接及执行SQL查询。建议使用RAM用户提升安全性,并根据需要配置时区和公网访问权限。具体步骤和注意事项请参考阿里云官方文档。
511 10
|
6月前
|
Java 开发工具
通过Java SDK调用阿里云模型服务
在阿里云平台上,可以通过创建应用并使用模型服务完成特定任务,如生成文章内容。本示例展示了一段简化的Java代码,演示了如何调用阿里云模型服务生成关于“春秋战国经济与文化”的简短文章。示例代码通过设置系统角色为历史学家,并提出文章生成需求,最终处理并输出生成的文章内容。在实际部署前,请确保正确配置环境变量中的密钥和ID,并根据需要调整SDK导入语句及类名。更多详情和示例,请参考相关链接。
|
6月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
427 0
RocketMQ—一次连接namesvr失败的案例分析
|
6月前
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
206 0
|
7月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

热门文章

最新文章