物模型接入价值与实践

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 作者:熊益群

物模型价值

物联网元年

关键词:探索、快速

2016年阿里云物联网平台(前称:物联网套件)上线,为客户设备上云提供了通道能力,包括MQTT连接、消息流转等核心功能。
第一批客户大多基于该模式使用物联网平台能力,当时整个行业处于物联网云平台起步期,包括AWS,Azure起步阶段同样只是提供通道能力。
基于通道能力,客户使用物联网平台接入方式详见文档 https://developer.aliyun.com/article/746536
这个阶段的客户大多是硬件厂商,软硬一体开发,尝试物联网转型提升设备价值,对物联网平台的诉求比较简单,希望自己更多参与,对新模式有更多把控力,所以都会采用自定义协议上云。

test

物联网繁荣

关键词:生态、扩展、数字化

近两年物联网设备、解决方案如雨后春笋般涌出,不少用户希望赶上物联网这波浪潮。这个阶段的客户不仅仅关注设备连云,也开始关注围绕设备产生的解决方案。因此客户角色从硬件厂商,快速扩展到集成商、软件提供商等。由于大量角色的进入,对软硬开发解耦、易扩展的能力提出了诉求。同时我们也发现第一批使用通道能力的平台客户随着自己业务发展、设备扩展,原来的架构已无法支撑,对物联网平台也提出了新的要求。

举两个典型场景:

  • 老客户升级:某个共享设备提供商,原来仅提供大学校园共享洗衣机服务,利用物联网平台通道能力上云,随着公司业务发展,从共享洗衣机业务扩展到校园淋浴、饮水机、充电桩等多类设备,原来自定义协议和API无法支撑多品类设备,难扩展。需要有一套接入标准和规范,方便快速扩展设备类型。
  • 新生态客户:某个充电桩平台客户,提供充电桩管理平台,作为甲方要求大量桩企(乙方)按照平台规范接入,典型的软硬件分离场景。需要有一套接入标准和规范,方便快速扩展桩企规模。

这一阶段平台在通道能力之上,提供了物模型能力,物模型可以屏蔽底层设备差异,让软件开发者基于平台提供的标准API开发;硬件开发者基于平台提供的标准协议开发;从而达到软硬开发解耦的目的。

test

物联网赋能

关键词:场景化、智能

物联网终极目标一定是基于设备采集数据赋能业务,实现数字业务化。例如金融、物流、家居、餐饮、商场、医疗、交通等不同领域通过物联网数字化后,结合数据分析智能化决策、互联互通、场景规则、数字孪生等能力实现纵深领域场景化、智能化。
这一阶段平台在通道能力、物模型能力之上,还进一步提供设备智能运维、数据分析、可视化、数字孪生等高价值服务,帮助客户数字化后产生真正的业务价值。

test

基于以上分析,物联网已经过了最初的“元年”阶段,也迈入了“繁荣”阶段,正逐步朝“问物联网要赋能”的阶段演进。物模型是物联网生态化、高扩展、数字化、智能化非常重要的基础,强烈建议客户使用。

物模型接入实践

自定义接入模式

以一个老客户为例,原来仅使用物联网平台通道能力,下图中1~8流程都需要自定义开发,当客户设备类型足够简单时,该模式复杂度通常不会成为客户痛点。

test

面临的挑战

随着客户接入设备种类越来越多,面临的扩展性问题也越来越严峻。

test

使用物模型后的模式

物模型模式下,设备与云交互协议、云平台设备API都基于物模型标准化了,即使设备不断扩展,客户业务服务器和设备端逻辑都不需要进行调整,保证了扩展性。

test

物模型接入流程详细介绍

流程图

以下是客户详细接入流程,主要分为:云端配置、设备开发、服务端开发、设备运行时管理四大部分。平台会提供一些工具,使各部分流程更高效。接下来进行详细介绍。
image.png
本文试图手把手介绍从0到1接入物模型,还会配套介绍一些接入过程中有帮助的平台能力,所以文章篇幅比较长,事实上客户接入流程还是非常简单的,真正开发只需要涉及到图中红色三个模块。

如果您希望快速接入,可以直接关注P0部分,其它部分都可以跳过。

1 云端配置

1.1 创建产品(P0)

1.登录物联网平台
2.创建产品。
image.png
说明
• 所属品类:标准品类库提供了一些供参考的模板,选择后可以修改,建议使用。
• 节点类型:根据实际选择即可。
• 数据格式:“ICA标准数据格式(Alink JSON)”表示设备使用标准Alink JSON格式上报数据;“透传/自定义”表示设备可以使用自定义格式,通过Alink特定Topic上报物联网平台,该模式客户需要写脚本进行转换,透传模式在此不做展开,后面单独起文章介绍。

