阿里云物联网平台之利用云平台流转如何实现同一款产品下任意俩个设备的通信?

简介: 大部分同学应该都知道这种正常的基于云平台流转的M2M设备间通信,可以指定A产品的某个设备的消息流转到B产品的某个设备,或者从B产品的某个设备流转到A产品的某个设备,具有一定的指向性。那么如何才能够体现任意俩个字呢?假设把产品下每个产品都当成群成员,如何才能让他们之间自由的发言呢?也就是说我如何只配置一条规则,就可以实现这个产品下的设备进行自由通信,而不是A->B要配置一条规则,B->C还要配置一条规则,甚至B->A也要配置一条规则。

原理

实现上述需求,其核心就是任意俩个字,而解决这种问题的手段一般就是“通配”。虽然是通配,但是每一条消息实际上还是有个“明确”的目的地的

(1)规则引擎配置:

源端:配置通配符 "+"

这一条保证消息源可以是任意的

目的端:配置通配符"${TargetDevice}"

这一条保证目的地是任意的,而真正的目的地可以根据TargetDevice变化而变化。

(2)设备端

源设备上报TargetDevice字段 ,表明该消息去向

Step By Step

前提:测试相关的环境准备好,两个程序(设备)(A作为源,B作为目的地),物联网平台上topic啊,物模型啊之类的都定义好。

1、配置规则引擎
(1)编写SQL语句
这里选择的是自定义topic消息,也可以选其他的。(注意topic需要有发布权限)

image.png

SELECT TargetDevice FROM "/a16*JpRCl/+/user/update"

(2)添加操作(配置目的地)
这里选下发到另一个自定义topic(注意,测试设备需要成功订阅这个topic)
image.png

/a16*pRCl/${TargetDevice}/user/get

(3)启动规则
image.png

2、设备端开发
源设备A:上报消息(消息中一定要包含TargetDevice字段)
这里体现TargetDevice必须要有,你们自己测试的时候可以再加些其他字段
image.png

源码:
AliyunIoTSignUtil:

package com.alibaba.taro;

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**
 * AliyunIoTSignUtil
 */

public class AliyunIoTSignUtil {
    public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
        //将参数Key按字典顺序排序
        String[] sortedKeys = params.keySet().toArray(new String[] {});
        Arrays.sort(sortedKeys);

        //生成规范化请求字符串
        StringBuilder canonicalizedQueryString = new StringBuilder();
        for (String key : sortedKeys) {
            if ("sign".equalsIgnoreCase(key)) {
                continue;
            }
            canonicalizedQueryString.append(key).append(params.get(key));
        }

        try {
            String key = deviceSecret;
            return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * HMACSHA1加密
     *
     */
    public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
        SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
        Mac mac = Mac.getInstance(secretKey.getAlgorithm());
        mac.init(secretKey);
        byte[] data = mac.doFinal(content.getBytes("utf-8"));
        return bytesToHexString(data);
    }

    public static final String bytesToHexString(byte[] bArray) {

        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }
}

设备A:

