阿里云物联网平台之利用云平台流转如何实现同一款产品下任意俩个设备的通信?

简介: 大部分同学应该都知道这种正常的基于云平台流转的M2M设备间通信,可以指定A产品的某个设备的消息流转到B产品的某个设备,或者从B产品的某个设备流转到A产品的某个设备,具有一定的指向性。那么如何才能够体现任意俩个字呢?假设把产品下每个产品都当成群成员,如何才能让他们之间自由的发言呢?也就是说我如何只配置一条规则,就可以实现这个产品下的设备进行自由通信,而不是A->B要配置一条规则,B->C还要配置一条规则,甚至B->A也要配置一条规则。

原理

实现上述需求,其核心就是任意俩个字,而解决这种问题的手段一般就是“通配”。虽然是通配,但是每一条消息实际上还是有个“明确”的目的地的

(1)规则引擎配置:

源端:配置通配符 "+"

这一条保证消息源可以是任意的

目的端:配置通配符"${TargetDevice}"

这一条保证目的地是任意的,而真正的目的地可以根据TargetDevice变化而变化。

(2)设备端

源设备上报TargetDevice字段 ,表明该消息去向

Step By Step

前提:测试相关的环境准备好,两个程序(设备)(A作为源,B作为目的地),物联网平台上topic啊,物模型啊之类的都定义好。

1、配置规则引擎
(1)编写SQL语句
这里选择的是自定义topic消息,也可以选其他的。(注意topic需要有发布权限)

image.png

SELECT TargetDevice FROM "/a16*JpRCl/+/user/update"

(2)添加操作(配置目的地)
这里选下发到另一个自定义topic(注意,测试设备需要成功订阅这个topic)
image.png

/a16*pRCl/${TargetDevice}/user/get

(3)启动规则
image.png

2、设备端开发
源设备A:上报消息(消息中一定要包含TargetDevice字段)
这里体现TargetDevice必须要有,你们自己测试的时候可以再加些其他字段
image.png

源码:
AliyunIoTSignUtil:

package com.alibaba.taro;

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**
 * AliyunIoTSignUtil
 */

public class AliyunIoTSignUtil {
    public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
        //将参数Key按字典顺序排序
        String[] sortedKeys = params.keySet().toArray(new String[] {});
        Arrays.sort(sortedKeys);

        //生成规范化请求字符串
        StringBuilder canonicalizedQueryString = new StringBuilder();
        for (String key : sortedKeys) {
            if ("sign".equalsIgnoreCase(key)) {
                continue;
            }
            canonicalizedQueryString.append(key).append(params.get(key));
        }

        try {
            String key = deviceSecret;
            return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * HMACSHA1加密
     *
     */
    public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
        SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
        Mac mac = Mac.getInstance(secretKey.getAlgorithm());
        mac.init(secretKey);
        byte[] data = mac.doFinal(content.getBytes("utf-8"));
        return bytesToHexString(data);
    }

    public static final String bytesToHexString(byte[] bArray) {

        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }
}

设备A:

