开发者学堂课程【阿里云 AIoT 物联网开发实战:AloT 企业物联网业务平台实战03】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/328/detail/3696
AloT 企业物联网业务平台实战03(二)
二、异步下行和同步下行
1、服务端->设备异步调用 Pub 的问题
下行的过程本身是异步的过程,如何来匹配设备响应的结果,整个过程比如 APP 发送一条指令到服务器,服务器要做的是把指令先存到比如 redies 或者 APP 里,存下来之后会给物联网平台发一个指令,是信息再发给物联网平台,通过 Pub 接口,物联网平台就会把消息推送到设备上,设备收到的消息执行对应的动作之后,会通过 MQTT 发送一条响应,响应可以通过配置的规则引擎把响应推送给业务服务器,业务服务器需要进入自己的 redies 来查询匹配当前的响应和哪一个请求是相关的,如果匹配上之后会把结果存取过来,APP 端会查询服务器,业务服务器的开机指令处理的结果是怎么样的,如果业务服务器拿到,物联网平台流转过来的结果也匹配上,就可以直接告诉 pad 端的用户,整个过程是相对复杂的,因为本质上是一个异步的过程。
2、服务端->设备实时响应,同步调用 RRPC 实战
物联网平台提供同步调用的模式,是反向的 RRPC 通过同步调用技术,可以简化企业业务系统的开发难度,采用 RRPC 的调用方式,APP 同样发送开机指令到业务系统,业务系统不是调用这个接口而是调用 RRPC 的接口,需要传递一个等待的超时时间。指令调用到物联网平台之后,物联网平台并不会立刻给业务系统返回结果,而是把消息先推送到设备端,等待设备端的回复,一旦设备端回复之后,物联网平台会自动的匹配和前面的请求关系,最终把洗衣机开机还是关机状态返回到业务系统,业务系统通过一条 HTTP 的调用,直接在 response 里拿到设备的响应结果,所以也是可以实时的通过 API 的方式给到消费端,消费者是也是实时的感知到设备是开机还是关机。
3、同步调用 RRPC 技术原理
业务器用服务器发起 RRPC 的 API 调用,允许客户来传递超时时间为一到八秒。物联网平台收到请求之后并不会直接返回结果,而是给设备发送 MQTT 的报文,topic 有一定的规则,Topic 上会携带一个 ID,设备收到 topic 消息之后会做业务处理,处理完之后会重新相应一个消息,消息也有一定的要求,一个是前面是 request 加 ID,响应的是 response 加 ID,两个 ID 要保持一致,物联网平台会匹配两个 ID,匹配上之后会给业务服务器返回 HTTP 的 response,有可能是超时,有可能是设备返回业务的结果。
请求:/sys/${productKey}/${deviceName/rrpc/request/${mess ageld}
响应:/sys/${productKey}/${deviceName}/rrpc/**response**/* *${messageld}**
$表示变量,每个设备不同
Messageld 为 loT 平台生成的消息 ID
设备端回复 responseTopic 里的 messageld 要与 requestTopic 一致
示例:
设备端需要订阅:
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设备收到 Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到消息后,在 timeout 时间内回复 Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534
Topic 以及整个的交互过程,交互的核心是买家 ID 的一致性,通过ID 来匹配,一般设备端会订阅一个 topic this productkey device name2PC加+,在真实的设备运行过程中会替换成一个具体的message ID,设备回复的 topic 需要携带 message ID。
4、同步调用 RRPC 调用示例
设备端建立连接之后,要订阅 topic,第二步是一旦有消息过来之后需要去处理,处理之后要 publish 一个对应的响应,只需要把 request 关键词换成 response 就可以,对于服务端的调用和 Pub 没有太大的差别,只是在参数里多加一个超时时间,调用的 API 换成 RRPC。
演示:
设备端的模拟代码,设备端会订阅 RRPC 的 topic,然后针对 RRPC来做消息的处理,处理完了之后会及时的有 response 的响应。
const mqtt=require('aliyun-iot-mqtt');
//设备属性
const options ={
productKey:"a1HDWBEeHRa" deviceName:"7rmimujquyh",
deviceSecret:"d19e5469ae45cfaadad48f5443bc6356" regionId:"cn-shanghai"
};
//建立连接
const client =mgtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+i)
client.on('message', function(topic, message) {
if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){
//接收并处理业务系统 RRPC 指令
handleRrpc(topic, message)
}
})
function handleRrpc(topic, message){
topic=topic.replace('/request/','/response/');
console.log("topic=" topic)
console.log("payload="+message)
//响应 RRPC 指令 payload 自定义
const payloadJson={bizCode:0};
// 成功,400 充电失败
client.publish(topic,JSON.stringify(payloadJson));
}
设备端的调用过程,超时是八秒,调用的时候是通过 RRPC 的 API。
});
// 指令内容
const payload ={
power:200,
port:3
};
//2.构建 RRPC 请求
const params ={
ProductKey:"a1HDWBEeHRa",
DeviceName:"7rmjmujquyh"
RequestBase64Byte:newBuffer(JSON.stringify(payload)).toString("base64"),
Timeout:8000
};
co(function*(){
//3.发起 API 调用
try{
const response = yield client.request('Rrpc', params);
console.log(JSON.stringify(response));
console.log(response.RrpcCode);
if(response.RrpcCode=="SUCCESS"){
var resultJSON=new Buffer(response.PayloadBase64Byte,"ba se64').toString();
console.log("RRPC SUCCESS =-==>",JSON.stringify(JSON.pars e(resultJSON)));
}
} catch (err) {
console.log("RRPC ERROR =---=>",JSON.stringify(err.data));
}
});
运行效果:
启动一下设备和云平台建立一个连接。
~fiot -node ChargingStation
wangxm:iot wxm$ node ChargingStation
通过云端的 API 发送一个指令。
~/iot- -bash
wangxm:iot wxm$ node StartChargingCmd
["RequestId":"B3F2522C-892E-4F85-BD44-3119AA8102F1""PayloadBase64Byte":"eyJiaXpDb2R1IjowfQ==","RrpcCode":"SUCCESS","Success":true,"MessageId":"1308632606480572928"}
SUCCESS
RRPC SUCCESS -----> {"bizCode":0}
wangxm:iot wxm$
指令已经发送出去,云端给出了设备端的响应,结果是业务 Code 是零说明成功。
设备端收到 topic 以及需要的报文。
~fiot-node ChargingStation
wangxm:iot wxm$ node ChargingStation
topic=/sys/a1HDWBEeHRa/7rmjmujquyh/rrpc/response/1308632606480572928
payload={"power":200."port":3}
比如三号端口要充200的电。这是整个的同步过程,发起调用就实时的得到一个结果。如果是异步调用只是拿到一个 request ID 和 message ID,没有业务报文。业务报文需要通过详细队列里的新的上报来匹配。这是反向的 RRPC 同步调用的一个价值。