1.2 物模型建模(P0)

1.模型查看。
已有的模型是继承自创建产品时选择的“充电桩”品类模板。
image.png

2.编辑模型。
通过“编辑草稿”,进行修改和添加,最后需要对物模型“发布上线”。
image.png
说明
• 定义物模型非常重要,物模型通过属性、事件、服务三要素描述了设备所有能力,设备和云交互、客户服务器访问设备通过物模型都可以实现协议标准化。如果客户定义的物模型如果足够通用和专业,阿里可以帮助作为ICA行业标准进行推广。
• 服务的调用方式有:同步调用、异步调用两种模式。客户云端开发调用下行控制API,同步调用和异步调用获取返回结果方式不一样,在后文“3.3”章节详细介绍。

物模型概念介绍
物模型介绍文档请参见这里
了解物模型概念,能够帮助您更好对设备建模。

1.3 物模型配置

当前默认是物模型强校验模式,即设备上报数据在IoT平台会进行物模型数据规范强校验,如果不符合规范会报错。
另外物模型弱校验、免校验、去重等规则也会在近期陆续开放,后期进行文档补充。
配置之后,会在设备运行时生效。

关联阅读:4.2 物模型扩展规则校验。

1.4 注册三元组(P0)

1.注册设备。
image.png
说明
• 添加设备:测试阶段使用较多,单个添加。
• 批量添加:量产阶段使用,有两种模式,“自动生成”表示设备标识符(deviceName)由平台按照一定的规则随机颁发;“批量上传”支持客户自定义设备标识符(deviceName)。
2.查看设备列表。
可以通过“设备列表”、“批次管理”两种方式查看创建的设备列表。
image.png
通过“批次管理”查看这一批次设备详情,并且支持下载三元组列表。
image.png

注意:此处设备标识符(deviceName)非常重要,与productKey, deviceSecret一起称为设备的“三元组”,作为设备的唯一身份,大部分情况需要烧录到设备上。

2 设备开发

2.1 使用设备SDK开发(P0)

设备接入SDK文档请参见这里
image.png

根据需要选择合适的语言版本。C SDK 建议使用“4.x”版本。

本文选择 Java SDK进行演示。
环境准备:https://help.aliyun.com/document_detail/97331.html
物模型开发:https://help.aliyun.com/document_detail/97333.html

1.开发之前需要先准备如下好两份数据:

  • 设备证书信息(productKey、deviceName、deviceSecret)
    image.png
  • 设备物模型
    image.png

为了方便查看物模型详细数据规范,通过导出“物模型TSL”查看详细物模型定义,其中包括物模型属性、事件、服务标识符、参数、数据规范。抽取部分内容,针对以下属性、事件、服务在DEMO中进行开发演示。

    "schema":"https://iotx-tsl.oss-ap-southeast-1.aliyuncs.com/schema.json",
    "profile":{
        "productKey":"a1nhbEV****"
    },
    "properties":[
        {
            "identifier":"acOutMeterIty",
            "name":"交流输出电表底值监测属性",
            "accessMode":"rw",
            "required":false,
            "dataType":{
                "type":"int",
                "specs":{
                    "min":"0",
                    "max":"200",
                    "step":"1"
                }
            }
        }
    ],
    "events":[
        {
            "identifier":"post",
            "name":"post",
            "type":"info",
            "required":true,
            "desc":"属性上报",
            "method":"thing.event.property.post",
            "outputData":[
                {
                    "identifier":"acOutMeterIty",
                    "name":"交流输出电表底值监测属性",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"200",
                            "step":"1"
                        }
                    }
                }
            ]
        },
        {
            "identifier":"startChaResEvt",
            "name":"启动充电结果事件",
            "type":"info",
            "required":false,
            "method":"thing.event.startChaResEvt.post",
            "outputData":[
                {
                    "identifier":"gunNum",
                    "name":"充电枪编号",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"100",
                            "step":"2"
                        }
                    }
                }
            ]
        }
    ],
    "services":[
        {
            "identifier":"set",
            "name":"set",
            "required":true,
            "callType":"async",
            "desc":"属性设置",
            "method":"thing.service.property.set",
            "inputData":[
                {
                    "identifier":"acOutMeterIty",
                    "name":"交流输出电表底值监测属性",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"200",
                            "step":"1"
                        }
                    }
                }
            ],
            "outputData":[

            ]
        },
        {
            "identifier":"get",
            "name":"get",
            "required":true,
            "callType":"async",
            "desc":"属性获取",
            "method":"thing.service.property.get",
            "inputData":[
                "acOutMeterIty"
            ],
            "outputData":[
                {
                    "identifier":"acOutMeterIty",
                    "name":"交流输出电表底值监测属性",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"200",
                            "step":"1"
                        }
                    }
                }
            ]
        },
        {
            "identifier":"startChaResService",
            "name":"开启充电",
            "required":false,
            "callType":"async",
            "method":"thing.service.startChaResService",
            "inputData":[
                {
                    "identifier":"charm",
                    "name":"电量",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"1",
                            "max":"100",
                            "step":"2"
                        }
                    }
                }
            ],
            "outputData":[
                {
                    "identifier":"realcharm",
                    "name":"realcharm",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"100",
                            "step":"2"
                        }
                    }
                }
            ]
        }
    ]
}

