阿里云物联网平台自定义Topic脚本解析功能演示

简介: 之前物联网平台自定义Topic均做消息的直接透传,不做类似物模型模式的数据脚本解析。平台最新推出的自定义Topic脚本解析功能,设备通过携带解析标记(?_sn=default)的自定义Topic上报数据,物联网平台收到数据后,调用您在控制台提交的数据解析脚本,将自定义格式数据转换为JSON结构体,再流转给后续业务系统。

概述

之前物联网平台自定义Topic均做消息的直接透传,不做类似物模型模式的数据脚本解析。平台最新推出的自定义Topic脚本解析功能,设备通过携带解析标记(?_sn=default)的自定义Topic上报数据,物联网平台收到数据后,调用您在控制台提交的数据解析脚本,将自定义格式数据转换为JSON结构体,再流转给后续业务系统。本文主要演示该功能的具体功能实现。

Step By Step

1、创建产品和设备

_

_

2、添加脚本

_

示例脚本

3、设备端通过自定义Topic模拟上行数据

import com.alibaba.taro.AliyunIoTSignUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// 透传类设备测试
public class IoTDemoPubSubDemoForPersonalTopic {

    // 设备三元组信息
    public static String productKey = "a1wJG******";
    public static String deviceName = "Device1";
    public static String deviceSecret = "40YEyiGzXmvhDdpvbUVFCHjC********";
    public static String regionId = "cn-shanghai";

