概述
物模型指将物理空间中的实体数字化,并在云端构建该实体的数据模型。在物联网平台中,定义物模型即定义产品功能。完成功能定义后,系统将自动生成该产品的物模型。物模型描述产品是什么,能做什么,可以对外提供哪些服务。物模型将产品功能类型分为三类:属性、服务、和事件。定义了这三类功能,即完成了物模型的定义。
功能类型 | 说明 |
---|---|
属性(Property) | 一般用于描述设备运行时的状态,如环境监测设备所读取的当前环境温度等。属性支持GET和SET请求方式。应用系统可发起对属性的读取和设置请求。 |
服务(Service) | 设备可被外部调用的能力或方法,可设置输入参数和输出参数。相比于属性,服务可通过一条指令实现更复杂的业务逻辑,如执行某项特定的任务。 |
事件(Event) | 设备运行时的事件。事件一般包含需要被外部感知和处理的通知信息,可包含多个输出参数。如,某项任务完成的信息,或者设备发生故障或告警时的温度等,事件可以被订阅和推送。 |
使用: 设备端可以上报属性和事件;云端可以向设备端发送设置属性和调用服务的指令。
Step By Step
1、产品物模型model.json(替换产品productKey导入即可)
{
"schema":"https://iotx-tsl.oss-ap-southeast-1.aliyuncs.com/schema.json",
"profile":{
"productKey":"a1qLU******"
},
"properties":[
{
"identifier":"Temperature",
"name":"温度",
"accessMode":"rw",
"desc":"电机工作温度",
"required":false,
"dataType":{
"type":"float",
"specs":{
"min":"-55",
"max":"200",
"unit":"℃",
"step":"0.1"
}
}
}
],
"events":[
{
"identifier":"post",
"name":"post",
"type":"info",
"required":true,
"desc":"属性上报",
"method":"thing.event.property.post",
"outputData":[
{
"identifier":"Temperature",
"name":"温度",
"dataType":{
"type":"float",
"specs":{
"min":"-55",
"max":"200",
"unit":"℃",
"step":"0.1"
}
}
}
]
},
{
"identifier":"event1",
"name":"异常事件",
"type":"info",
"required":false,
"method":"thing.event.event1.post",
"outputData":[
{
"identifier":"event1",
"name":"事件参数1",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"100",
"step":"1"
}
}
}
]
}
],
"services":[
{
"identifier":"set",
"name":"set",
"required":true,
"callType":"async",
"desc":"属性设置",
"method":"thing.service.property.set",
"inputData":[
{
"identifier":"Temperature",
"name":"温度",
"dataType":{
"type":"float",
"specs":{
"min":"-55",
"max":"200",
"unit":"℃",
"step":"0.1"
}
}
}
],
"outputData":[
]
},
{
"identifier":"get",
"name":"get",
"required":true,
"callType":"async",
"desc":"属性获取",
"method":"thing.service.property.get",
"inputData":[
"Temperature"
],
"outputData":[
{
"identifier":"Temperature",
"name":"温度",
"dataType":{
"type":"float",
"specs":{
"min":"-55",
"max":"200",
"unit":"℃",
"step":"0.1"
}
}
}
]
},
{
"identifier":"addFuctionServiceAsync",
"name":"异步加法服务",
"required":false,
"callType":"async",
"method":"thing.service.addFuctionServiceAsync",
"inputData":[
{
"identifier":"add1",
"name":"加数1",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"100",
"step":"1"
}
}
},
{
"identifier":"add2",
"name":"加数2",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"100",
"step":"1"
}
}
}
],
"outputData":[
{
"identifier":"result",
"name":"结果",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"200",
"step":"1"
}
}
}
]
},
{
"identifier":"addFuctionServiceSync",
"name":"加法服务",
"required":false,
"callType":"sync",
"desc":"同步功能",
"method":"thing.service.addFuctionServiceSync",
"inputData":[
{
"identifier":"add2",
"name":"加数2",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"100",
"step":"1"
}
}
},
{
"identifier":"add1",
"name":"加数1",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"100",
"step":"1"
}
}
}
],
"outputData":[
{
"identifier":"result",
"name":"计算结果",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"200",
"step":"1"
}
}
}
]
}
]
}
2、设备端Code Sample(基于开源Java MQTT Client)
import com.alibaba.fastjson.JSONObject;
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
public class IoTThingTest {
public static String productKey = "a1qLU******";
public static String deviceName = "device1";
public static String deviceSecret = "cA9wzIM6bL2QI6DgAaSO0FPg********";
public static String regionId = "cn-shanghai";
// 物模型-属性上报topic
private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post";
// 物模型-属性响应topic
private static String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post_reply";
// 物模型-事件上报topic
private static String eventPubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/event1/post";
// 物模型-事件上报响应topic
private static String eventSubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/event1/post_reply";
private static MqttClient mqttClient;
public static void main(String [] args){
// 初始化Client
initAliyunIoTClient();
try {
mqttClient.subscribe(subTopic); // 订阅Topic
mqttClient.subscribe(eventSubTopic);
} catch (MqttException e) {
System.out.println("error:" + e.getMessage());
e.printStackTrace();
}
// 设置订阅监听
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connection Lost");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("Sub message");
System.out.println("Topic : " + s);
System.out.println(new String(mqttMessage.getPayload())); //打印输出消息payLoad
// 服务调用处理
if (s.contains("Async") || s.contains("rrpc")) {
String content = new String((byte[]) mqttMessage.getPayload());
System.out.println("服务请求Topic:" + s);
System.out.println("服务指令:" + content);
JSONObject request = JSONObject.parseObject(content);
JSONObject params = request.getJSONObject("params");
if (!params.containsKey("add1")) { // 检查入参
System.out.println("不包含参数add1");
return;
}
Integer input1 = params.getInteger("add1"); // 获取入参
Integer input2 = params.getInteger("add2"); // 获取入参
JSONObject response = new JSONObject();
JSONObject data = new JSONObject();
data.put("result", input1 + input2);
response.put("id", request.get("id"));
response.put("code", 200);
response.put("data", data);
String responseTopic = s;
// 服务响应
if (s.contains("rrpc")) {
// 同步服务调用响应Topic
responseTopic = s.replace("request", "response");
} else {
// 异步服务调用响应Topic
responseTopic = responseTopic + "_reply";
}
MqttMessage message1 = new MqttMessage(response.toString().getBytes("utf-8"));
System.out.println("responseTopic: " + responseTopic);
mqttClient.publish(responseTopic, message1);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
// 属性上报
postDeviceProperties();
// 事件上报
postDeviceEvent();
}
/**
* 初始化 Client 对象
*/
private static void initAliyunIoTClient() {
try {
// 构造连接需要的参数
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setConnectionTimeout(10);
// connOpts.setCleanSession(true);
connOpts.setCleanSession(false);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
/**
* 事件上报
*/
private static void postDeviceEvent() {
try {
//上报数据
//高级版 物模型-属性上报payload
System.out.println("事件上报");
String payloadJson = "{\"params\":{\"event1\":23}}";
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(1);
mqttClient.publish(eventPubTopic, message);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 汇报属性
*/
private static void postDeviceProperties() {
try {
//上报数据
//高级版 物模型-属性上报payload
System.out.println("上报属性值");
String payloadJson = "{\"params\":{\"Temperature\":13}}";
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(1);
mqttClient.publish(pubTopic, message);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
参考链接: 基于开源JAVA MQTT Client连接阿里云IoT
3、启动设备查看属性及事件上报情况
- 3.1 服务端情况
- 3.2 设备端reply情况
上报属性值
事件上报
Sub message
Topic : /sys/a1qLU******/device1/thing/event/property/post_reply
{"code":200,"data":{},"id":"null","message":"success","method":"thing.event.property.post","version":"1.0"}
Sub message
Topic : /sys/a1qLU******/device1/thing/event/event1/post_reply
{"code":200,"data":{},"id":"null","message":"success","method":"thing.event.event1.post","version":"1.0"}
4、异步服务调用
- 4.1 说明
通过 InvokeThingService或 InvokeThingsService接口调用服务,物联网平台采用异步方式下行推送请求,设备也采用异步方式返回结果。 此时,服务选择为异步调用方式,物联网平台订阅此处的异步响应Topic。异步调用的结果,可以使用 规则引擎数据流转功能获取,也可以使用服务端订阅获取。
- 4.2 Open API InvokeThingService
- 4.3 设备端日志
Sub message
Topic : /sys/a1qLU******/device1/thing/service/addFuctionServiceAsync
{"method":"thing.service.addFuctionServiceAsync","id":"2015524670","params":{"add2":2,"add1":2},"version":"1.0.0"}
服务请求Topic:/sys/a1qLU******/device1/thing/service/addFuctionServiceAsync
服务指令:{"method":"thing.service.addFuctionServiceAsync","id":"2015524670","params":{"add2":2,"add1":2},"version":"1.0.0"}
responseTopic: /sys/a1qLU******/device1/thing/service/addFuctionServiceAsync_reply
- 4.4 控制台日志
- 4.5 AMQP 服务端订阅获取的服务响应
Content:{"iotId":"*******","code":200,"data":{"result":4},"requestId":"2009566194","topic":"/sys/*******/device1/thing/service/addFuctionServiceAsync_reply","source":"DEVICE","gmtCreate":1583658474852,"productKey":"a1qLU******","deviceName":"device1"}
5、同步服务调用
- 5.1 说明
通过 InvokeThingService或 InvokeThingsService接口调用服务,物联网平台直接使用 RRPC同步方式下行推送请求。此时,服务选择为同步调用方式,物联网平台订阅RRPC对应Topic。
- 5.2 Open API InvokeThingService
- 5.3 设备端日志
Sub message
Topic : /sys/a1qLU******/device1/rrpc/request/1236608506958775809
{"method":"thing.service.addFuctionServiceSync","id":"2015301903","params":{"add2":2,"add1":2},"version":"1.0.0"}
服务请求Topic:/sys/a1qLU******/device1/rrpc/request/1236608506958775809
服务指令:{"method":"thing.service.addFuctionServiceSync","id":"2015301903","params":{"add2":2,"add1":2},"version":"1.0.0"}
responseTopic: /sys/a1qLU******/device1/rrpc/response/1236608506958775809
- 5.4 控制台日志
更多参考
设备属性、事件、服务
基于开源Java MQTT Client的阿里云物联网平台RRPC功能测试
阿里云物联网平台规则引擎综述