JAVA MQTT Client如何连接阿里云IoT?-阿里云开发者社区

开发者社区> YunqiQi> 正文

JAVA MQTT Client如何连接阿里云IoT?

简介: 在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布。
+关注继续查看

概述



在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布



操作步骤


1、创建产品和设备



参考:阿里云物联网平台Qucik Start 创建产品和设备部分。



2、pom.xml


   <dependencies>
    &lt;dependency&gt;
        &lt;groupId&gt;org.eclipse.paho&lt;/groupId&gt;
        &lt;artifactId&gt;org.eclipse.paho.client.mqttv3&lt;/artifactId&gt;
        &lt;version&gt;1.1.0&lt;/version&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;com.google.guava&lt;/groupId&gt;
        &lt;artifactId&gt;guava&lt;/artifactId&gt;
        &lt;version&gt;23.0&lt;/version&gt;
    &lt;/dependency&gt;
&lt;/dependencies&gt;</code></pre> 

3、工具类 AliyunIoTSignUtil


import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**

AliyunIoTSignUtil
*/

public class AliyunIoTSignUtil {

public static String sign(Map&lt;String, String&gt; params, String deviceSecret, String signMethod) {
    //将参数Key按字典顺序排序
    String[] sortedKeys &#61; params.keySet().toArray(new String[] {});
    Arrays.sort(sortedKeys);

    //生成规范化请求字符串
    StringBuilder canonicalizedQueryString &#61; new StringBuilder();
    for (String key : sortedKeys) {
        if (&#34;sign&#34;.equalsIgnoreCase(key)) {
            continue;
        }
        canonicalizedQueryString.append(key).append(params.get(key));
    }

    try {
        String key &#61; deviceSecret;
        return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

/**
 * HMACSHA1加密
 *
 */
public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
    SecretKey secretKey &#61; new SecretKeySpec(key.getBytes(&#34;utf-8&#34;), signMethod);
    Mac mac &#61; Mac.getInstance(secretKey.getAlgorithm());
    mac.init(secretKey);
    byte[] data &#61; mac.doFinal(content.getBytes(&#34;utf-8&#34;));
    return bytesToHexString(data);
}

public static final String bytesToHexString(byte[] bArray) {

    StringBuffer sb &#61; new StringBuffer(bArray.length);
    String sTemp;
    for (int i &#61; 0; i &lt; bArray.length; i&#43;&#43;) {
        sTemp &#61; Integer.toHexString(0xFF &amp; bArray[i]);
        if (sTemp.length() &lt; 2) {
            sb.append(0);
        }
        sb.append(sTemp.toUpperCase());
    }
    return sb.toString();
}

}


4、main方法


import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTDemoPubSubDemo {

public static String productKey &#61; &#34;********&#34;;
public static String deviceName &#61; &#34;OpenMQTTDevice&#34;;
public static String deviceSecret &#61; &#34;********&#34;;
public static String regionId &#61; &#34;cn-shanghai&#34;;

// 物模型-属性上报topic
private static String pubTopic &#61; &#34;/sys/&#34; &#43; productKey &#43; &#34;/&#34; &#43; deviceName &#43; &#34;/thing/event/property/post&#34;;
// 自定义topic&#xff0c;在产品Topic列表位置定义
private static String subTopic &#61; &#34;/&#34;&#43;productKey &#43; &#34;/&#34; &#43; deviceName&#43;&#34;/user/newdatademo&#34;;

private static MqttClient mqttClient;

public static void main(String [] args){

    initAliyunIoTClient();

// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);

    // 汇报属性
    postDeviceProperties();
    try {
        mqttClient.subscribe(subTopic); // 订阅Topic
    } catch (MqttException e) {
        System.out.println(&#34;error:&#34; &#43; e.getMessage());
        e.printStackTrace();
    }

    // 设置订阅监听
    mqttClient.setCallback(new MqttCallback() {
        &#64;Override
        public void connectionLost(Throwable throwable) {
            System.out.println(&#34;connection Lost&#34;);

        }

        &#64;Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            System.out.println(&#34;Sub message&#34;);
            System.out.println(&#34;Topic : &#34; &#43; s);
            System.out.println(new String(mqttMessage.getPayload())); //打印输出消息payLoad
        }

        &#64;Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    });

}

/**
 * 初始化 Client 对象
 */
private static void initAliyunIoTClient() {

    try {
        // 构造连接需要的参数
        String clientId &#61; &#34;java&#34; &#43; System.currentTimeMillis();
        Map&lt;String, String&gt; params &#61; new HashMap&lt;&gt;(16);
        params.put(&#34;productKey&#34;, productKey);
        params.put(&#34;deviceName&#34;, deviceName);
        params.put(&#34;clientId&#34;, clientId);
        String timestamp &#61; String.valueOf(System.currentTimeMillis());
        params.put(&#34;timestamp&#34;, timestamp);
        // cn-shanghai
        String targetServer &#61; &#34;tcp://&#34; &#43; productKey &#43; &#34;.iot-as-mqtt.&#34;&#43;regionId&#43;&#34;.aliyuncs.com:1883&#34;;

        String mqttclientId &#61; clientId &#43; &#34;|securemode&#61;3,signmethod&#61;hmacsha1,timestamp&#61;&#34; &#43; timestamp &#43; &#34;|&#34;;
        String mqttUsername &#61; deviceName &#43; &#34;&amp;&#34; &#43; productKey;
        String mqttPassword &#61; AliyunIoTSignUtil.sign(params, deviceSecret, &#34;hmacsha1&#34;);

        connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

    } catch (Exception e) {
        System.out.println(&#34;initAliyunIoTClient error &#34; &#43; e.getMessage());
    }
}

public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

    MemoryPersistence persistence &#61; new MemoryPersistence();
    mqttClient &#61; new MqttClient(url, clientId, persistence);
    MqttConnectOptions connOpts &#61; new MqttConnectOptions();
    // MQTT 3.1.1
    connOpts.setMqttVersion(4);
    connOpts.setAutomaticReconnect(false);
    connOpts.setCleanSession(true);

    connOpts.setUserName(mqttUsername);
    connOpts.setPassword(mqttPassword.toCharArray());
    connOpts.setKeepAliveInterval(60);

    mqttClient.connect(connOpts);
}

/**
 * 汇报属性
 */
private static void postDeviceProperties() {

    try {
        //上报数据
        //高级版 物模型-属性上报payload
        System.out.println(&#34;上报属性值&#34;);
        String payloadJson &#61; &#34;{\&#34;params\&#34;:{\&#34;Status\&#34;:0,\&#34;Data\&#34;:\&#34;15\&#34;}}&#34;;
        MqttMessage message &#61; new MqttMessage(payloadJson.getBytes(&#34;utf-8&#34;));
        message.setQos(1);
        mqttClient.publish(pubTopic, message);
    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
}

}


5、运行测试情况







参考链接


基于开源MQTT自主接入阿里云IoT平台(Java)

MQTT-TCP连接通信


云服务器ECS地址:阿里云·云小站

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
9421 0
HaaS UI基础教学十:IoT平台连接
阿里云智能物联网平台为设备提供安全可靠的连接通信能力,向下连接海量设备,支撑设备数据采集上云;向上提供云端API,服务端通过调用云端API将指令下发至设备端,实现远程控制。
133 0
阿里云物联网 .NET Core 客户端 | CZGL.AliIoTClient:1. 连接阿里云物联网
1) 阿里云物联网 阿里云物联网支持多种通讯协议,CZGL.AliIoTClient 使用了 MQTT 通讯协议,通讯库为 M2MQTT 。阿里云物联网数据传输有两种数据传输方式,分别是 透传 和 Alink json,两种方式只在属性读/写、事件上报、服务调用这四种 Topic 上有差异, 其它连接通讯、普通 Topic、响应等,无差别。
1226 0
eclipse,myeclipse中java连接数据库的问题
<p>首先,如果没有msbase.jar,mssqlserver,msutil.jar这三个包连接sql需要的包,编写好代码运行会出现异常:</p> <p><strong><span style="color:#ff0000">java.lang.ClassNotFoundException: com.microsoft.jdbc.sqlserver.SQLServerDriver(也有
1849 0
使用xshell连接服务器问题:ssh_exchange_identification: Connection closed by remote host
在连接服务器的时候遇到了如下的问题: 在网上查找方法: 1、让/etc/hosts.allow 和/etc/hosts.
1229 0
Android连接 Mysql: 解决mysql-connector-java驱动编译时Dex cannot parse version 52 byte code...等错误
Error:Error converting bytecode to dex: Cause: Dex cannot parse version 52 byte code.
1395 0
+关注
49
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载