    private static String pubTopic = "/"+ productKey + "/" + deviceName  + "/user/update?_sn=default";//自定义Topic做脚本解析
    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient(); // 初始化Client
        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());

        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties() {

        try {
            //上报数据
            System.out.println("自定义Topic上报属性值");
            //0x000000000100320100000000
            String hexString = "000000000100320100000000";
            byte[] payLoad = hexToByteArray(hexString);
            MqttMessage message = new MqttMessage(payLoad);
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * hex字符串转byte数组
     * @param inHex 待转换的Hex字符串
     * @return  转换后的byte数组结果
     */
    public static byte[] hexToByteArray(String inHex){
        int hexlen = inHex.length();
        byte[] result;
        if (hexlen % 2 == 1){
            //奇数
            hexlen++;
            result = new byte[(hexlen/2)];
            inHex="0"+inHex;
        }else {
            //偶数
            result = new byte[(hexlen/2)];
        }
        int j=0;
        for (int i = 0; i < hexlen; i+=2){
            result[j]=hexToByte(inHex.substring(i,i+2));
            j++;
        }
        return result;
    }

    /**
     * Hex字符串转byte
     * @param inHex 待转换的Hex字符串
     * @return  转换后的byte
     */
    public static byte hexToByte(String inHex) {
        return (byte) Integer.parseInt(inHex, 16);
    }
}

参考链接:LoRaWAN设备数据解析及开源MQTT SDK设备端模拟

4、上行消息查看

_

_

5、注意事项

1、在物联网平台创建自定义Topic时按正常Topic定义,不添加该解析标记;
2、仅解析设备上报云端的数据,不解析云端下行数据;
3、仅解析上报数据的Payload,并返回解析后的Payload;
4、解析前后,数据所在Topic不变。例如,设备发送到/${productKey}/${deviceName}/user/update的数据,解析后仍在该Topic中。

参考链接

自定义数据解析Topic概述

LoRaWAN设备数据解析及开源MQTT SDK设备端模拟

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
10天前
|
弹性计算 缓存 应用服务中间件
阿里云服务器2核2G99元和2核4G199元实例规格性能及适用场景解析
2024年阿里云推出了两款云服务器,2核2G3M带宽40G ESSD Entry盘价格只要99元1年,2核4G5M带宽80G ESSD Entry盘价格只要199元1年,这两款云服务器的活动截止日期为2026年3月31日,活动期间新购、续费同价。那么这两款云服务器怎么样呢?可以用来做什么?本文将对这两款云服务器进行深度解析,包括配置介绍、实例规格、使用场景以及购买建议,以供选择参考。
阿里云服务器2核2G99元和2核4G199元实例规格性能及适用场景解析
|
26天前
|
边缘计算 Cloud Native 数据管理
【阿里云云原生专栏】云原生背景下的AIoT布局:阿里云Link平台解析
【5月更文挑战第29天】阿里云Link平台,作为阿里云在AIoT领域的核心战略,借助云原生技术,为开发者打造一站式物联网服务平台。平台支持多协议设备接入与标准化管理,提供高效数据存储、分析及可视化,集成边缘计算实现低延时智能分析。通过实例代码展示,平台简化设备接入,助力智能家居等领域的创新应用,赋能开发者构建智能生态系统。
123 3
|
13天前
|
存储 机器学习/深度学习 编解码
深度解析阿里云服务器计算型c7与计算型c8y实例区别与选择参考
在阿里云提供的众多计算型云服务器实例规格中,计算型c7和计算型c8y实例是两款备受关注的云服务器规格。主要适用于网站应用、批量计算、视频编码等各种类型和规模的企业级应用,对于初次接触阿里云服务器的新手用户来说,可能并不是很清楚他们之间的区别,因此可能不知道怎么选择。本文将从实例的架构、处理器、存储与网络能力、使用场景、指标数据、收费标准以及实时活动价格等多个维度,对计算型c7和计算型c8y实例进行深度解析,以供参考和选择。
深度解析阿里云服务器计算型c7与计算型c8y实例区别与选择参考
|
3天前
|
存储 弹性计算 安全
构建高效企业应用架构:阿里云产品组合实践深度解析
该方案展现了阿里云产品组合的强大能力和灵活性,不仅满足了当前业务需求,也为未来的扩展打下了坚实的基础。希望本文的分享能为读者在设计自己的IT解决方案时提供一定的参考和启发。
21 1
|
1月前
|
域名解析 网络协议 安全
【域名解析DNS专栏】云服务中的DNS解析服务比较:阿里云、AWS、Azure大PK
【5月更文挑战第23天】此对比分析探讨了阿里云DNS、AWS Route 53和Azure DNS的服务特点。阿里云DNS以其智能解析和IPv6支持脱颖而出,适合中国地区用户;AWS Route 53凭借其强大的路由策略和与AWS生态的深度集成吸引高级用户;Azure DNS则以简洁管理和DNSSEC安全支持见长,与Azure平台集成良好。选择取决于具体需求,如功能、易用性、性能、安全性和成本。
【域名解析DNS专栏】云服务中的DNS解析服务比较:阿里云、AWS、Azure大PK
|
14天前
|
运维 网络协议 JavaScript
Serverless 应用引擎产品使用合集之绑定自定义域名是否要确定解析设置
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
1月前
|
存储 弹性计算 监控
【阿里云弹性计算】阿里云ECS全面解析:弹性计算服务的核心优势与应用场景
【5月更文挑战第20天】阿里云ECS是提供可伸缩计算能力的云服务,支持多种规格实例,满足不同需求。其核心优势包括灵活性、高性能、高可用性、安全性和易用性。适用场景包括网站托管、大数据处理、游戏多媒体应用及测试开发环境。通过Python示例代码展示了如何创建ECS实例,助力企业专注业务发展,简化基础设施管理。
76 5
|
1月前
|
存储 弹性计算 Kubernetes
【阿里云云原生专栏】深入解析阿里云Kubernetes服务ACK:企业级容器编排实战
【5月更文挑战第20天】阿里云ACK是高性能的Kubernetes服务,基于开源Kubernetes并融合VPC、SLB等云资源。它提供强大的集群管理、无缝兼容Kubernetes API、弹性伸缩、安全隔离及监控日志功能。用户可通过控制台或kubectl轻松创建和部署应用,如Nginx。此外,ACK支持自动扩缩容、服务发现、负载均衡和持久化存储。多重安全保障和集成监控使其成为企业云原生环境的理想选择。
225 3
|
14天前
|
存储 算法 搜索推荐
深入解析力扣179题:最大数(自定义排序法详解及模拟面试问答)
深入解析力扣179题:最大数(自定义排序法详解及模拟面试问答)

热门文章

最新文章

相关产品

  • 物联网平台
  • 推荐镜像

    更多