2.开发代码。
如下示例中只需要将三元组,和属性、事件、服务参数替换成您的设备信息。其它代码可以直接运行。

关于免订阅能力介绍:

有些设备最资源比较敏感,为了避免初始化订阅大量Alink协议中系统Topic带来的性能开销,平台提供了免订阅能力,即平台帮设备进行Topic订阅。
SDK只有3.1.0及以后版本支持免订阅能力,并且默认打开该能力。
如果3.1.0及以后版本SDK您希望取消免订阅,依旧按需订阅Topic,可以设置SDK配置项关闭该能力,在make.settings中设置“FEATURE_MQTT_AUTO_SUBSCRIBE=n”。

public class Demo {

    public static void main(String[] args) throws Exception {

        String pk = "a1nhbEVCP**";
        String dn = "7mBP6Dd6IT27Rt***";
        String ds = "*****";

        /**
         * 连接 & 认证
         */
        LinkKitInitParams params = new LinkKitInitParams();

        // 设置 Mqtt 初始化参数
        IoTMqttClientConfig config = new IoTMqttClientConfig();
        config.productKey = pk;
        config.deviceName = dn;
        config.deviceSecret = ds;
        config.receiveOfflineMsg = false;
        params.mqttClientConfig = config;

        // 设置初始化三元组信息,用户传入
        DeviceInfo deviceInfo = new DeviceInfo();
        deviceInfo.productKey = pk;
        deviceInfo.deviceName = dn;
        deviceInfo.deviceSecret = ds;

        params.deviceInfo = deviceInfo;

        LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
            public void onError(AError aError) {
                System.out.println("===============FAILURE===============");
                ALog.e(TAG, "Init Error error=" + aError);
                System.out.println("===============FAILURE===============");
            }

            public void onInitDone(InitResult initResult) {
                System.out.println("===============SUCCESS===============");
                ALog.i(TAG, "onInitDone result=" + initResult);
                System.out.println("===============SUCCESS===============");
            }

        });

        //此处sleep 5S,由于上面init是异步流程
        Thread.sleep(5000);

        /**
         * 物模型开发
         */

        /**
         * 上报属性
         */
        Map<String, ValueWrapper> properties = new HashMap<>();

        // key为物模型中属性标识符"acOutMeterIty",value需要遵循属性值规范:int类型,取值范围在0~200之间;
        properties.put("acOutMeterIty", new ValueWrapper(10));

        LinkKit.getInstance().getDeviceThing().thingPropertyPost(properties, new IPublishResourceListener() {

            @Override
            public void onSuccess(String s, Object o) {
                System.out.println("=====thingPropertyPost success=======");
                System.out.println(s);
                System.out.println(JSON.toJSONString(o));
            }

            @Override
            public void onError(String s, AError aError) {
                System.out.println("=====thingPropertyPost failure=======");
            }
        });

        // 上报属性之后,云端会返回响应结果,此处是监听云端返回的属性reply
        LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {

            @Override
            public void onNotify(String s, String s1, AMessage aMessage) {
                System.out.println("===PROPERTY REPLY===");
                System.out.println("TOPIC:" + s1);
                System.out.println("Payload:" + JSON.toJSONString(aMessage));
            }

            @Override
            public boolean shouldHandle(String s, String s1) {
                return false;
            }

            @Override
            public void onConnectStateChange(String s, ConnectState connectState) {
            }
        });

        /**
         * 上报事件
         */
        HashMap<String, ValueWrapper> eventMap = new HashMap<>();

        // key为物模型中事件参数的标识符"gunNum", value为事件参数值需要遵循数值规范:int类型,取值范围0~100之间;
        eventMap.put("gunNum", new ValueWrapper.IntValueWrapper(50));

        OutputParams eventOutput = new OutputParams(eventMap);

        // 参数identity为"startChaResEvt"属于物模型事件标识符。
        LinkKit.getInstance().getDeviceThing().thingEventPost("startChaResEvt", eventOutput, new IPublishResourceListener() {
            public void onSuccess(String resId, Object o) {
                System.out.println("=====thingEventPost success=======");
                System.out.println(resId);
                System.out.println(JSON.toJSONString(o));
            }

            public void onError(String resId, AError aError) {
                System.out.println("=====thingEventPost failure=======");
            }
        });

        /**
         * 监听并执行下行服务
         */
        // 获取设备支持的所有服务
        LinkKit.getInstance().getDeviceThing().getServices();

        // 用户可以根据实际情况注册自己需要的服务的监听器
        List<Service> srviceList = LinkKit.getInstance().getDeviceThing().getServices();

        for (int i = 0; srviceList != null && i < srviceList.size(); i++) {
            Service service = srviceList.get(i);

            LinkKit.getInstance().getDeviceThing().setServiceHandler(service.getIdentifier(), new ITResRequestHandler() {

                public void onProcess(String identify, Object result, ITResResponseCallback itResResponseCallback) {

                    System.out.println("onProcess() called with: s = [" + identify + "], o = [" + result + "], itResResponseCallback = [" + itResResponseCallback + "]");
                    System.out.println("收到云端异步服务调用 " + identify);
                    try {
                        /**
                         * 设置属性(property)的模式
                         */
                        // "set"为设置属性默认的标识符
                        if ("set".equals(identify)) {
                            // TODO 用户需要设置真实设备的的属性
                            /**
                             * 向云端同步设置好的属性值
                             */
                            Map<String, ValueWrapper> desiredProperty = (Map<String, ValueWrapper>) ((InputParams) result).getData();

                            LinkKit.getInstance().getDeviceThing().thingPropertyPost(desiredProperty, new IPublishResourceListener() {

                                @Override
                                public void onSuccess(String s, Object o) {
                                    if (result instanceof InputParams) {
                                        Map<String, ValueWrapper> data = (Map<String, ValueWrapper>) ((InputParams) result).getData();
                                        //                        data.get()
                                        ALog.d(TAG, "收到异步下行数据 " + data);
                                        // 响应云端 接收数据成功
                                        itResResponseCallback.onComplete(identify, null, null);
                                    } else {
                                        itResResponseCallback.onComplete(identify, null, null);
                                    }
                                }

                                @Override
                                public void onError(String s, AError aError) {
                                    AError error = new AError();
                                    error.setCode(100);
                                    error.setMsg("setPropertyFailed.");
                                    itResResponseCallback.onComplete(identify, new ErrorInfo(error), null);
                                }
                            });

                            /**
                             * 服务(service)的模式
                             */
                            // "startChaResService"为服务的标识符
                        } else if ("startChaResService".equals(identify)) {

                            Map<String, ValueWrapper> inputParams = (Map<String, ValueWrapper>) ((InputParams) result).getData();
                            // TODO 根据服务入参inputParams执行设备逻辑,比如启动充电
                            // 充电完成后,向云端返回输出参数
                            OutputParams outputParams = new OutputParams();
                            // key为"charm"属于物模型中"startChaResService"服务出参标识符,value为出参值遵循数据规范:int类型,数据范围1~100之间;
                            outputParams.put("charm", new ValueWrapper.IntValueWrapper(20));

                            itResResponseCallback.onComplete(identify, null, outputParams);

                        } else {
                            // 根据不同的服务做不同的处理,跟具体的服务有关系
                            OutputParams outputParams = new OutputParams();
                            // 根据特定服务,按照服务规范返回服务的出参。
                            itResResponseCallback.onComplete(identify, null, outputParams);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        ALog.d(TAG, "云端返回数据格式异常");
                    }
                }
                public void onSuccess(Object o, OutputParams outputParams) {
                    ALog.d(TAG, "onSuccess() called with: o = [" + o + "], outputParams = [" + outputParams + "]");
                    ALog.d(TAG, "注册服务成功");
                }
                public void onFail(Object o, ErrorInfo errorInfo) {
                    ALog.d(TAG, "onFail() called with: o = [" + o + "], errorInfo = [" + errorInfo + "]");
                    ALog.d(TAG, "注册服务失败");
                }
            });
        }
    }
}

说明
• 上报属性成功,云端会返回REPLY,有以下日志说明设备到云,云到设备的链路全部走通。
image.png
• 设备收到属性设置指令,在完成物理设备属性修改后,建议将最新属性同步上报云端。

2.2 不使用SDK开发

1.协议准备。
“2.1 使用设备SDK开发”介绍了使用阿里云提供的SDK进行设备开发,当然您也可以选择不使用SDK,完全基于Alink协议(设备和云交互协议)开发。
Alink协议文档:https://help.aliyun.com/document_detail/90459.html
重点关注物模型协议部分:https://help.aliyun.com/document_detail/89301.html 。里面包含了物模型相关所有Topic介绍(物模型Topic列表在控制台也可以查看,如下图)。

test

文档详细介绍了设备端如何向云端上报“属性”、“事件”,如何订阅云端向下发送的“服务”指令。
Topic和Payload都基于客户定义的物模型进行标准化和规范化,从而使得客户设备与云交互方式不会随着设备类型变化而改变,满足扩展性要求。
image.png

2.环境准备。
根据自己选型选择合适的MQTT客户端,本文选择eclipse paho。

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.1</version>//可以选择您需要的版本
</dependency>

3.开发。
物模型复用“2.1 使用设备SDK开发”中“开发前准备”给出的。

