基于Pub/Sub模式的阿里云IoT同步调用详解

简介: 1.同步调用场景 1.1 背景 MQTT协议是基于PUB/SUB的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。 IoT物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。应用服务器通过POP API发起Rrpc调用,IoT设备端只需要在Timeout内,按照固定的格式回复Pub消息,服务端即可同步获取IoT设备端

1.同步调用场景

1.1 背景

MQTT协议是基于PUB/SUB的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。
IoT物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。应用服务器通过POP API发起Rrpc调用,IoT设备端只需要在Timeout内,按照固定的格式回复Pub消息,服务端即可同步获取IoT设备端的响应结果。
 

具体流程如下:

 

1.2 Topic格式约定

请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}
响应:/sys/${productKey}/${deviceName}/rrpc/response/${messageId}
$表示变量,每个设备不同
messageId为IoT平台生成的消息ID,设备端回复responseTopic里的messageId要与requestTopic一致

示例:

设备端需要订阅:
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设备收到Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到消息后,在timeout时间内回复Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534
 
 

2.同步调用RRPC示例

2.1 设备端代码

 
 

const mqtt = require('aliyun-iot-mqtt');
//设备属性
const options = require("./iot-device-config.json");
//建立连接
const client = mqtt.getAliyunIotMqttClient(options);

client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
client.on('message', function(topic, message) {
    
    if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){
    	handleRrpc(topic, message)
    }
})

function handleRrpc(topic, message){
	topic = topic.replace('/request/','/response/');
	console.log("topic=" + topic)
	//普通Rrpc,响应payload自定义
    const payloadJson = {code:200,msg:"handle ok"};
	client.publish(topic, JSON.stringify(payloadJson));
}
 

2.2 服务端POP调用Rrpc

 
 
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;

const options = require("./iot-ak-config.json");

//1.初始化client
const client = new RPCClient({
    accessKeyId: options.accessKey,
    secretAccessKey: options.accessKeySecret,
    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
    apiVersion: '2017-04-20'
});

const payload = {
  "msg": "hello Rrpc"
};

//2.构建request
const params = {
    ProductKey:"a1gMu82K4m2",
    DeviceName:"h5@nuwr5r9hf6l@1532088166923",
    RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"),
    Timeout:3000
};

co(function*() {
    //3.发起API调用
    const response = yield client.request('Rrpc', params);

    console.log(JSON.stringify(response));
});
 
rrpc响应:
 
 
{
    "MessageId": "1037292594536681472",
    "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",
    "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",
    "Success": true,
    "RrpcCode": "SUCCESS"
}

// PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}
 
 

3.物模型-服务同步调用InvokeThingService示例

注意:物模型 服务调用 接口InvokeThingService,不是Rrpc

3.1 物模型-同步服务定义

 

3.2 设备端实现

 
 
 
const mqtt = require('aliyun-iot-mqtt');
//设备属性
const options = require("./iot-device-config.json");
//建立连接
const client = mqtt.getAliyunIotMqttClient(options);

client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
client.on('message', function(topic, message) {
    
    if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){
    	handleRrpc(topic, message)
    }
})
/*
* 如果存在多个同步调用服务,需要通过payload里的method区分
*/
function handleRrpc(topic, message){

	topic = topic.replace('/request/','/response/');
	console.log("topic=" + topic)
	//物模型 同步服务调用,响应payload结构:
    const payloadJson = {
        id: Date.now(),
        code:200,
        data: {
            currentMode: Math.floor((Math.random() * 20) + 10)
        }
    }

	client.publish(topic, JSON.stringify(payloadJson));
}
注意:设备端响应的payload要满足物模型定义的出参结构

3.3 服务端POP 接口InvokeThingService

 
 
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;

const options = require("./iot-ak-config.json");

//1.初始化client
const client = new RPCClient({
    accessKeyId: options.accessKey,
    secretAccessKey: options.accessKeySecret,
    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
    apiVersion: '2018-01-20'
});