package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo {

    public static String productKey = "a16***pRCl";
    public static String deviceName = "IoTDeviceDemo1";
    public static String deviceSecret = "a3b15a11*****c4952";
    public static String regionId = "cn-shanghai";

    // 自定义topic,在产品Topic列表位置定义
    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/update";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/update";

    private static MqttClient mqttClient;
    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
        // 汇报属性
        String payloadJson = "{\"TargetDevice\":\"IoTDeviceDemo2\"}";
        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //打印输出消息payLoad             
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            //String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:443";
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值:");
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服务返回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("服务调用返回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}



目的设备B:
订阅规则引擎配置的topic
/a16*pRCl/${TargetDevice}/user/get

image.png

源码:
B设备


package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo2 {

    public static String productKey = "a1*****pRCl";
    public static String deviceName = "IoTDeviceDemo2";
    public static String deviceSecret = "0895205*****7e2b4bf2";
    public static String regionId = "cn-shanghai";

    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get";

    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
       
         String payloadJson = "{\"tts\":\"ss\"}";

        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //打印输出消息payLoad           
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值:");
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服务返回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("服务调用返回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}


测试结果

1、运行B程序、再运行A程序
A程序上报消息
image.png

B程序收到云平台转发消息
image.png

2、控制台日志
2020/12/20 14:16:28.304
设备A上报消息:
image.png

2020/12/20 14:16:28.334
平台触发规则引擎转发消息
image.png

2020/12/20 14:16:28.336
设备B收到转发的消息
image.png

总览:
image.png

相关实践学习
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
8月前
|
消息中间件 安全 物联网
海量接入、毫秒响应:易易互联携手阿里云构筑高可用物联网消息中枢
面对换电生态高速发展的通信挑战,易易互联通过采用阿里云 MQTT + RocketMQ 的融合解决方案,成功构建了“海量接入、实时响应、弹性处理、安全可信”的物联网通信底座。该架构不仅显著提升了系统稳定性与可扩展性,更保障了高并发场景下的业务连续性,为实现“让换电成为营运补能第一选择”的战略目标提供了坚实的技术支撑。
413 60
|
12月前
|
物联网
(手把手)在华为云、阿里云搭建自己的物联网MQTT消息服务器,免费IOT平台
本文介绍如何在阿里云搭建自己的物联网MQTT消息服务器,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
3716 42
|
存储 人工智能 监控
星云智控科技-优雅草星云物联网AI智控系统软件产品技术栈一览表-优雅草卓伊凡
星云智控科技-优雅草星云物联网AI智控系统软件产品技术栈一览表-优雅草卓伊凡
389 7
星云智控科技-优雅草星云物联网AI智控系统软件产品技术栈一览表-优雅草卓伊凡
|
传感器 物联网 开发者
FreeMQTT & FreeMQTT plus:物联网通信的强大助力
FreeMQTT 和 FreeMQTT plus 是基于 MQTT 协议的物联网通信解决方案。FreeMQTT 是用 Python 实现的开源 MQTT Server,支持多协议传输、应用分组隔离,易于安装和跨平台运行。FreeMQTT plus 则是分布式集群架构的新型 Broker,具备高可用性、会话同步优化、灵活扩展能力及高效消息路由特性。二者适用于智能家居、工业物联网和智能交通等领域,为开发者提供轻量级、高性能的通信工具,助力构建稳定可靠的物联网系统。
|
存储 监控 安全
工业物联网关应用:PLC数据通过智能网关上传阿里云实战
本文介绍如何使用智能网关将工厂PLC数据传输至阿里云平台,适合中小企业远程监控设备状态。硬件准备包括三菱FX3U PLC、4G智能网关和24V电源。接线步骤涵盖PLC编程口与网关连接、运行状态检测及天线电源接入。配置过程涉及通讯参数、阿里云对接和数据点映射。PLC程序关键点包括数据上传触发和温度值处理。阿里云平台操作包含实时数据查看、数据可视化和规则引擎设置。最后提供常见故障排查表和安全建议,确保系统稳定运行。
1414 1
|
物联网 数据挖掘 BI
基于阿里云物联网平台(IoT)的智能家居系统开发与部署
随着物联网技术的发展,智能家居成为提升生活品质的重要方向。阿里云物联网平台提供设备接入、数据管理及应用开发能力,支持亿级设备接入、高效数据管理和灵活应用开发,确保系统安全。本文通过实战案例展示如何基于该平台构建智能家居系统,涵盖设备接入、远程控制、场景联动与数据分析等功能,助力企业快速部署智能家居解决方案。
|
存储 人工智能 弹性计算
国内首个,阿里云入选Gartner®战略云平台魔力象限挑战者
近日,Gartner发布2024年《战略云平台魔力象限》(Magic Quadrant™ for Strategic Cloud Platform Services)报告,阿里云从利基者象限进入挑战者象限,成为国内首个入选该象限的中国公共云厂商。
|
传感器 人工智能 物联网
健康监测设备的技术革命:AI+物联网如何让你随时掌握健康数据?
健康监测设备的技术革命:AI+物联网如何让你随时掌握健康数据?
1483 19
|
存储 安全 物联网
政府在推动物联网技术标准和规范的统一方面可以发挥哪些作用?
政府在推动物联网技术标准和规范的统一方面可以发挥哪些作用?
537 60
|
安全 物联网 物联网安全
制定统一的物联网技术标准和规范的难点有哪些?
制定统一的物联网技术标准和规范的难点有哪些?
619 58

相关产品

  • 物联网平台