关于免订阅能力介绍:

有些设备最资源比较敏感,为了避免初始化订阅大量Alink协议中系统Topic带来的性能开销,平台提供了免订阅能力,即平台帮设备进行Topic订阅。
SDK只有3.1.0及以后版本支持免订阅能力,并且默认打开该能力。
如果不使用SDK开发,可以通过设备端在MQTT的连接报文中的clientId部分, 新增_ss=1表示开启自动订阅, 建连成功后服务端会自动订阅上以下表格中的topic, 若传递 _ss=0 或者不传递该字段, 则不会发生服务端自动订阅动作。

4.上报属性。

String productKey = "a1nhbEV****";
String deviceName = "7mBP6Dd6IT2*****";
String deviceSecret = "****";

// MQTT连接
MqttTestClient client;
client = new MqttTestClient(productKey, deviceName, deviceSecret);

client.connect();

String setTopic = "/thing/event/property/post";
String setTopicReply = "/thing/event/property/post_reply";

// 上报属性,云端会返回REPLY,进行订阅。(为了节省端侧订阅开销,可以开通免订阅)
// 此处client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client subscribe
client.sysTopic(setTopicReply).subscribe();

// 封装Alink协议系统参数
Map<String, Object> payload = new HashMap<String, Object>();
Map<String, Object> params = new HashMap<String, Object>();
payload.put("id", 11);//id需要保证设备端一段时间内唯一
payload.put("params", params);
payload.put("method", "thing.event.property.post");

// 组装属性payload
String propKey = "acOutMeterIty";
int statusValue = 30;
Map<String, Object> proValue = new HashMap<>();
proValue.put("value", statusValue);
proValue.put("time", System.currentTimeMillis());
params.put(propKey, proValue);

// 上报(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client publish消息)
client.sysTopic(setTopic).publish(JSON.toJSONString(payload));

// 打印云端返回的Reply(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client监听订阅消息)
client.sysTopic(setTopicReply).readTopic(10000);

client.disconnect();

日志打印的设备请求和响应。
image.png

5.上报事件。


String productKey = "a1nhbEV****";
String deviceName = "7mBP6Dd6IT27*****";
String deviceSecret = "***";

// MQTT连接
MqttTestClient client;
client = new MqttTestClient(productKey, deviceName, deviceSecret);

client.connect();

// topic中为"startChaResEvt"属于物模型事件标识符。
String setTopic = "/thing/event/startChaResEvt/post";
String setTopicReply = "/thing/event/startChaResEvt/post_reply";

// 报事件,云端会返回REPLY,进行订阅。(为了节省端侧订阅开销,可以开通免订阅)
client.sysTopic(setTopicReply).subscribe();

// 封装Alink协议系统参数
Map<String, Object> payload = new HashMap<String, Object>();
Map<String, Object> params = new HashMap<String, Object>();
payload.put("id", 11);//id需要保证设备端一段时间内唯一
payload.put("params", params);
payload.put("method", "thing.event.startChaResEvt.post");

// 组装属性payload
Map<String, Object> dataValue = new HashMap<>();
// key为物模型中事件参数的标识符"gunNum", value为事件参数值需要遵循数值规范:int类型,取值范围0~100之间;
dataValue.put("gunNum", 59);

params.put("value", dataValue);
params.put("time", System.currentTimeMillis());

// 上报(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client publish消息)
client.sysTopic(setTopic).publish(JSON.toJSONString(payload));

// 打印云端返回的Reply(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client监听订阅消息)
client.sysTopic(setTopicReply).readTopic(10000);

client.disconnect();

6.服务调用。
此处为一段伪代码。可以在MQTT建连的时候通过callback监听云端下发的控制指令或消息。
前提:已经对下行的TOPIC进行订阅过,免订阅能力参考上面介绍。