const params = {
    ProductKey: "a1gMu82K4m2",
    DeviceName: "h5@nuwr5r9hf6l@1532088166923",
    Args: JSON.stringify({ "mode": "1" }),
    Identifier: "thing.service.setMode"
};

co(function*() {
    try {
        //3.发起API调用
        const response = yield client.request('InvokeThingService', params);

        console.log(JSON.stringify(response));
    } catch (err) {
        console.log(err);
    }
});
 
调用结果:
 
 
{
    "Data":{
        "Result": "{\"currentMode\":12}",
        "MessageId": "1536145625658"
    },
    "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",
    "Success": true
}
 
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 弹性计算 物联网
【阿里云弹性计算】阿里云ECS在IoT领域的应用:支撑大规模设备连接与数据处理
【5月更文挑战第26天】阿里云ECS是弹性计算服务,支持IoT设备的连接与数据处理。通过MQTT协议实现设备快速接入,配合消息队列处理异构实时数据。ECS可用于部署数据处理工具、应用服务,如智能家居控制系统,通过弹性伸缩适应负载变化。结合阿里云其他服务,ECS为IoT提供完整解决方案,助力企业数字化转型。
102 0
|
传感器 监控 物联网
阿里云IoT HaaS 510:快速实现物联网数据传输的利器
众所周知,物联网(IoT)是近年来日益热门的技术领域之一,它的广泛应用为人们的生活和工作带来了无限可能。在物联网应用中,数据的采集和传输是至关重要的一环。DTU是一种应用于物联网数据传输的终端设备,它可以将各类传感器、数据采集单元等通过串口RS232/485传输到DTU,再由DTU转发到4G网络上传至云端。阿里云IoT HaaS 510是一款开板式DTU产品,能够帮助企业快速搭建物联网平台,并实现数据的采集和传输,那么本文就来简单分享一下。
501 1
阿里云IoT HaaS 510:快速实现物联网数据传输的利器
|
编解码 小程序 JavaScript
阿里云IoT小程序应用开发和组件实践
通过实验,了解阿里云IoT小程序的应用开发的方法,了解其内置的基础组件使用,以及基于Vue.js实现可复用的自定义组件的方法。
|
运维 安全 物联网
使用阿里云 IoT 安全中心保护智慧遥控器
在物联网领域中,我们的 TO B 智慧设备,在发货之后,出现了不少困扰我们的安全问题,比如会被恶意安装应用,访问非法网站等,增加厂家的运维成本。 同时设备上的一些技术机密也容易被好事之人破解,对厂商构成商业损失,直到我们发现了阿里云物联网的一款安全防护产品 -- IoT 安全中心。它主打的 ID² 和安全运营有效的解决了我们的痛点。
504 3
|
自然语言处理 算法 物联网
阿里云正式发布「IoT消费电子应用引擎解决方案」,应用开发提效70%
阿里云正式发布「IoT消费电子应用引擎解决方案」,应用开发提效70%
314 0
|
物联网
《阿里云产品手册2022-2023 版》——阿里云IoT
《阿里云产品手册2022-2023 版》——阿里云IoT
347 0
|
物联网
《阿里云产品手册2022-2023 版》——IoT边缘现场计算:云边协同软件获得首批可信云认证
《阿里云产品手册2022-2023 版》——IoT边缘现场计算:云边协同软件获得首批可信云认证
188 0
|
物联网
《阿里云产品手册2022-2023 版》——IoT 设备身份认证
《阿里云产品手册2022-2023 版》——IoT 设备身份认证
146 0
|
开发框架 物联网 云栖大会
阿里云IoT | HaaS开源百校科技助力计划 —— 开源大使招募
阿里云IoT | HaaS开源百校科技助力计划 —— 开源大使招募
239 0
|
物联网
《阿里云总监课第二期——IoT时代的语音交互智能》电子版地址
阿里云总监课第二期——IoT时代的语音交互智能