package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo {

    public static String productKey = "a16***pRCl";
    public static String deviceName = "IoTDeviceDemo1";
    public static String deviceSecret = "a3b15a11*****c4952";
    public static String regionId = "cn-shanghai";

    // 自定义topic,在产品Topic列表位置定义
    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/update";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/update";

    private static MqttClient mqttClient;
    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
        // 汇报属性
        String payloadJson = "{\"TargetDevice\":\"IoTDeviceDemo2\"}";
        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //打印输出消息payLoad             
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            //String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:443";
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值:");
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服务返回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("服务调用返回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}



目的设备B:
订阅规则引擎配置的topic
/a16*pRCl/${TargetDevice}/user/get

image.png

源码:
B设备


package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo2 {

    public static String productKey = "a1*****pRCl";
    public static String deviceName = "IoTDeviceDemo2";
    public static String deviceSecret = "0895205*****7e2b4bf2";
    public static String regionId = "cn-shanghai";

    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get";

    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
       
         String payloadJson = "{\"tts\":\"ss\"}";

        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //打印输出消息payLoad           
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值:");
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服务返回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("服务调用返回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}


测试结果

1、运行B程序、再运行A程序
A程序上报消息
image.png

B程序收到云平台转发消息
image.png

2、控制台日志
2020/12/20 14:16:28.304
设备A上报消息:
image.png

2020/12/20 14:16:28.334
平台触发规则引擎转发消息
image.png

2020/12/20 14:16:28.336
设备B收到转发的消息
image.png

总览:
image.png

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
14天前
|
存储 人工智能 弹性计算
国内首个,阿里云入选Gartner®战略云平台魔力象限挑战者
近日,Gartner发布2024年《战略云平台魔力象限》(Magic Quadrant™ for Strategic Cloud Platform Services)报告,阿里云从利基者象限进入挑战者象限,成为国内首个入选该象限的中国公共云厂商。
|
1月前
|
物联网 5G 智能硬件
物联网卡:物联网卡不支持语音通话,是如何实现设备间的数据传输和通信的?
物联网卡(IoT SIM卡)通常被设计用于支持物联网(IoT)设备之间的数据传输,而不直接支持语音通话功能。这是因为物联网设备主要关注的是数据的收集、传输和处理,而不是语音通信。为了实现设备间的数据传输和通信,物联网卡及其背后的技术采用了多种方法,主要包括但不限于以下几种方式:
物联网卡:物联网卡不支持语音通话,是如何实现设备间的数据传输和通信的?
|
1月前
|
存储 边缘计算 物联网
阿里云物联网平台:推动万物互联的智能化解决方案
随着物联网技术的快速发展,阿里云物联网平台为企业提供了一体化的解决方案,包括设备接入、数据管理和智能应用等核心功能。平台支持海量设备接入、实时数据采集与存储、边缘计算,并具备大规模设备管理、高安全性和开放生态等优势。广泛应用于智能制造、智慧城市和智能家居等领域,助力企业实现数字化转型。
188 5
|
2月前
|
传感器 监控 安全
物联网通信的基石:LoRa、Sigfox与NB-IoT详解
物联网通信的基石:LoRa、Sigfox与NB-IoT详解
334 0
|
4月前
|
弹性计算 Linux 网络安全
使用阿里云服务器迁移中心SMC将其他云平台业务迁移至阿里云教程参考
现在越来越多的个人和企业用户选择将其他云平台或者服务商的业务迁移到阿里云,但是如何快速且安全完成迁移是很多用户比较关注的问题,我们可以选择使用阿里云提供的服务器迁移中心(Server Migration Center,简称SMC),这个产品是阿里云提供给您的迁移平台,专注于提供能力普惠、体验一致、效率至上的迁移服务,满足您在阿里云的迁移需求。本文为大家展示使用阿里云服务器迁移中心SMC将其他云平台业务迁移至阿里云的教程,以供参考。
使用阿里云服务器迁移中心SMC将其他云平台业务迁移至阿里云教程参考
|
3月前
|
物联网 5G
【2022年无线通信和与物联网专场】东南大学尤肖虎教授-超高可靠、超低时延5G/6G移动通信基础理论研究与发展
东南大学尤肖虎教授在2022年无线通信和物联网专场中就超高可靠、超低时延的5G/6G移动通信基础理论研究与发展的讲座内容。
59 3
|
3月前
|
存储 安全 物联网
物联网中的通信模型
【8月更文挑战第23天】
45 0
|
3月前
|
物联网 C语言
C语言与物联网:设备间的通信与控制
C语言与物联网:设备间的通信与控制
51 0
|
3月前
|
物联网 网络性能优化 Python
"掌握MQTT协议,开启物联网通信新篇章——揭秘轻量级消息传输背后的力量!"
【8月更文挑战第21天】MQTT是一种轻量级的消息传输协议,以其低功耗、低带宽的特点在物联网和移动应用领域广泛应用。基于发布/订阅模型,MQTT支持三种服务质量级别,非常适合受限网络环境。本文详细阐述了MQTT的工作原理及特点,并提供了使用Python `paho-mqtt`库实现的发布与订阅示例代码,帮助读者快速掌握MQTT的应用技巧。
91 0
|
4月前
|
存储 运维 监控
阿里云物联网平台的优势
【7月更文挑战第19天】阿里云物联网平台的优势
77 1

相关产品

  • 物联网平台
  • 下一篇
    无影云桌面