登录物联网控制台,创建产品和设备。
左侧菜单栏选择设备管理 > 产品,单击创建产品,创建产品。本示例中,创建了名称为MQ_test的产品,且节点类型选择为设备。
在产品详情页面,自定义Topic类,用于设备上报数据。本示例中,定义的Topic类:/{YourProductKey}/${YourDeviceName}/user/data。
选择设备管理 > 设备 > 添加设备,创建设备。本示例中,创建了一个名称为MQdevice的设备。
在消息队列RocketMQ控制台,创建Topic和消费者。
登录消息队列RocketMQ控制台。
创建一个实例。
创建Topic。消息类型选择普通消息。
物联网设备消息
创建Group ID。
物联网设备消息
创建消息消费者。在服务器SDK中运行如下示例代码,然后,在RocketMQ控制台查看消费者状态,消费者是否处于在线状态,订阅关系是否一致。
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, "${AccessKey}");
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "${SecretKey}");
// 设置 TCP 接入域名,到控制台的实例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
// 集群订阅方式 (默认)
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 广播订阅方式
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("iot_to_mq", "*", new MessageListener() { //订阅多个 Tag
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
说明
如何创建AccessKey和SecretKey,请参见创建AccessKey。
RocketMQ详细操作指导,请参考消息队列RocketMQ文档
在物联网平台控制台,设置数据流转规则,将设备上报的数据转发至消息队列(RocketMQ)。
单击规则引擎 > 创建规则,创建一条数据流转规则。数据格式选择为JSON。
设置数据处理SQL。
物联网设备消息
设置数据转发目的地。
物联网设备消息
启动规则。
规则启动后,物联网平台会将规则SQL中定义的设备上报消息转发至消息队列(RocketMQ)的Topic中。
使用Java SDK模拟设备,上报消息。
下载Java SDK Demo。
输入MQdevice的设备证书信息,包括ProductKey、DeviceName和DeviceSecret。
修改MQTT Topic为设备上报数据的Topic。本示例中,使用的Topic是/{YourProductKey}/${YourDeviceName}/user/data。
启动设备。
在物联网平台控制台监控运维 > 日志服务中,查看该设备的日志信息,发现设备数据成功转发至RocketMQ。 物联网设备消息 在RocketMQ控制台查看消息。
在本地运行订阅消息队列RocketMQ资源的代码。
物联网设备消息
在消息队列RocketMQ控制台,消息查询页面,按Topic或者Message ID查询消息,验证消息是否成功流转至消息队列RocketMQ中。
RocketMQ接收到的消息类型:
{"deviceName()":"MQdevice"}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。