mqttClient = new MqttClient(url, clientId, persistence);
final MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(true);
connOpts.setCleanSession(false);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(65);
LogUtil.log(clientId + "进行连接, 目的地: " + url);

// 此处订阅云端下发的消息
mqttClient.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable cause) {
        LogUtil.log("connection lost, cause:" + cause);
        cause.printStackTrace();
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        TopicChannel topicChannel = getTopic(topic);
        LogUtil.log("receive message, channel:" + topicChannel
                    + ",topic:" + topic
                    + ", payload:" + new String(message.getPayload(), "UTF-8") + "");
        topicChannel.put(message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //如果是qos 0消息 token.resp是没有回复的
        LogUtil.log("sent, " + ((token == null || token.getResponse() == null) ? "null"
                                : token.getResponse().getKey()));
    }
});

mqttClient.connect(connOpts);

重点说明
• 所有被订阅的下行Topic都会被监听到。物模型相关的主要包括:属性上报Reply、属性下行设置、服务下行控制。
• 设置设备属性(https://help.aliyun.com/document_detail/89301.html#title-wmh-y2e-18r),默认异步方式返回结果。
• 订阅的Topic为Alink协议标准Topic:“/sys/{productKey}/{deviceName}/thing/service/property/set”
• 服务控制(https://help.aliyun.com/document_detail/89301.html#title-3pt-nfy-jys),同异步方式取决于物模型中service配置的调用模式。
• 服务异步方式订阅的Topic为Alink协议标准Topic:“/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}”
• 服务同步方式订阅的Topic需要遵循RRPC Topic模式:详见文档https://help.aliyun.com/document_detail/90568.html

注意:仅设备侧需要感知RRPC特殊TOPIC,设备上云后,数据流转、开放API面向的还是Alink协议编程。

2.3 在线调试

设备开发后之后,如何快速模拟业务服务器给设备下发指令,调试设备能力?IoT平台提供了“在线调试”的功能,可以模拟设备或模拟应用端到端调试。

test

此处使用“在线调试”里面“调试真实设备”能力。通过控制台下发设备控制指令,分两类:1)属性设置;2)服务调用。

1.服务调用调试。

image.png

image.png
云端下发后,可以到设备端查看控制Log是否打印,以判断指令达到端侧。
从图中可见设备收到startChaResService服务,同时向云端返回了输出参数。

2.属性设置调试。

image.png
说明
• “获取”:暂不支持到设备,只能从云端获取设备最新属性。
• “设置”:指令直接到设备端,设备修改本地属性之后,上报云端最新属性;到设备上的设置指令为"set"。
• “设置期望值”:如果设备在线,会直接下发设备,如果设备离线,指令在云端进行缓存,待上线后下发设备端,下发之后,设备修改本地属性之后,同样上报云端最新属性;到设备上的设置指令同样为"set"。如果您希望使用物模型期望值能力,可点击查看最佳实践。
image.png
云端下发后,可以到设备端查看控制Log是否打印,以判断指令达到端侧。
从图中可见设备收到set指令,返回了服务响应,同时向云端上报了最新属性。

说明:服务结果还可以通过“2.4 查看物模型数据”章节中获取。

2.4 查看物模型数据

DEMO运行之后,可以看到设备已经“在线”状态。
“运行状态”展示设备上报的属性值;
“事件管理”展示设备上报的事件;
“服务调用”展示云端下发设备的控制服务;
image.png

上报属性结构化展示。
image.png

上报事件,包括事件参数展示。
image.png

属性设置、服务调用两类服务的云端下发入参、设备响应出参都有展示,如上证明设备收到云端指令,并且正常返回响应。

2.5 查看日志服务

设备在运行过程,可能会出现一些异常,比如连接失败、认证失败、数据异常等等,为了便于排查,可以查看日志服务。举例设备上报数据可能会不符合物模型规范,比如事件参数"gunNum"对应值的数据范围为0~100之间,而真实上报了50000。日志服务会展示设备错误详情。
image.png

image.png
可以看到日志内容为“{"Reason":"tsl parse: int value is bigger than max 100 -> gunNum"}”,说明gunNum对应值超过物模型规范最大值100的限制。物模型规范详情到“物模型TSL”查看。

image.png
同时可以通过“日志转储”中“日志报表”进一步查看设备大盘,包括设备上下线次数、设备上线IP区域分布、设备消息量、设备消息量Top列表、物模型错误分布、云端API错误分布等多维度指标。
日志服务介绍文档请参见这里

3 服务端开发

