设备在线/离线状态缓存
很多场景中,我们都需要查询设备是否在线,但POP API的访问频次受限,需要我们自己系统缓存设备状态
技术方案
- 配置规则引擎监听设备状态变更消息,流转到函数计算FC
- 由于消息乱序,在函数计算对比表格存储/Redis中设备状态和当前状态的lastTime,存储最新的数据
- 业务系统从表格存储/Redis中快速查询设备当前在线/离线状态
1.设备在线/离线状态变更消息
当设备连接到IoT物联网平台,设备离线,在线状态变更会生成特定topic的消息,我们服务端可以通过订阅这个topic获得设备状态变更信息。
**
设备的在线/离线状态流转的Topic格式:
/as/mqtt/status/{productKey}/{deviceName}
payload数据格式:
{ "status":"online|offline", "productKey":"pk13543", "deviceName":"dn1234", "time":"2018-08-31 15:32:28.205", "utcTime":"2018-08-31T07:32:28.205Z", "lastTime":"2018-08-31 15:32:28.195", "utcLastTime":"2018-08-31T07:32:28.195Z", "clientIp":"123.123.123.123" }
参数说明:
参数 | 类型 | 说明 |
status | String | 设备状态,online上线,offline离线 |
productKey | String | 设备所属产品的唯一标识 |
deviceName | String | 设备名称 |
time | String | 此条消息发送的时间点, |
lastTime | String | 状态最后变更时间 注:需要根据此时间排序,判断设备最终状态 |
clientIp | String | 设备公网出口IP |
2.通过规则引擎流转设备状态
2.1 配置数据源
创建数据源
添加指定产品下全量设备的状态变化通知
配置完成
2.2 配置数据目的地:函数计算FC
创建目的地-数据流转函数计算操作界面
2.3 创建规则引擎,配置解析器脚本
创建云产品流转解析器
关联数据源
2.4 函数计算FC实现
nodejs实现代码参考:
由于消息乱序,需要对比表格存储/Redis中已存储的设备状态和当前状态的lastTime,找出最新状态存储到缓存中。
var TableStore = require('tablestore'); var options = { accessKeyId: '你的accessKeyId', accessKeySecret: '你的accessKeySecret', } var otsClient = new TableStore.Client({ accessKeyId: options.accessKeyId, secretAccessKey: options.accessKeySecret, endpoint: '你的endpoint', instancename: '你的instancename', maxRetries: 20 //默认20次重试,可以省略这个参数。 }); var response = { isBase64Encoded: false, statusCode: 200 }; module.exports.handler = function(event, context, callback) { var eventJson = JSON.parse(event.toString()); var deviceId = eventJson.deviceName; var productKey = eventJson.productKey; var lastTime = new Date(eventJson.lastTime); var params = { tableName: "device_status_table", primaryKey: [{ 'deviceId': deviceId },{'productKey': productKey}], maxVersions: 1 }; try { otsClient.getRow(params, function(err, data) { if (err) { response.body = { msg: 'error', code: 404 }; callback(null, response); return; } //有数据,拿出来比较lastTime if (data.row.primaryKey) { var attributes = data.row.attributes; var dbTime = ''; attributes.forEach(function(item) { if (item.columnName == 'lastTime') { dbTime = new Date(item.columnValue); } }) //转换成毫秒进行比较 if (lastTime.getTime() < dbTime.getTime()) { return; } } var iot_data = { tableName: "device_status_table", condition: new TableStore.Condition(TableStore.RowExistenceExpectation.IGNORE, null), primaryKey: [{ "deviceId": deviceId },{'productKey': productKey}], attributeColumns: [ { 'lastTime': eventJson.lastTime }, { 'clientIp': eventJson.clientIp }, { 'status': eventJson.status } ], returnContent: { returnType: TableStore.ReturnType.Primarykey } } otsClient.putRow(iot_data, function(err, data) { if (err) { response.body = { msg: 'error', code: 404 }; callback(null, response); return; } response.body = { msg: 'ok', code: 200 }; callback(null, response); return; }); }); } catch (err) { response.body = { msg: 'error', code: 404 }; callback(null, response); } };
2.4 表格存储OTS
在device_status_table表中,查看设备上下线情况: