物模型价值
物联网元年
关键词:探索、快速
2016年阿里云物联网平台(前称:物联网套件)上线,为客户设备上云提供了通道能力,包括MQTT连接、消息流转等核心功能。
第一批客户大多基于该模式使用物联网平台能力,当时整个行业处于物联网云平台起步期,包括AWS,Azure起步阶段同样只是提供通道能力。
基于通道能力,客户使用物联网平台接入方式详见文档 https://developer.aliyun.com/article/746536。
这个阶段的客户大多是硬件厂商,软硬一体开发,尝试物联网转型提升设备价值,对物联网平台的诉求比较简单,希望自己更多参与,对新模式有更多把控力,所以都会采用自定义协议上云。
物联网繁荣
关键词:生态、扩展、数字化
近两年物联网设备、解决方案如雨后春笋般涌出,不少用户希望赶上物联网这波浪潮。这个阶段的客户不仅仅关注设备连云,也开始关注围绕设备产生的解决方案。因此客户角色从硬件厂商,快速扩展到集成商、软件提供商等。由于大量角色的进入,对软硬开发解耦、易扩展的能力提出了诉求。同时我们也发现第一批使用通道能力的平台客户随着自己业务发展、设备扩展,原来的架构已无法支撑,对物联网平台也提出了新的要求。
举两个典型场景:
- 老客户升级:某个共享设备提供商,原来仅提供大学校园共享洗衣机服务,利用物联网平台通道能力上云,随着公司业务发展,从共享洗衣机业务扩展到校园淋浴、饮水机、充电桩等多类设备,原来自定义协议和API无法支撑多品类设备,难扩展。需要有一套接入标准和规范,方便快速扩展设备类型。
- 新生态客户:某个充电桩平台客户,提供充电桩管理平台,作为甲方要求大量桩企(乙方)按照平台规范接入,典型的软硬件分离场景。需要有一套接入标准和规范,方便快速扩展桩企规模。
这一阶段平台在通道能力之上,提供了物模型能力,物模型可以屏蔽底层设备差异,让软件开发者基于平台提供的标准API开发;硬件开发者基于平台提供的标准协议开发;从而达到软硬开发解耦的目的。
物联网赋能
关键词:场景化、智能
物联网终极目标一定是基于设备采集数据赋能业务,实现数字业务化。例如金融、物流、家居、餐饮、商场、医疗、交通等不同领域通过物联网数字化后,结合数据分析智能化决策、互联互通、场景规则、数字孪生等能力实现纵深领域场景化、智能化。
这一阶段平台在通道能力、物模型能力之上,还进一步提供设备智能运维、数据分析、可视化、数字孪生等高价值服务,帮助客户数字化后产生真正的业务价值。
基于以上分析,物联网已经过了最初的“元年”阶段,也迈入了“繁荣”阶段,正逐步朝“问物联网要赋能”的阶段演进。物模型是物联网生态化、高扩展、数字化、智能化非常重要的基础,强烈建议客户使用。
物模型接入实践
自定义接入模式
以一个老客户为例,原来仅使用物联网平台通道能力,下图中1~8流程都需要自定义开发,当客户设备类型足够简单时,该模式复杂度通常不会成为客户痛点。
面临的挑战
随着客户接入设备种类越来越多,面临的扩展性问题也越来越严峻。
使用物模型后的模式
物模型模式下,设备与云交互协议、云平台设备API都基于物模型标准化了,即使设备不断扩展,客户业务服务器和设备端逻辑都不需要进行调整,保证了扩展性。
物模型接入流程详细介绍
流程图
以下是客户详细接入流程,主要分为:云端配置、设备开发、服务端开发、设备运行时管理四大部分。平台会提供一些工具,使各部分流程更高效。接下来进行详细介绍。
本文试图手把手介绍从0到1接入物模型,还会配套介绍一些接入过程中有帮助的平台能力,所以文章篇幅比较长,事实上客户接入流程还是非常简单的,真正开发只需要涉及到图中红色三个模块。
如果您希望快速接入,可以直接关注P0部分,其它部分都可以跳过。
1 云端配置
1.1 创建产品(P0)
1.登录物联网平台。
2.创建产品。
说明:
• 所属品类:标准品类库提供了一些供参考的模板,选择后可以修改,建议使用。
• 节点类型:根据实际选择即可。
• 数据格式:“ICA标准数据格式(Alink JSON)”表示设备使用标准Alink JSON格式上报数据;“透传/自定义”表示设备可以使用自定义格式,通过Alink特定Topic上报物联网平台,该模式客户需要写脚本进行转换,透传模式在此不做展开,后面单独起文章介绍。
1.2 物模型建模(P0)
1.模型查看。
已有的模型是继承自创建产品时选择的“充电桩”品类模板。
2.编辑模型。
通过“编辑草稿”,进行修改和添加,最后需要对物模型“发布上线”。
说明:
• 定义物模型非常重要,物模型通过属性、事件、服务三要素描述了设备所有能力,设备和云交互、客户服务器访问设备通过物模型都可以实现协议标准化。如果客户定义的物模型如果足够通用和专业,阿里可以帮助作为ICA行业标准进行推广。
• 服务的调用方式有:同步调用、异步调用两种模式。客户云端开发调用下行控制API,同步调用和异步调用获取返回结果方式不一样,在后文“3.3”章节详细介绍。
物模型概念介绍
物模型介绍文档请参见这里。
了解物模型概念,能够帮助您更好对设备建模。
1.3 物模型配置
当前默认是物模型强校验模式,即设备上报数据在IoT平台会进行物模型数据规范强校验,如果不符合规范会报错。
另外物模型弱校验、免校验、去重等规则也会在近期陆续开放,后期进行文档补充。
配置之后,会在设备运行时生效。
关联阅读:4.2 物模型扩展规则校验。
1.4 注册三元组(P0)
1.注册设备。
说明:
• 添加设备:测试阶段使用较多,单个添加。
• 批量添加:量产阶段使用,有两种模式,“自动生成”表示设备标识符(deviceName)由平台按照一定的规则随机颁发;“批量上传”支持客户自定义设备标识符(deviceName)。
2.查看设备列表。
可以通过“设备列表”、“批次管理”两种方式查看创建的设备列表。
通过“批次管理”查看这一批次设备详情,并且支持下载三元组列表。
注意:此处设备标识符(deviceName)非常重要,与productKey, deviceSecret一起称为设备的“三元组”,作为设备的唯一身份,大部分情况需要烧录到设备上。
2 设备开发
2.1 使用设备SDK开发(P0)
设备接入SDK文档请参见这里。
根据需要选择合适的语言版本。C SDK 建议使用“4.x”版本。
本文选择 Java SDK进行演示。
环境准备:https://help.aliyun.com/document_detail/97331.html
物模型开发:https://help.aliyun.com/document_detail/97333.html
1.开发之前需要先准备如下好两份数据:
- 设备证书信息(productKey、deviceName、deviceSecret)
- 设备物模型
为了方便查看物模型详细数据规范,通过导出“物模型TSL”查看详细物模型定义,其中包括物模型属性、事件、服务标识符、参数、数据规范。抽取部分内容,针对以下属性、事件、服务在DEMO中进行开发演示。
"schema":"https://iotx-tsl.oss-ap-southeast-1.aliyuncs.com/schema.json",
"profile":{
"productKey":"a1nhbEV****"
},
"properties":[
{
"identifier":"acOutMeterIty",
"name":"交流输出电表底值监测属性",
"accessMode":"rw",
"required":false,
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"200",
"step":"1"
}
}
}
],
"events":[
{
"identifier":"post",
"name":"post",
"type":"info",
"required":true,
"desc":"属性上报",
"method":"thing.event.property.post",
"outputData":[
{
"identifier":"acOutMeterIty",
"name":"交流输出电表底值监测属性",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"200",
"step":"1"
}
}
}
]
},
{
"identifier":"startChaResEvt",
"name":"启动充电结果事件",
"type":"info",
"required":false,
"method":"thing.event.startChaResEvt.post",
"outputData":[
{
"identifier":"gunNum",
"name":"充电枪编号",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"100",
"step":"2"
}
}
}
]
}
],
"services":[
{
"identifier":"set",
"name":"set",
"required":true,
"callType":"async",
"desc":"属性设置",
"method":"thing.service.property.set",
"inputData":[
{
"identifier":"acOutMeterIty",
"name":"交流输出电表底值监测属性",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"200",
"step":"1"
}
}
}
],
"outputData":[
]
},
{
"identifier":"get",
"name":"get",
"required":true,
"callType":"async",
"desc":"属性获取",
"method":"thing.service.property.get",
"inputData":[
"acOutMeterIty"
],
"outputData":[
{
"identifier":"acOutMeterIty",
"name":"交流输出电表底值监测属性",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"200",
"step":"1"
}
}
}
]
},
{
"identifier":"startChaResService",
"name":"开启充电",
"required":false,
"callType":"async",
"method":"thing.service.startChaResService",
"inputData":[
{
"identifier":"charm",
"name":"电量",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"100",
"step":"2"
}
}
}
],
"outputData":[
{
"identifier":"realcharm",
"name":"realcharm",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"100",
"step":"2"
}
}
}
]
}
]
}
2.开发代码。
如下示例中只需要将三元组,和属性、事件、服务参数替换成您的设备信息。其它代码可以直接运行。
关于免订阅能力介绍:
有些设备最资源比较敏感,为了避免初始化订阅大量Alink协议中系统Topic带来的性能开销,平台提供了免订阅能力,即平台帮设备进行Topic订阅。
SDK只有3.1.0及以后版本支持免订阅能力,并且默认打开该能力。
如果3.1.0及以后版本SDK您希望取消免订阅,依旧按需订阅Topic,可以设置SDK配置项关闭该能力,在make.settings中设置“FEATURE_MQTT_AUTO_SUBSCRIBE=n”。
public class Demo {
public static void main(String[] args) throws Exception {
String pk = "a1nhbEVCP**";
String dn = "7mBP6Dd6IT27Rt***";
String ds = "*****";
/**
* 连接 & 认证
*/
LinkKitInitParams params = new LinkKitInitParams();
// 设置 Mqtt 初始化参数
IoTMqttClientConfig config = new IoTMqttClientConfig();
config.productKey = pk;
config.deviceName = dn;
config.deviceSecret = ds;
config.receiveOfflineMsg = false;
params.mqttClientConfig = config;
// 设置初始化三元组信息,用户传入
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.productKey = pk;
deviceInfo.deviceName = dn;
deviceInfo.deviceSecret = ds;
params.deviceInfo = deviceInfo;
LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
public void onError(AError aError) {
System.out.println("===============FAILURE===============");
ALog.e(TAG, "Init Error error=" + aError);
System.out.println("===============FAILURE===============");
}
public void onInitDone(InitResult initResult) {
System.out.println("===============SUCCESS===============");
ALog.i(TAG, "onInitDone result=" + initResult);
System.out.println("===============SUCCESS===============");
}
});
//此处sleep 5S,由于上面init是异步流程
Thread.sleep(5000);
/**
* 物模型开发
*/
/**
* 上报属性
*/
Map<String, ValueWrapper> properties = new HashMap<>();
// key为物模型中属性标识符"acOutMeterIty",value需要遵循属性值规范:int类型,取值范围在0~200之间;
properties.put("acOutMeterIty", new ValueWrapper(10));
LinkKit.getInstance().getDeviceThing().thingPropertyPost(properties, new IPublishResourceListener() {
@Override
public void onSuccess(String s, Object o) {
System.out.println("=====thingPropertyPost success=======");
System.out.println(s);
System.out.println(JSON.toJSONString(o));
}
@Override
public void onError(String s, AError aError) {
System.out.println("=====thingPropertyPost failure=======");
}
});
// 上报属性之后,云端会返回响应结果,此处是监听云端返回的属性reply
LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
@Override
public void onNotify(String s, String s1, AMessage aMessage) {
System.out.println("===PROPERTY REPLY===");
System.out.println("TOPIC:" + s1);
System.out.println("Payload:" + JSON.toJSONString(aMessage));
}
@Override
public boolean shouldHandle(String s, String s1) {
return false;
}
@Override
public void onConnectStateChange(String s, ConnectState connectState) {
}
});
/**
* 上报事件
*/
HashMap<String, ValueWrapper> eventMap = new HashMap<>();
// key为物模型中事件参数的标识符"gunNum", value为事件参数值需要遵循数值规范:int类型,取值范围0~100之间;
eventMap.put("gunNum", new ValueWrapper.IntValueWrapper(50));
OutputParams eventOutput = new OutputParams(eventMap);
// 参数identity为"startChaResEvt"属于物模型事件标识符。
LinkKit.getInstance().getDeviceThing().thingEventPost("startChaResEvt", eventOutput, new IPublishResourceListener() {
public void onSuccess(String resId, Object o) {
System.out.println("=====thingEventPost success=======");
System.out.println(resId);
System.out.println(JSON.toJSONString(o));
}
public void onError(String resId, AError aError) {
System.out.println("=====thingEventPost failure=======");
}
});
/**
* 监听并执行下行服务
*/
// 获取设备支持的所有服务
LinkKit.getInstance().getDeviceThing().getServices();
// 用户可以根据实际情况注册自己需要的服务的监听器
List<Service> srviceList = LinkKit.getInstance().getDeviceThing().getServices();
for (int i = 0; srviceList != null && i < srviceList.size(); i++) {
Service service = srviceList.get(i);
LinkKit.getInstance().getDeviceThing().setServiceHandler(service.getIdentifier(), new ITResRequestHandler() {
public void onProcess(String identify, Object result, ITResResponseCallback itResResponseCallback) {
System.out.println("onProcess() called with: s = [" + identify + "], o = [" + result + "], itResResponseCallback = [" + itResResponseCallback + "]");
System.out.println("收到云端异步服务调用 " + identify);
try {
/**
* 设置属性(property)的模式
*/
// "set"为设置属性默认的标识符
if ("set".equals(identify)) {
// TODO 用户需要设置真实设备的的属性
/**
* 向云端同步设置好的属性值
*/
Map<String, ValueWrapper> desiredProperty = (Map<String, ValueWrapper>) ((InputParams) result).getData();
LinkKit.getInstance().getDeviceThing().thingPropertyPost(desiredProperty, new IPublishResourceListener() {
@Override
public void onSuccess(String s, Object o) {
if (result instanceof InputParams) {
Map<String, ValueWrapper> data = (Map<String, ValueWrapper>) ((InputParams) result).getData();
// data.get()
ALog.d(TAG, "收到异步下行数据 " + data);
// 响应云端 接收数据成功
itResResponseCallback.onComplete(identify, null, null);
} else {
itResResponseCallback.onComplete(identify, null, null);
}
}
@Override
public void onError(String s, AError aError) {
AError error = new AError();
error.setCode(100);
error.setMsg("setPropertyFailed.");
itResResponseCallback.onComplete(identify, new ErrorInfo(error), null);
}
});
/**
* 服务(service)的模式
*/
// "startChaResService"为服务的标识符
} else if ("startChaResService".equals(identify)) {
Map<String, ValueWrapper> inputParams = (Map<String, ValueWrapper>) ((InputParams) result).getData();
// TODO 根据服务入参inputParams执行设备逻辑,比如启动充电
// 充电完成后,向云端返回输出参数
OutputParams outputParams = new OutputParams();
// key为"charm"属于物模型中"startChaResService"服务出参标识符,value为出参值遵循数据规范:int类型,数据范围1~100之间;
outputParams.put("charm", new ValueWrapper.IntValueWrapper(20));
itResResponseCallback.onComplete(identify, null, outputParams);
} else {
// 根据不同的服务做不同的处理,跟具体的服务有关系
OutputParams outputParams = new OutputParams();
// 根据特定服务,按照服务规范返回服务的出参。
itResResponseCallback.onComplete(identify, null, outputParams);
}
} catch (Exception e) {
e.printStackTrace();
ALog.d(TAG, "云端返回数据格式异常");
}
}
public void onSuccess(Object o, OutputParams outputParams) {
ALog.d(TAG, "onSuccess() called with: o = [" + o + "], outputParams = [" + outputParams + "]");
ALog.d(TAG, "注册服务成功");
}
public void onFail(Object o, ErrorInfo errorInfo) {
ALog.d(TAG, "onFail() called with: o = [" + o + "], errorInfo = [" + errorInfo + "]");
ALog.d(TAG, "注册服务失败");
}
});
}
}
}
说明:
• 上报属性成功,云端会返回REPLY,有以下日志说明设备到云,云到设备的链路全部走通。
• 设备收到属性设置指令,在完成物理设备属性修改后,建议将最新属性同步上报云端。
2.2 不使用SDK开发
1.协议准备。
“2.1 使用设备SDK开发”介绍了使用阿里云提供的SDK进行设备开发,当然您也可以选择不使用SDK,完全基于Alink协议(设备和云交互协议)开发。
Alink协议文档:https://help.aliyun.com/document_detail/90459.html
重点关注物模型协议部分:https://help.aliyun.com/document_detail/89301.html 。里面包含了物模型相关所有Topic介绍(物模型Topic列表在控制台也可以查看,如下图)。
文档详细介绍了设备端如何向云端上报“属性”、“事件”,如何订阅云端向下发送的“服务”指令。
Topic和Payload都基于客户定义的物模型进行标准化和规范化,从而使得客户设备与云交互方式不会随着设备类型变化而改变,满足扩展性要求。
2.环境准备。
根据自己选型选择合适的MQTT客户端,本文选择eclipse paho。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>//可以选择您需要的版本
</dependency>
3.开发。
物模型复用“2.1 使用设备SDK开发”中“开发前准备”给出的。
关于免订阅能力介绍:
有些设备最资源比较敏感,为了避免初始化订阅大量Alink协议中系统Topic带来的性能开销,平台提供了免订阅能力,即平台帮设备进行Topic订阅。
SDK只有3.1.0及以后版本支持免订阅能力,并且默认打开该能力。
如果不使用SDK开发,可以通过设备端在MQTT的连接报文中的clientId部分, 新增_ss=1表示开启自动订阅, 建连成功后服务端会自动订阅上以下表格中的topic, 若传递 _ss=0 或者不传递该字段, 则不会发生服务端自动订阅动作。
4.上报属性。
String productKey = "a1nhbEV****";
String deviceName = "7mBP6Dd6IT2*****";
String deviceSecret = "****";
// MQTT连接
MqttTestClient client;
client = new MqttTestClient(productKey, deviceName, deviceSecret);
client.connect();
String setTopic = "/thing/event/property/post";
String setTopicReply = "/thing/event/property/post_reply";
// 上报属性,云端会返回REPLY,进行订阅。(为了节省端侧订阅开销,可以开通免订阅)
// 此处client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client subscribe
client.sysTopic(setTopicReply).subscribe();
// 封装Alink协议系统参数
Map<String, Object> payload = new HashMap<String, Object>();
Map<String, Object> params = new HashMap<String, Object>();
payload.put("id", 11);//id需要保证设备端一段时间内唯一
payload.put("params", params);
payload.put("method", "thing.event.property.post");
// 组装属性payload
String propKey = "acOutMeterIty";
int statusValue = 30;
Map<String, Object> proValue = new HashMap<>();
proValue.put("value", statusValue);
proValue.put("time", System.currentTimeMillis());
params.put(propKey, proValue);
// 上报(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client publish消息)
client.sysTopic(setTopic).publish(JSON.toJSONString(payload));
// 打印云端返回的Reply(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client监听订阅消息)
client.sysTopic(setTopicReply).readTopic(10000);
client.disconnect();
日志打印的设备请求和响应。
5.上报事件。
String productKey = "a1nhbEV****";
String deviceName = "7mBP6Dd6IT27*****";
String deviceSecret = "***";
// MQTT连接
MqttTestClient client;
client = new MqttTestClient(productKey, deviceName, deviceSecret);
client.connect();
// topic中为"startChaResEvt"属于物模型事件标识符。
String setTopic = "/thing/event/startChaResEvt/post";
String setTopicReply = "/thing/event/startChaResEvt/post_reply";
// 报事件,云端会返回REPLY,进行订阅。(为了节省端侧订阅开销,可以开通免订阅)
client.sysTopic(setTopicReply).subscribe();
// 封装Alink协议系统参数
Map<String, Object> payload = new HashMap<String, Object>();
Map<String, Object> params = new HashMap<String, Object>();
payload.put("id", 11);//id需要保证设备端一段时间内唯一
payload.put("params", params);
payload.put("method", "thing.event.startChaResEvt.post");
// 组装属性payload
Map<String, Object> dataValue = new HashMap<>();
// key为物模型中事件参数的标识符"gunNum", value为事件参数值需要遵循数值规范:int类型,取值范围0~100之间;
dataValue.put("gunNum", 59);
params.put("value", dataValue);
params.put("time", System.currentTimeMillis());
// 上报(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client publish消息)
client.sysTopic(setTopic).publish(JSON.toJSONString(payload));
// 打印云端返回的Reply(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client监听订阅消息)
client.sysTopic(setTopicReply).readTopic(10000);
client.disconnect();
6.服务调用。
此处为一段伪代码。可以在MQTT建连的时候通过callback监听云端下发的控制指令或消息。
前提:已经对下行的TOPIC进行订阅过,免订阅能力参考上面介绍。
mqttClient = new MqttClient(url, clientId, persistence);
final MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(true);
connOpts.setCleanSession(false);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(65);
LogUtil.log(clientId + "进行连接, 目的地: " + url);
// 此处订阅云端下发的消息
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
LogUtil.log("connection lost, cause:" + cause);
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
TopicChannel topicChannel = getTopic(topic);
LogUtil.log("receive message, channel:" + topicChannel
+ ",topic:" + topic
+ ", payload:" + new String(message.getPayload(), "UTF-8") + "");
topicChannel.put(message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//如果是qos 0消息 token.resp是没有回复的
LogUtil.log("sent, " + ((token == null || token.getResponse() == null) ? "null"
: token.getResponse().getKey()));
}
});
mqttClient.connect(connOpts);
重点说明:
• 所有被订阅的下行Topic都会被监听到。物模型相关的主要包括:属性上报Reply、属性下行设置、服务下行控制。
• 设置设备属性(https://help.aliyun.com/document_detail/89301.html#title-wmh-y2e-18r),默认异步方式返回结果。
• 订阅的Topic为Alink协议标准Topic:“/sys/{productKey}/{deviceName}/thing/service/property/set”
• 服务控制(https://help.aliyun.com/document_detail/89301.html#title-3pt-nfy-jys),同异步方式取决于物模型中service配置的调用模式。
• 服务异步方式订阅的Topic为Alink协议标准Topic:“/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}”
• 服务同步方式订阅的Topic需要遵循RRPC Topic模式:详见文档https://help.aliyun.com/document_detail/90568.html
注意:仅设备侧需要感知RRPC特殊TOPIC,设备上云后,数据流转、开放API面向的还是Alink协议编程。
2.3 在线调试
设备开发后之后,如何快速模拟业务服务器给设备下发指令,调试设备能力?IoT平台提供了“在线调试”的功能,可以模拟设备或模拟应用端到端调试。
此处使用“在线调试”里面“调试真实设备”能力。通过控制台下发设备控制指令,分两类:1)属性设置;2)服务调用。
1.服务调用调试。
云端下发后,可以到设备端查看控制Log是否打印,以判断指令达到端侧。
从图中可见设备收到startChaResService服务,同时向云端返回了输出参数。
2.属性设置调试。
说明:
• “获取”:暂不支持到设备,只能从云端获取设备最新属性。
• “设置”:指令直接到设备端,设备修改本地属性之后,上报云端最新属性;到设备上的设置指令为"set"。
• “设置期望值”:如果设备在线,会直接下发设备,如果设备离线,指令在云端进行缓存,待上线后下发设备端,下发之后,设备修改本地属性之后,同样上报云端最新属性;到设备上的设置指令同样为"set"。如果您希望使用物模型期望值能力,可点击查看最佳实践。
云端下发后,可以到设备端查看控制Log是否打印,以判断指令达到端侧。
从图中可见设备收到set指令,返回了服务响应,同时向云端上报了最新属性。
说明:服务结果还可以通过“2.4 查看物模型数据”章节中获取。
2.4 查看物模型数据
DEMO运行之后,可以看到设备已经“在线”状态。
“运行状态”展示设备上报的属性值;
“事件管理”展示设备上报的事件;
“服务调用”展示云端下发设备的控制服务;
上报属性结构化展示。
上报事件,包括事件参数展示。
属性设置、服务调用两类服务的云端下发入参、设备响应出参都有展示,如上证明设备收到云端指令,并且正常返回响应。
2.5 查看日志服务
设备在运行过程,可能会出现一些异常,比如连接失败、认证失败、数据异常等等,为了便于排查,可以查看日志服务。举例设备上报数据可能会不符合物模型规范,比如事件参数"gunNum"对应值的数据范围为0~100之间,而真实上报了50000。日志服务会展示设备错误详情。
可以看到日志内容为“{"Reason":"tsl parse: int value is bigger than max 100 -> gunNum"}”,说明gunNum对应值超过物模型规范最大值100的限制。物模型规范详情到“物模型TSL”查看。
同时可以通过“日志转储”中“日志报表”进一步查看设备大盘,包括设备上下线次数、设备上线IP区域分布、设备消息量、设备消息量Top列表、物模型错误分布、云端API错误分布等多维度指标。
日志服务介绍文档请参见这里。
3 服务端开发
设备连接到阿里云IoT平台,设备数据会保存在IoT平台时序数据库。同时IoT平台提供两种方式供客户获取设备数据:方式1)通过服务端订阅或者规则引擎实时流转到客户服务器;2)通过开放API供客户调用获取。
3.1 服务端调用API开发(P0)
1.环境准备。
SDK下载文档:https://help.aliyun.com/document_detail/30581.html
API接口列表:https://help.aliyun.com/document_detail/69579.html
重点关注物模型使用相关API
2.以下示例为设置设备属性API,设备异步返回结果,客户需要通过“数据流转”方式获取。
String accessKey = "***";
String accessSecret = "***";
try {
DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
} catch (Exception e) {
System.out.println("DefaultProfile exception");
}
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKey, accessSecret);
DefaultAcsClient defaultAcsClient = new DefaultAcsClient(profile);
SetDevicePropertyRequest setDevicePropertyRequest = new SetDevicePropertyRequest();
// 如果使用实例,此处传入真实实例id;如果公共实例,不需要设置。
//createProductRequest.setIotInstanceId("iothub-test-xxx");
setDevicePropertyRequest.setProductKey(pk);
setDevicePropertyRequest.setDeviceName(dn);
Map<String, Integer> properties = new HashMap<>();
// key为物模型中属性标识符"acOutMeterIty",value需要遵循属性值规范:int类型,取值范围在0~200之间;
properties.put("acOutMeterIty", 98);
setDevicePropertyRequest.setItems(JSON.toJSONString(properties));
SetDevicePropertyResponse response = null;
try {
response = defaultAcsClient.getAcsResponse(setDevicePropertyRequest);
} catch (Exception e) {
Log.error("执行失败:e:" + e.getMessage());
}
System.out.println("===============");
System.out.println("setDeviceProperty request : " + JSON.toJSONString(setDevicePropertyRequest));
System.out.println("setDeviceProperty response : " + JSON.toJSONString(response.getData()));
System.out.println("setDeviceProperty requestId : " + response.getRequestId());
System.out.println("===============");
重点说明:
下行控制如果为异步服务,需要通过订阅数据流转获取设备返回结果,订阅方式和数据结构详见“3.2 数据流转”章节介绍。
关联介绍:“3.2.1 服务端订阅”中“重点说明”。
3.2 数据流转
平台提供两种数据流转方式:方式1)服务端订阅;方式2)规则引擎;
3.2.1服务端订阅(P0)
服务端订阅配置
“推送消息类型”选择“设备上报消息”,包括物模型属性上报、事件上报、设备下行指令结果(包括属性设置响应、服务控制响应)等消息。
消息格式详见文档:https://help.aliyun.com/document_detail/73736.html
服务端订阅DEMO
接入说明:https://help.aliyun.com/document_detail/143601.html
/**
* AMQP服务端订阅
*/
//参数说明,请参见AMQP客户端接入说明文档。
String accessKey = "***";
String accessSecret = "***";
String consumerGroupId = "***";
//iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
String iotInstanceId = "";
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
String clientId = "TESTClientID";
//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent,accessSecret, signMethod);
//接入域名,请参见AMQP客户端接入说明文档。
String connectionUrl = "amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671?amqp.idleTimeout=80000";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(() -> processMessage(message));
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
}
};
/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
System.out.println("AMQP receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
@Override
public void onSessionClosed(Session session, Throwable cause) {}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};
/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
日志打印出订阅到的流转消息如下,符合预期。
重点说明:
下行控制如果为异步服务,需要通过订阅数据流转获取设备返回结果。订阅Topic为"/sys/{productKey}/{deviceName}/thing/downlink/reply/message",需要根据"requestId"关联请求和响应。
关联介绍:“3.1 服务端调用API开发”中“重点说明”。
3.2.2 规则引擎数据订阅。
配置SQL
SQL介绍文档这里。
调试SQL
Payload数据格式文档这里。
可以查看“调试结果”。
符合配置的SQL结果。
转发数据
可以转发到客户以下多种云产品中,本文选择AMQP作为示例验证。
创建完成后,需要到规则列表页“启动”改规则。
订阅数据
服务端订阅代码可以复用上面“3.1”服务端订阅代码。差别就是服务端订阅,订阅的是Topic对应的完整Payload;而规则引擎流转AMQP,在消息流转过程可以对Payload做一些规则过滤或简单计算。
以下日志精简报文是通过规则引擎过滤后获取的数据。
说明:同一组数据不要同时开通规则引擎和服务端订阅两种订阅模式,避免消息干扰。
4 设备运行时
设备量产之后,到达消费者手上,会开始激活上线进入到设备运行时。由于不属于开发态流程,本章节仅做简单介绍,目的是能让开发者知道开发态的配置在运行态如何产生作用,对设备接上阿里云IoT平台后的流程有个简单的认识。
本文通过物模型接入流程,介绍了平台设备连接、物模型规范校验、物模型数据、规则引擎、服务端订阅、开放API六大基础能力。
设备全生命周期过程中,还有不少设备管理能力供客户选择,其中包括设备标签、设备分组、设备检索、OTA、设备运维、设备分发、文件上传、远程配置等,欢迎使用。
4.1 连接
设备连接过程,云端会对设备进行身份认证。
4.2 物模型规范校验
由于目前物模型配置仅提供强校验模式,物模型规范校验主要对设备上报的报文进行Alink协议解析、物模型数据规范校验。平台后续会陆续开放弱校验、免校验、数据去重能力。
关联阅读:1.3 物模型配置
4.3 设备管理能力
4.3.1 设备标签
介绍文档:https://help.aliyun.com/document_detail/73733.html
4.3.2 设备分组
介绍文档:https://help.aliyun.com/document_detail/90386.html
4.3.3 OTA
介绍文档:https://help.aliyun.com/document_detail/85700.html
4.3.4 设备分发
介绍文档:https://help.aliyun.com/document_detail/143450.html