设备连接到阿里云IoT平台,设备数据会保存在IoT平台时序数据库。同时IoT平台提供两种方式供客户获取设备数据:方式1)通过服务端订阅或者规则引擎实时流转到客户服务器;2)通过开放API供客户调用获取。

3.1 服务端调用API开发(P0)

1.环境准备。
SDK下载文档:https://help.aliyun.com/document_detail/30581.html
API接口列表:https://help.aliyun.com/document_detail/69579.html
重点关注物模型使用相关API

test

2.以下示例为设置设备属性API,设备异步返回结果,客户需要通过“数据流转”方式获取。

String accessKey = "***";
String accessSecret = "***";
try {
    DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
} catch (Exception e) {
    System.out.println("DefaultProfile exception");
}

IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKey, accessSecret);
DefaultAcsClient defaultAcsClient = new DefaultAcsClient(profile);

SetDevicePropertyRequest setDevicePropertyRequest = new SetDevicePropertyRequest();
// 如果使用实例,此处传入真实实例id;如果公共实例,不需要设置。
//createProductRequest.setIotInstanceId("iothub-test-xxx");
setDevicePropertyRequest.setProductKey(pk);
setDevicePropertyRequest.setDeviceName(dn);

Map<String, Integer> properties = new HashMap<>();
// key为物模型中属性标识符"acOutMeterIty",value需要遵循属性值规范:int类型,取值范围在0~200之间;
properties.put("acOutMeterIty", 98);
setDevicePropertyRequest.setItems(JSON.toJSONString(properties));

SetDevicePropertyResponse response = null;
try {
    response = defaultAcsClient.getAcsResponse(setDevicePropertyRequest);
} catch (Exception e) {
    Log.error("执行失败:e:" + e.getMessage());
}

System.out.println("===============");
System.out.println("setDeviceProperty request : " + JSON.toJSONString(setDevicePropertyRequest));
System.out.println("setDeviceProperty response : " + JSON.toJSONString(response.getData()));
System.out.println("setDeviceProperty requestId : " + response.getRequestId());
System.out.println("===============");

重点说明
image.png
下行控制如果为异步服务,需要通过订阅数据流转获取设备返回结果,订阅方式和数据结构详见“3.2 数据流转”章节介绍。

关联介绍:“3.2.1 服务端订阅”中“重点说明”。

3.2 数据流转

平台提供两种数据流转方式:方式1)服务端订阅;方式2)规则引擎;

3.2.1服务端订阅(P0)

服务端订阅配置
image.png
“推送消息类型”选择“设备上报消息”,包括物模型属性上报、事件上报、设备下行指令结果(包括属性设置响应、服务控制响应)等消息。
消息格式详见文档:https://help.aliyun.com/document_detail/73736.html

test

服务端订阅DEMO
接入说明:https://help.aliyun.com/document_detail/143601.html

/**
 * AMQP服务端订阅
*/
//参数说明,请参见AMQP客户端接入说明文档。
String accessKey = "***";
String accessSecret = "***";
String consumerGroupId = "***";
//iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
String iotInstanceId = "";
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
String clientId = "TESTClientID";

//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "|authMode=aksign"
    + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",iotInstanceId=" + iotInstanceId
            + ",consumerGroupId=" + consumerGroupId
            + "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
String password = doSign(signContent,accessSecret, signMethod);
//接入域名,请参见AMQP客户端接入说明文档。
String connectionUrl = "amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671?amqp.idleTimeout=80000";

Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}

private static MessageListener messageListener = new MessageListener() {
    @Override
    public void onMessage(Message message) {
        try {
            //1.收到消息之后一定要ACK。
            // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
            // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
            // message.acknowledge();
            //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
            // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
            executorService.submit(() -> processMessage(message));
        } catch (Exception e) {
            logger.error("submit task occurs exception ", e);
        }
    }
};

/**
 * 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
    try {
        byte[] body = message.getBody(byte[].class);
        String content = new String(body);
        String topic = message.getStringProperty("topic");
        String messageId = message.getStringProperty("messageId");
        System.out.println("AMQP receive message"
                           + ", topic = " + topic
                           + ", messageId = " + messageId
                           + ", content = " + content);
    } catch (Exception e) {
        logger.error("processMessage occurs error ", e);
    }
}

private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
    /**
     * 连接成功建立。
     */
    @Override
    public void onConnectionEstablished(URI remoteURI) {
        logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
    }

    /**
     * 尝试过最大重试次数之后,最终连接失败。
     */
    @Override
    public void onConnectionFailure(Throwable error) {
        logger.error("onConnectionFailure, {}", error.getMessage());
    }

    /**
      * 连接中断。
      */
    @Override
    public void onConnectionInterrupted(URI remoteURI) {
        logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
    }

    /**
     * 连接中断后又自动重连上。
     */
    @Override
    public void onConnectionRestored(URI remoteURI) {
        logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
    }

    @Override
    public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

    @Override
    public void onSessionClosed(Session session, Throwable cause) {}

    @Override
    public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

    @Override
    public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};

