阿里云物联网平台设备历史属性上报及MNS服务端订阅

简介: 阿里云物联网平台不仅支持设备即时属性的上报,也支持因为某些原因,没有及时上报的属性数据,通过历史属性上报的方式进行上报,历史属性上报Topic:/sys/{productKey}/{deviceName}/thing/event/property/history/post。

概述

阿里云物联网平台不仅支持设备即时属性的上报,也支持因为某些原因,没有及时上报的属性数据,通过历史属性上报的方式进行上报,历史属性上报Topic:/sys/{productKey}/{deviceName}/thing/event/property/history/post。本文结合物联网平台最新推出的独享实例,在新的实例下面创建产品及设备,进行历史属性的上报测试,并进行 MNS 历史属性服务端订阅。

操作步骤

1、创建独享实例,在独享实例下面创建产品和设备

_
_
_

2、获取独享实例设备端MQTT接入点

_

3、设备端接入,参考链接

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTDemoPubSubDemo {

    public static String productKey = "g028S******";
    public static String deviceName = "device1";
    public static String deviceSecret = "aKEHNK1w0UBvGcA1BA7gf2eHC********";
    public static String regionId = "cn-shanghai";
    public static String isntanceId = "iot-instc-public-cn-0pp1g*******";

    // 物模型 - 历史属性上报topic
    private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/history/post";

    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();
        postDeviceProperties();
    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // MQTT 设备接入 公网终端节点(Endpoint)
            String targetServer = "tcp://" + isntanceId + ".iot-as-mqtt."+regionId+".iothub.aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
//        connOpts.setCleanSession(true);
        connOpts.setCleanSession(false);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties() {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("历史上报属性值");
            String payloadJson = "{ \"id\": 123, \"version\": \"1.0\", \"method\": \"thing.event.property.history.post\", \"params\": [ { \"identity\": { \"productKey\": \"g028S******\", \"deviceName\": \"device1\" }, \"properties\": [ { \"AreaId\": { \"value\": \"history data test 4\", \"time\": 1578198990000 } } ] } ] }";
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(1);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

payLoad格式参考

4、设备运行状态查看

_

5、MNS服务端订阅配置

_

6、使用 MNS Python SDK 获取MNS订阅到Queue中的消息

  • 6.1 安装SDK

_

  • 6.2 Code Sample
#init my_account, my_queue
from mns.account import Account
import sys
import json
import base64

endpoint = "http://18482178********.mns.cn-shanghai.aliyuncs.com/"
accid = "LTAIOZZg********"
acckey = "v7CjUJCMk7j9aKduMAQLjy********"
token = ""
my_account = Account(endpoint, accid, acckey, token)
queue_name = "aliyun-iot-g028S******"
my_queue = my_account.get_queue(queue_name)

#循环读取删除消息直到队列空
#receive message请求使用long polling方式,通过wait_seconds指定长轮询时间为3秒

## long polling 解析:
### 当队列中有消息时,请求立即返回;
### 当队列中没有消息时,请求在MNS服务器端挂3秒钟,在这期间,有消息写入队列,请求会立即返回消息,3秒后,请求返回队列没有消息;

wait_seconds = 3
print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (10*"=", 10*"=", queue_name, wait_seconds))
while True:
    #读取消息
    try:
        recv_msg = my_queue.receive_message(wait_seconds)
        print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id))
        print("Message Body: ",base64.b64decode(json.loads(recv_msg.message_body)['payload'])) # 转发到mns 的messagebody经过了base64编码,获取具体消息的内容需要做base64解码
    except Exception as e:
    #except MNSServerException as e:
        if e.type == u"QueueNotExist":
            print("Queue not exist, please create queue before receive message.")
            sys.exit(0)
        elif e.type == u"MessageNotExist":
            print("Queue is empty!")
            sys.exit(0)
        print("Receive Message Fail! Exception:%s\n" % e)
        continue

    #删除消息
    try:
        my_queue.delete_message(recv_msg.receipt_handle)
        print("Delete Message Succeed!  ReceiptHandle:%s" % recv_msg.receipt_handle)
    except Exception as e:
        print("Delete Message Fail! Exception:%s\n" % e)
print("Delete Message Fail! Exception:%s\n" % e)
  • 6.3 Test Result
Receive Message Succeed! ReceiptHandle:8-2zuDHj20LzZz8zcFz0z6YEzcSFqxyIKefW MessageBody:{"payload":"eyJkZXZpY2VUeXBlIjoiQ3VzdG9tQ2F0ZWdvcnkiLCJpb3RJZCI6IlY1WGJXekluY0EyOW41aWNib3RYZzAyODAwIiwicmVxdWVzdElkIjoiMTIzIiwicHJvZHVjdEtleSI6ImcwMjhTR1o4RlJDIiwiZ210Q3JlYXRlIjoxNTc4MjkxNzI0NjY4LCJkZXZpY2VOYW1lIjoiZGV2aWNlMSIsIml0ZW1zIjp7IkFyZWFJZCI6eyJ0aW1lIjoxNTc4MTk4OTkwMDAwLCJ2YWx1ZSI6Imhpc3RvcnkgZGF0YSB0ZXN0*********","messagetype":"thing_history","topic":"/g028S******/device1/thing/event/property/history/post","messageid":1214069604465248256,"timestamp":1578291724} MessageID:5F8092E53A67656C7F811CD50E19A150
Message Body:  b'{"deviceType":"CustomCategory","iotId":"V5XbWzIncA29n5icbo********","requestId":"123","productKey":"g028S******","gmtCreate":1578291724668,"deviceName":"device1","items":{"AreaId":{"time":1578198990000,"value":"history data test 4"}}}'
DEBUG: v7CjUJCMk7j9aKduMAQLjyCmb8cmCm hAbKzezvRlqeVXC18CzChL4OZZk= DELETE

更多参考

使用MNS服务端订阅
Python SDK 队列使用手册

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 SQL JSON
阿里云物联网平台 “物模型属性” 的分析&&易错点&&上报属性时Payload如何正确组装?
您是否在纠结设备上报了数据,平台到底有没有收到? 您是否很疑惑物模型属性怎么老是不刷新? 您是否不理解物模型属性下发总是不生效? 您是否不知道上报属性时Payload到底该怎么填? 您是否很纳闷物模型属性一会又携带有时间戳,一会又没有? 您是否怀疑能不能自定义物模型属性的时间戳?又如何取到自定义时间戳? 您是否...
8951 3
阿里云物联网平台 “物模型属性” 的分析&&易错点&&上报属性时Payload如何正确组装?
|
存储 缓存 安全
oss跨域资源共享(CORS Configuration)
oss跨域资源共享(CORS Configuration)
1607 4
|
弹性计算 Linux 开发工具
幻兽帕鲁服务器如何设置/修改密码
介绍了如何设置幻兽帕鲁服务器的密码,以及需要密码才可以加入到服务器中教程
14627 6
幻兽帕鲁服务器如何设置/修改密码
|
物联网
阿里云物联网平台一型一密获取:DeviceSecret 示例
一型一密安全认证方式下,同一产品下所有设备可以烧录相同固件(即烧录ProductKey和ProductSecret)。设备发送激活请求时,物联网平台进行身份确认,认证通过,下发该设备对应的DeviceSecret。本文主要演示如何使用JAVA SDK动态获取DeviceSecret。
9100 0
|
2月前
|
存储 人工智能 机器人
告别 “缸中之脑”:为何 Agent Runtime 至关重要?MuleRun 如何实现突破?
TL;DR:很多 AI Agent 被困在受限且一刀切的沙箱内,而 MuleRun 是全球首个通过提供可完全自定义且持久化的 Agent Runtime 来解决这一问题的平台——即你可以定义操作系统、访问原生软件、跨会话保留状态并分配硬件资源。这让你能打造真正的“数字化工人”,而不仅仅是受限的聊天机器人。
444 9
|
3月前
|
机器学习/深度学习 人工智能 文字识别
基于YOLOv8的文档图像表格检测与识别系统设计与实现(源码打包)
相比传统图像处理方法,YOLOv8 在表格检测任务中展现出更强的鲁棒性与泛化能力,尤其在复杂背景、扫描文档、低分辨率场景下依然保持高精度表现。同时,项目提供完整的训练流程与标注数据集,便于用户根据具体业务场景进行迁移学习与模型微调。
基于YOLOv8的文档图像表格检测与识别系统设计与实现(源码打包)
|
3月前
|
传感器 人工智能 安全
智能电池充电:使用PID控制器优化SOC(Matlab代码实现)
智能电池充电:使用PID控制器优化SOC(Matlab代码实现)
129 0
|
2月前
|
人工智能
阿里云产品八月刊来啦
通义万相开源视频模型Wan2.2-S2V,表格存储全面升级支持AI场景能力,通用算力型实例 u2i 开启公测,更多精彩请点击。
164 0
|
9月前
|
机器学习/深度学习 人工智能 自然语言处理
基于星海智算云平台部署 DeepSeek-R1系列 70b 模型全攻略(附平台福利)
本文介绍了如何在星海智算云平台上部署DeepSeek-R1系列70B模型,解决官网访问不畅的问题。通过云端部署,用户可以按需付费,避免本地部署高昂成本(高达两百多万)。文章详细讲解了从实例创建到开始使用DeepSeek的八个步骤,并提供了成本优化技巧和新手注意事项。推荐使用双A100显卡,每小时费用仅13.32元。新用户还可领取福利,享受高性价比服务。立即注册体验:[星海智算云平台](https://gpu.spacehpc.com/user/register?inviteCode=52872508)。
789 1
基于星海智算云平台部署 DeepSeek-R1系列 70b 模型全攻略(附平台福利)
|
缓存 关系型数据库 MySQL
error: Failed dependencies: mariadb-connector-c-config is obsoleted by mysql-community-server-8.0.36-1.el7.x86_64 问题解决
error: Failed dependencies: mariadb-connector-c-config is obsoleted by mysql-community-server-8.0.36-1.el7.x86_64 问题解决
942 19

相关产品

  • 物联网平台