基于MQTT接入的设备靠心跳保活,但心跳是周期性的、且自动收发和超时重连,这些特性给主动检测设备端是否在线带来了一定难度。本文提供通过消息收发是否正常判定设备是否在线的原理、流程、实现方式。
如果设备可以发送、接收消息,那么该设备的通信是没问题的,并且一定在线。
消息收发是物联网平台的核心能力。因此,这种判定方法不会因为物联网平台架构升级或业务变动而变化,也不会因为设备使用的客户端不同而不同。是设备端检测自己是否在线最通用的一种原理。
该原理的一种特殊实现就是设备端消息的自发自收。
创建js Topic = /yourProductKey/yourDeviceName/user/checkstatus 。
Topic可以自定义,但权限必须为发布和订阅。
设备端发送消息js {"id":123,"version":"1.0","time":1234567890123} ,
请一定使用QoS=0 。 消息内容可自定义,但建议使用此格式。
参数说明:
字段 | 类型 | 说明 |
---|---|---|
id | Object | 用于验证收发的消息是否是同一个,请自行业务层保证唯一 |
version | String | 版本号固定1.0 |
time | Long | 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量 |
设备端收到消息上一步发送的消息。 离线判定逻辑
严格的:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线
为方便体验,本例基于Java SDK Demo开发,实现设备端检测自己是否在线的严格判定逻辑。 Java SDK开发具体细节,请查看相关文档。
首先,下载Demo工程,添加本类,并填写设备证书信息。设备端代码如下: import java.io.UnsupportedEncodingException;
import com.aliyun.alink.dm.api.DeviceInfo; import com.aliyun.alink.dm.api.InitResult; import com.aliyun.alink.linkkit.api.ILinkKitConnectListener; import com.aliyun.alink.linkkit.api.IoTMqttClientConfig; import com.aliyun.alink.linkkit.api.LinkKit; import com.aliyun.alink.linkkit.api.LinkKitInitParams; import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest; import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest; import com.aliyun.alink.linksdk.cmp.core.base.AMessage; import com.aliyun.alink.linksdk.cmp.core.base.ARequest; import com.aliyun.alink.linksdk.cmp.core.base.AResponse; import com.aliyun.alink.linksdk.cmp.core.base.ConnectState; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener; import com.aliyun.alink.linksdk.tools.AError;
public class CheckDeviceStatusOnDevice {
// ===================需要用户填写的参数,开始===========================
// 产品productKey,设备证书参数之一
private static String productKey = "";
// 设备名字deviceName,设备证书参数之一
private static String deviceName = "";
// 设备密钥deviceSecret,设备证书参数之一
private static String deviceSecret = "";
// 消息通信的Topic,需要在控制台定义,权限必须为发布和订阅
private static String checkStatusTopic = "/" + productKey + "/" + deviceName + "/user/checkstatus";
// ===================需要用户填写的参数结束===========================
// 接收到的消息
private static String subInfo = "";
public static void main(String[] args) throws InterruptedException {
CheckDeviceStatusOnDevice device = new CheckDeviceStatusOnDevice();
// 初始化
device.init(productKey, deviceName, deviceSecret);
// 下行数据监听
device.registerNotifyListener();
// 订阅Topic
device.subscribe(checkStatusTopic);
// 测试设备状态
System.out.println("we will check device online status now.");
device.checkStatus();
// 准备测试设备离线状态,请拔掉网线
System.out.println("pls close network,we will check device offline status after 60 seconds.");
for (int i = 0; i < 6; i++) {
Thread.sleep(10000);
}
device.checkStatus();
}
/**
* 测试设备状态
*
* @throws InterruptedException
*/
public void checkStatus() throws InterruptedException {
// -------------------------------------------------------------------
// 要发送的消息,可以自定义,建议使用当前格式
// -------------------------------------------------------------------
// Field | Tyep | Desc
// -------------------------------------------------------------------
// id | Object | 用于验证收发的消息是否是同一个,请自行业务层保证唯一
// -------------------------------------------------------------------
// version | String | 版本号固定1.0
// -------------------------------------------------------------------
// time | Long | 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
// -------------------------------------------------------------------
String payload = "{\"id\":123, \"version\":\"1.0\",\"time\":" + System.currentTimeMillis() + "}";
// 发送消息
publish(checkStatusTopic, payload);
// 严格的离线判定逻辑:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线
boolean isTimeout = true;
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
if (!subInfo.isEmpty()) {
isTimeout = false;
break;
}
}
if (!isTimeout && payload.equals(subInfo)) {
System.out.println("Device is online !!");
} else {
System.out.println("Device is offline !!");
}
// 置空接收到的消息,方便下一次测试
subInfo = "";
}
/**
* 初始化
*
* @param pk productKey
* @param dn devcieName
* @param ds deviceSecret
* @throws InterruptedException
*/
public void init(String pk, String dn, String ds) throws InterruptedException {
LinkKitInitParams params = new LinkKitInitParams();
// 设置 MQTT 初始化参数
IoTMqttClientConfig config = new IoTMqttClientConfig();
config.productKey = pk;
config.deviceName = dn;
config.deviceSecret = ds;
params.mqttClientConfig = config;
// 设置初始化设备证书信息,用户传入
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.productKey = pk;
deviceInfo.deviceName = dn;
deviceInfo.deviceSecret = ds;
params.deviceInfo = deviceInfo;
LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
@Override
public void onInitDone(InitResult initResult) {
System.out.println("init success !!");
}
@Override
public void onError(AError aError) {
System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
+ aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
}
});
// 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
Thread.sleep(2000);
}
/**
* 监听下行数据
*/
public void registerNotifyListener() {
LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
@Override
public boolean shouldHandle(String connectId, String topic) {
// 只处理特定Topic的消息
if (checkStatusTopic.equals(topic)) {
return true;
} else {
return false;
}
}
@Override
public void onNotify(String connectId, String topic, AMessage aMessage) {
// 接收消息
try {
subInfo = new String((byte[]) aMessage.getData(), "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void onConnectStateChange(String connectId, ConnectState connectState) {
}
});
}
/**
* 发布消息
*
* @param topic 发送消息的Topic
* @param payload 发送的消息内容
*/
public void publish(String topic, String payload) {
MqttPublishRequest request = new MqttPublishRequest();
request.topic = topic;
request.payloadObj = payload;
request.qos = 0;
LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
@Override
public void onResponse(ARequest aRequest, AResponse aResponse) {
}
@Override
public void onFailure(ARequest aRequest, AError aError) {
}
});
}
/**
* 订阅消息
*
* @param topic 订阅消息的Topic
*/
public void subscribe(String topic) {
MqttSubscribeRequest request = new MqttSubscribeRequest();
request.topic = topic;
LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
@Override
public void onSuccess() {
}
@Override
public void onFailure(AError aError) {
}
});
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。