阿里云物联网平台自定义Topic脚本解析功能演示

简介: 之前物联网平台自定义Topic均做消息的直接透传,不做类似物模型模式的数据脚本解析。平台最新推出的自定义Topic脚本解析功能,设备通过携带解析标记(?_sn=default)的自定义Topic上报数据,物联网平台收到数据后,调用您在控制台提交的数据解析脚本,将自定义格式数据转换为JSON结构体,再流转给后续业务系统。

概述

之前物联网平台自定义Topic均做消息的直接透传,不做类似物模型模式的数据脚本解析。平台最新推出的自定义Topic脚本解析功能,设备通过携带解析标记(?_sn=default)的自定义Topic上报数据,物联网平台收到数据后,调用您在控制台提交的数据解析脚本,将自定义格式数据转换为JSON结构体,再流转给后续业务系统。本文主要演示该功能的具体功能实现。

Step By Step

1、创建产品和设备

_

_

2、添加脚本

_

示例脚本

3、设备端通过自定义Topic模拟上行数据

import com.alibaba.taro.AliyunIoTSignUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// 透传类设备测试
public class IoTDemoPubSubDemoForPersonalTopic {

    // 设备三元组信息
    public static String productKey = "a1wJG******";
    public static String deviceName = "Device1";
    public static String deviceSecret = "40YEyiGzXmvhDdpvbUVFCHjC********";
    public static String regionId = "cn-shanghai";

    private static String pubTopic = "/"+ productKey + "/" + deviceName  + "/user/update?_sn=default";//自定义Topic做脚本解析
    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient(); // 初始化Client
        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());

        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties() {

        try {
            //上报数据
            System.out.println("自定义Topic上报属性值");
            //0x000000000100320100000000
            String hexString = "000000000100320100000000";
            byte[] payLoad = hexToByteArray(hexString);
            MqttMessage message = new MqttMessage(payLoad);
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * hex字符串转byte数组
     * @param inHex 待转换的Hex字符串
     * @return  转换后的byte数组结果
     */
    public static byte[] hexToByteArray(String inHex){
        int hexlen = inHex.length();
        byte[] result;
        if (hexlen % 2 == 1){
            //奇数
            hexlen++;
            result = new byte[(hexlen/2)];
            inHex="0"+inHex;
        }else {
            //偶数
            result = new byte[(hexlen/2)];
        }
        int j=0;
        for (int i = 0; i < hexlen; i+=2){
            result[j]=hexToByte(inHex.substring(i,i+2));
            j++;
        }
        return result;
    }

    /**
     * Hex字符串转byte
     * @param inHex 待转换的Hex字符串
     * @return  转换后的byte
     */
    public static byte hexToByte(String inHex) {
        return (byte) Integer.parseInt(inHex, 16);
    }
}

参考链接:LoRaWAN设备数据解析及开源MQTT SDK设备端模拟

4、上行消息查看

_

_

5、注意事项

1、在物联网平台创建自定义Topic时按正常Topic定义,不添加该解析标记;
2、仅解析设备上报云端的数据,不解析云端下行数据;
3、仅解析上报数据的Payload,并返回解析后的Payload;
4、解析前后,数据所在Topic不变。例如,设备发送到/${productKey}/${deviceName}/user/update的数据,解析后仍在该Topic中。

参考链接

自定义数据解析Topic概述

LoRaWAN设备数据解析及开源MQTT SDK设备端模拟

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
1月前
|
机器学习/深度学习 编解码 人工智能
阿里云gpu云服务器租用价格:最新收费标准与活动价格及热门实例解析
随着人工智能、大数据和深度学习等领域的快速发展,GPU服务器的需求日益增长。阿里云的GPU服务器凭借强大的计算能力和灵活的资源配置,成为众多用户的首选。很多用户比较关心gpu云服务器的收费标准与活动价格情况,目前计算型gn6v实例云服务器一周价格为2138.27元/1周起,月付价格为3830.00元/1个月起;计算型gn7i实例云服务器一周价格为1793.30元/1周起,月付价格为3213.99元/1个月起;计算型 gn6i实例云服务器一周价格为942.11元/1周起,月付价格为1694.00元/1个月起。本文为大家整理汇总了gpu云服务器的最新收费标准与活动价格情况,以供参考。
阿里云gpu云服务器租用价格:最新收费标准与活动价格及热门实例解析
|
15天前
|
机器学习/深度学习 Java API
阿里云文档智能解析——大模型版能力最佳实践与体验评测
阿里云文档智能解析(大模型版)在处理非结构化数据方面表现优异,尤其是在性能和可扩展性上具有明显优势。虽然存在一些待完善之处,但其强大的基础能力和广泛的适用场景使其成为企业数字转型过程中的有力助手。随着技术的不断进步和完善,相信它会在更多领域展现出更大的价值。
51 5
阿里云文档智能解析——大模型版能力最佳实践与体验评测
|
5天前
|
文字识别 算法 API
阿里云文档解析(大模型版)优化
阿里云文档解析(大模型版
|
1月前
|
弹性计算 负载均衡 数据库
阿里云轻量应用服务器全面解析:收费标准、产品优势及适用场景
在云计算领域,阿里云凭借其强大的技术实力和丰富的产品线,为用户提供了一系列高效、便捷的云服务器产品。其中,轻量应用服务器(Simple Application Server)作为面向个人开发者、中小企业等用户的入门级云产品,凭借其易用性、高性价比以及一站式服务体验,受到了广泛的欢迎。本文将全面解析阿里云轻量应用服务器的收费标准、产品优势以及适用场景,帮助用户更好地了解和选择这一产品。
阿里云轻量应用服务器全面解析:收费标准、产品优势及适用场景
|
1月前
|
弹性计算 负载均衡 数据库
阿里云轻量应用服务器收费标准、性能及适用场景全面解析
阿里云轻量应用服务器(Simple Application Server)作为面向个人开发者、中小企业等用户的入门级云产品,凭借其易用性、高性价比以及一站式服务体验,受到了广泛的欢迎。本文将全面解析阿里云轻量应用服务器的收费标准、最新活动价格以及适用场景,帮助用户更好地了解和选择这一产品。
阿里云轻量应用服务器收费标准、性能及适用场景全面解析
|
1月前
|
人工智能 智能设计 数据挖掘
阿里云高校计划价值与意义解析
阿里云推出了“阿里云高校计划”,旨在通过提供普惠算力和丰富的云产品,助力高校科研与教育加速,让每位在校大学生都能真实受益于这一技术变革。本文将深入探讨阿里云高校计划的详细内容及其对高校学子的深远影响。
阿里云高校计划价值与意义解析
|
1月前
|
机器学习/深度学习 弹性计算 人工智能
阿里云第八代云服务器ECSg8i实例深度解析:性能及适用场景参考
目前企业对云服务器的性能、安全性和AI能力的要求日益提高。阿里云推出的第八代云服务器ECS g8i实例,以其卓越的性能、增强的AI能力和全面的安全防护,除了适用于通用互联网应用和在线音视频应用等场景之外,也广泛应用于AI相关应用。本文将深入解析ECS g8i实例的技术特性、产品优势、适用场景及与同类产品的对比,以供参考。
阿里云第八代云服务器ECSg8i实例深度解析:性能及适用场景参考
|
25天前
|
弹性计算 开发框架 数据可视化
阿里云虚拟主机和云服务器有什么区别?多角度全解析对比
阿里云虚拟主机与云服务器ECS的主要区别在于权限与灵活性。虚拟主机简化了网站搭建流程,预装常用环境,适合初级用户快速建站;而云服务器提供全面控制权,支持多样化的应用场景,如APP后端、大数据处理等,更适合具备技术能力的用户。尽管虚拟主机在价格上通常更优惠,但随着云服务器价格的下降,其性价比已超越虚拟主机,成为更具吸引力的选择。
|
27天前
|
设计模式 存储 人工智能
深度解析Unity游戏开发:从零构建可扩展与可维护的游戏架构,让你的游戏项目在模块化设计、脚本对象运用及状态模式处理中焕发新生,实现高效迭代与团队协作的完美平衡之路
【9月更文挑战第1天】游戏开发中的架构设计是项目成功的关键。良好的架构能提升开发效率并确保项目的长期可维护性和可扩展性。在使用Unity引擎时,合理的架构尤为重要。本文探讨了如何在Unity中实现可扩展且易维护的游戏架构,包括模块化设计、使用脚本对象管理数据、应用设计模式(如状态模式)及采用MVC/MVVM架构模式。通过这些方法,可以显著提高开发效率和游戏质量。例如,模块化设计将游戏拆分为独立模块。
67 3
|
1月前
|
域名解析 网络协议 API
【API管理 APIM】APIM集成内部VNet时,常遇见的关于自定义DNS服务问题。
【API管理 APIM】APIM集成内部VNet时,常遇见的关于自定义DNS服务问题。

相关产品

  • 物联网平台
  • 推荐镜像

    更多