开发者社区> 问答> 正文

服务端检测设备是否在线

基于MQTT接入的设备靠心跳保活,但心跳具有周期性、自动收发包和超时重连等特性,这些特性给主动检测设备端是否在线带来了一定难度。服务端虽提供了GetDeviceStatus和BatchGetDeviceState接口查询设备状态,但是调用API获取设备状态是基于会话实现的,会话保活是建立在心跳基础上的。本文提供通过使用RRPC来判定设备是否在线的原理、流程、实现方式。

展开
收起
请回答1024 2020-03-05 16:56:38 751 0
1 条回答
写回答
取消 提交回答
  • 原理

    如果设备可以接收服务端下发的消息并作出响应,则该设备通信正常,设备一定在线。

    消息收发是物联网平台的核心能力。因此,这种判定方法不会因为物联网平台架构升级或业务变动而变化,也不会因为设备使用的客户端不同而不同。这是服务端检测设备是否在线最通用的一种原理。

    物联网平台提供的RRPC能力就是该原理的一种特殊实现。

    流程

    1. 设备端订阅RRPC请求的Topic是/sys/${yourProductKey}/${yourDeviceName}/rrpc/request/+。
    2. 服务端调用RRpc接口发送指令,例如{"id":123,"version":"1.0","time":1234567890123}。 消息内容可自定义,但建议使用此格式。

      参数说明如下表。

    字段类新说明
    idObject消息ID,用于验证收发的消息是否是同一条。请自行在业务层生成,并保证其唯一性。
    versionString版本号。目前版本号固定为1.0。
    timeLong发送消息的时间戳,可以用于计算消息来回的延时,评估当前的通信质量。
    1. 设备端接收指令并响应RRPC请求。接收的消息格式:{"id":123,"version":"1.0","time":1234567890123} 离线判定逻辑如下。

      • 严格:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线。
      • 普通:发送消息后,5秒内没有收到消息算失败,连续2次失败,判定为离线。
      • 宽松:发送消息后,5秒内没有收到消息算失败,连续3次失败,判定为离线。

      说明1.PNG

    实现

    为方便体验,本例基于设备端Java SDK Demo和服务端Java SDK Demo开发。

    说明2.PNG

    分别下载Demo工程,服务端添加CheckDeviceStatusOnServer类,设备端添加Device类,填写您的阿里云账号AccessKey信息和设备证书信息。

    设备端代码如下: import java.io.UnsupportedEncodingException;

    import com.aliyun.alink.dm.api.DeviceInfo; import com.aliyun.alink.dm.api.InitResult; import com.aliyun.alink.linkkit.api.ILinkKitConnectListener; import com.aliyun.alink.linkkit.api.IoTMqttClientConfig; import com.aliyun.alink.linkkit.api.LinkKit; import com.aliyun.alink.linkkit.api.LinkKitInitParams; import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest; import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest; import com.aliyun.alink.linksdk.cmp.core.base.AMessage; import com.aliyun.alink.linksdk.cmp.core.base.ARequest; import com.aliyun.alink.linksdk.cmp.core.base.AResponse; import com.aliyun.alink.linksdk.cmp.core.base.ConnectState; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener; import com.aliyun.alink.linksdk.tools.AError;

    public class Device {

    // ===================需要用户填写的参数,开始===========================
    // 产品productKey,设备证书参数之一
    private static String productKey = "";
    // 设备名字deviceName,设备证书参数之一
    private static String deviceName = "";
    // 设备密钥deviceSecret,设备证书参数之一
    private static String deviceSecret = "";
    // 消息通信的Topic,无需创建和定义,直接使用即可
    private static String rrpcTopic = "/sys/" + productKey + "/" + deviceName + "/rrpc/request/+";
    // ===================需要用户填写的参数,结束===========================
    
    public static void main(String[] args) throws InterruptedException {
    
        Device device = new Device();
    
        // 初始化
        device.init(productKey, deviceName, deviceSecret);
    
        // 下行数据监听
        device.registerNotifyListener();
    
        // 订阅Topic
        device.subscribe(rrpcTopic);
    }
    
    /**
     * 初始化
     * 
     * @param pk productKey
     * @param dn devcieName
     * @param ds deviceSecret
     * @throws InterruptedException
     */
    public void init(String pk, String dn, String ds) throws InterruptedException {
    
        LinkKitInitParams params = new LinkKitInitParams();
    
        // 设置 MQTT 初始化参数
        IoTMqttClientConfig config = new IoTMqttClientConfig();
        config.productKey = pk;
        config.deviceName = dn;
        config.deviceSecret = ds;
        params.mqttClientConfig = config;
    
        // 设置初始化设备证书信息,用户传入
        DeviceInfo deviceInfo = new DeviceInfo();
        deviceInfo.productKey = pk;
        deviceInfo.deviceName = dn;
        deviceInfo.deviceSecret = ds;
    
        params.deviceInfo = deviceInfo;
    
        LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
            public void onError(AError aError) {
                System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
                        + aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
            }
    
            public void onInitDone(InitResult initResult) {
                System.out.println("init success !!");
            }
        });
    
        // 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
        Thread.sleep(2000);
    }
    
    /**
     * 监听下行数据
     */
    public void registerNotifyListener() {
        LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
            @Override
            public boolean shouldHandle(String connectId, String topic) {
                // 只处理特定Topic的消息
                if (topic.contains("/rrpc/request/")) {
                    return true;
                } else {
                    return false;
                }
            }
    
            @Override
            public void onNotify(String connectId, String topic, AMessage aMessage) {
                // 接收RRPC请求并回复RRPC响应
                try {
                    String response = topic.replace("/request/", "/response/");
                    publish(response, new String((byte[]) aMessage.getData(), "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void onConnectStateChange(String connectId, ConnectState connectState) {
            }
        });
    }
    
    /**
     * 发布消息
     * 
     * @param topic 发送消息的Topic
     * @param payload 发送的消息内容
     */
    public void publish(String topic, String payload) {
        MqttPublishRequest request = new MqttPublishRequest();
        request.topic = topic;
        request.payloadObj = payload;
        request.qos = 0;
        LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
            @Override
            public void onResponse(ARequest aRequest, AResponse aResponse) {
            }
    
            @Override
            public void onFailure(ARequest aRequest, AError aError) {
            }
        });
    }
    
    /**
     * 订阅消息
     * 
     * @param topic 订阅消息的Topic
     */
    public void subscribe(String topic) {
        MqttSubscribeRequest request = new MqttSubscribeRequest();
        request.topic = topic;
        LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
            @Override
            public void onSuccess() {
            }
    
            @Override
            public void onFailure(AError aError) {
            }
        });
    }
    

    }

    服务端代码如下: import java.io.UnsupportedEncodingException;

    import org.apache.commons.codec.binary.Base64;

    import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.exceptions.ServerException; import com.aliyuncs.iot.model.v20170420.RRpcRequest; import com.aliyuncs.iot.model.v20170420.RRpcResponse; import com.aliyuncs.profile.DefaultProfile; import com.aliyuncs.profile.IClientProfile;

    public class CheckDeviceStatusOnServer extends BaseTest {

    // ===================需要用户填写的参数,开始===========================
    // 用户账号AccessKey
    private static String accessKeyID = "";
    // 用户账号AccesseKeySecret
    private static String accessKeySecret = "";
    // 产品productKey,设备证书参数之一
    private static String productKey = "";
    // 设备名字deviceName,设备证书参数之一
    private static String deviceName = "";
    // ===================需要用户填写的参数,结束===========================
    
    public static void main(String[] args) throws ServerException, ClientException, UnsupportedEncodingException {
    
        // -------------------------------------------------------------------
        // 要发送的消息,可以自定义,建议使用当前格式
        // -------------------------------------------------------------------
        // Field   | Tyep   | Desc
        // -------------------------------------------------------------------
        // id      | Object | 用于验证收发的消息是否是同一个,请自行业务层保证唯一
        // -------------------------------------------------------------------
        // version | String | 版本号固定1.0
        // -------------------------------------------------------------------
        // time    | Long   | 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
        // -------------------------------------------------------------------
        String payload = "{\"id\":123, \"version\":\"1.0\",\"time\":" + System.currentTimeMillis() + "}";
    
        // 构建RRPC请求
        RRpcRequest request = new RRpcRequest();
        request.setProductKey(productKey);
        request.setDeviceName(deviceName);
        request.setRequestBase64Byte(Base64.encodeBase64String(payload.getBytes()));
        request.setTimeout(5000);
    
        // 获取服务端请求客户端
        DefaultAcsClient client = getClient();
    
        // 发起RRPC请求
        RRpcResponse response = (RRpcResponse) client.getAcsResponse(request);
    
        // RRPC响应处理
        // 这个不能看response.getSuccess(),这个仅表明RRPC请求发送成功,不代表设备接收成功和响应成功
        // 需要根据RrpcCode来判定,参考文档https://help.aliyun.com/document_detail/69797.html
        if (response != null && "SUCCESS".equals(response.getRrpcCode())) {
            if (payload.equals(new String(Base64.decodeBase64(response.getPayloadBase64Byte()), "UTF-8"))) {
                System.out.println("Device is online");
            } else {
                System.out.println("Device is offline1");
            }
        } else {
            System.out.println("Device is offline");
        }
    }
    
    public static DefaultAcsClient getClient() {
    
        DefaultAcsClient client = null;
    
        try {
            IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKeyID, accessKeySecret);
            DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
            client = new DefaultAcsClient(profile);
        } catch (Exception e) {
            System.out.println("init client failed !! exception:" + e.getMessage());
        }
    
        return client;
    }
    

    }

    说明3.PNG

    2020-03-05 17:07:48
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
连接管理平台:为按需安全智能的连接服务而来 立即下载
内容驱动游戏分发 立即下载
内容安全检测与管控 立即下载