/**
 * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
 */
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
    SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
    Mac mac = Mac.getInstance(signMethod);
    mac.init(signingKey);
    byte[] rawHmac = mac.doFinal(toSignString.getBytes());
    return Base64.encodeBase64String(rawHmac);
}

日志打印出订阅到的流转消息如下,符合预期。
image.png

重点说明
下行控制如果为异步服务,需要通过订阅数据流转获取设备返回结果。订阅Topic为"/sys/{productKey}/{deviceName}/thing/downlink/reply/message",需要根据"requestId"关联请求和响应。

关联介绍:“3.1 服务端调用API开发”中“重点说明”。

image.png

3.2.2 规则引擎数据订阅。

配置SQL
SQL介绍文档这里
image.png

调试SQL
Payload数据格式文档这里
image.png

可以查看“调试结果”。

test

符合配置的SQL结果。

转发数据
可以转发到客户以下多种云产品中,本文选择AMQP作为示例验证。
image.png

image.png
创建完成后,需要到规则列表页“启动”改规则。

订阅数据
服务端订阅代码可以复用上面“3.1”服务端订阅代码。差别就是服务端订阅,订阅的是Topic对应的完整Payload;而规则引擎流转AMQP,在消息流转过程可以对Payload做一些规则过滤或简单计算。
以下日志精简报文是通过规则引擎过滤后获取的数据。
image.png
说明:同一组数据不要同时开通规则引擎和服务端订阅两种订阅模式,避免消息干扰。

4 设备运行时

设备量产之后,到达消费者手上,会开始激活上线进入到设备运行时。由于不属于开发态流程,本章节仅做简单介绍,目的是能让开发者知道开发态的配置在运行态如何产生作用,对设备接上阿里云IoT平台后的流程有个简单的认识。

image.png

本文通过物模型接入流程,介绍了平台设备连接、物模型规范校验、物模型数据、规则引擎、服务端订阅、开放API六大基础能力。
设备全生命周期过程中,还有不少设备管理能力供客户选择,其中包括设备标签、设备分组、设备检索、OTA、设备运维、设备分发、文件上传、远程配置等,欢迎使用。

4.1 连接

设备连接过程,云端会对设备进行身份认证。

4.2 物模型规范校验

由于目前物模型配置仅提供强校验模式,物模型规范校验主要对设备上报的报文进行Alink协议解析、物模型数据规范校验。平台后续会陆续开放弱校验、免校验、数据去重能力。

关联阅读:1.3 物模型配置

4.3 设备管理能力

4.3.1 设备标签
介绍文档:https://help.aliyun.com/document_detail/73733.html
4.3.2 设备分组
介绍文档:https://help.aliyun.com/document_detail/90386.html
4.3.3 OTA
介绍文档:https://help.aliyun.com/document_detail/85700.html
4.3.4 设备分发
介绍文档:https://help.aliyun.com/document_detail/143450.html

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
6月前
|
传感器 XML JSON
AIOT-物模型(产品模型、数据模板)
使用 TSL 描述的物联网中的实体模型,就是“物模型”也称产品模型、数据模板。
542 0
|
监控 物联网 Java
|
物联网 API 开发工具
|
存储 监控 物联网
|
安全 测试技术 持续交付
闲鱼技术2022年度白皮书-技术质量主题-这半年我做交易链路自动化回归的那些事儿(下)
闲鱼技术2022年度白皮书-技术质量主题-这半年我做交易链路自动化回归的那些事儿
229 0
|
存储 监控 物联网
oT 设备物模型接入价值与实践|学习笔记
快速学习 oT 设备物模型接入价值与实践。
290 1
oT 设备物模型接入价值与实践|学习笔记
|
存储 人工智能 监控
IoT设备物模型接入价值与实践(一)|学习笔记
快速学习IoT设备物模型接入价值与实践(一)
IoT设备物模型接入价值与实践(一)|学习笔记
|
物联网 Java 开发工具
IoT设备物模型接入价值与实践(二)|学习笔记
快速学习IoT设备物模型接入价值与实践(二)
IoT设备物模型接入价值与实践(二)|学习笔记
|
物联网 开发工具 开发者
IoT设备物模型接入价值与实践(三)|学习笔记
快速学习IoT设备物模型接入价值与实践(三)