生活物联网平台 - 数据AMQP方式推送

简介: AMQP转储功能适用于生活物联网平台与企业服务器之间的消息流转,且为推荐方式。通过集成和使用AMQP SDK,即可实现身份认证、消息接收的能力。我们推荐使用AMQP的方式推送设备数据(如设备状态数据、设备控制记录等),用户信息数据等

Step By Step

  1. 开启设备数据同步开关
  2. 配置AMQP SDK、


一.开启设备数据同步开关

API服务 -- 数据同步

image.pngimage.png

二.配置AMQP SDK

  1. 导入依赖
  2. 更换EndPoint、AppKey和AppSecret;consumerGroupId 与 AppKey 保持一致。
<!--amqp1.0qpidclient--><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.47.0</version></dependency><!--utilforbase64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency>
//EndPoint是连接节点,具体取值如下表所示中国内地:amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671新加坡amqps://ilop.iot-amqp.ap-southeast-1.aliyuncs.com:5671美国(弗吉尼亚):amqps://ilop.iot-amqp.us-east-1.aliyuncs.com:5671德国(法兰克福):amqps://ilop.iot-amqp.eu-central-1.aliyuncs.com:5671
importjava.net.URI;
importjava.util.Hashtable;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.LinkedBlockingQueue;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
importjavax.crypto.Mac;
importjavax.crypto.spec.SecretKeySpec;
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.MessageListener;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.naming.Context;
importjavax.naming.InitialContext;
importorg.apache.commons.codec.binary.Base64;
importorg.apache.qpid.jms.JmsConnection;
importorg.apache.qpid.jms.JmsConnectionListener;
importorg.apache.qpid.jms.message.JmsInboundMessageDispatch;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
publicclassAmqpJavaClientDemo {
privatefinalstaticLoggerlogger=LoggerFactory.getLogger(AmqpJavaClientDemo.class);
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。privatefinalstaticExecutorServiceexecutorService=newThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() *2, 60, TimeUnit.SECONDS,
newLinkedBlockingQueue<>(50000));
publicstaticvoidmain(String[] args) throwsException {
StringappKey="${YourAppkey}";
StringappSecret="${YourAppSecret}";
StringconsumerGroupId="${YourAppkey}";
longrandom=xxxxx;
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。StringclientId="${YourClientId}";
StringuserName=clientId+"|authMode=appkey"+",signMethod="+"SHA256"+",random="+random+",appKey="+appKey+",groupId="+consumerGroupId+"|";
StringsignContent="random="+random;
Stringpassword=doSign(signContent, appSecret, "HmacSHA256");
StringconnectionUrlTemplate="failover:(${AMQPEndPointUrl}?amqp.idleTimeout=80000)"+"?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
Hashtable<String, String>hashtable=newHashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrlTemplate);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Contextcontext=newInitialContext(hashtable);
ConnectionFactorycf= (ConnectionFactory)context.lookup("SBCF");
Destinationqueue= (Destination)context.lookup("QUEUE");
// 创建连接。Connectionconnection=cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Sessionsession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建Receiver连接。MessageConsumerconsumer=session.createConsumer(queue);
consumer.setMessageListener(messageListener);
    }
privatestaticMessageListenermessageListener=newMessageListener() {
@OverridepublicvoidonMessage(Messagemessage) {
try {
//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() ->processMessage(message));
            } catch (Exceptione) {
logger.error("submit task occurs exception ", e);
            }
        }
    };
/*** 在这里处理您收到消息后的具体业务逻辑。*/privatestaticvoidprocessMessage(Messagemessage) {
try {
byte[] body=message.getBody(byte[].class);
Stringcontent=newString(body);
Stringtopic=message.getStringProperty("topic");
StringmessageId=message.getStringProperty("messageId");
logger.info("receive message"+", topic = "+topic+", messageId = "+messageId+", content = "+content);
        } catch (Exceptione) {
logger.error("processMessage occurs error ", e);
        }
    }
privatestaticJmsConnectionListenermyJmsConnectionListener=newJmsConnectionListener() {
/*** 连接成功建立。*/@OverridepublicvoidonConnectionEstablished(URIremoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }
/*** 尝试过最大重试次数之后,最终连接失败。*/@OverridepublicvoidonConnectionFailure(Throwableerror) {
logger.error("onConnectionFailure, {}", error.getMessage());
        }
/*** 连接中断。*/@OverridepublicvoidonConnectionInterrupted(URIremoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }
/*** 连接中断后又自动重连上。*/@OverridepublicvoidonConnectionRestored(URIremoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }
@OverridepublicvoidonInboundMessage(JmsInboundMessageDispatchenvelope) {}
@OverridepublicvoidonSessionClosed(Sessionsession, Throwablecause) {}
@OverridepublicvoidonConsumerClosed(MessageConsumerconsumer, Throwablecause) {}
@OverridepublicvoidonProducerClosed(MessageProducerproducer, Throwablecause) {}
    };
/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/privatestaticStringdoSign(StringtoSignString, Stringsecret, StringsignMethod) throwsException {
SecretKeySpecsigningKey=newSecretKeySpec(secret.getBytes(), signMethod);
Macmac=Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac=mac.doFinal(toSignString.getBytes());
returnHex.encodeHexString(rawHmac);
    }
}

参考文档:

数据AMQP推送

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
8月前
|
关系型数据库 物联网 PostgreSQL
沉浸式学习PostgreSQL|PolarDB 11: 物联网(IoT)、监控系统、应用日志、用户行为记录等场景 - 时序数据高吞吐存取分析
物联网场景, 通常有大量的传感器(例如水质监控、气象监测、新能源汽车上的大量传感器)不断探测最新数据并上报到数据库. 监控系统, 通常也会有采集程序不断的读取被监控指标(例如CPU、网络数据包转发、磁盘的IOPS和BW占用情况、内存的使用率等等), 同时将监控数据上报到数据库. 应用日志、用户行为日志, 也就有同样的特征, 不断产生并上报到数据库. 以上数据具有时序特征, 对数据库的关键能力要求如下: 数据高速写入 高速按时间区间读取和分析, 目的是发现异常, 分析规律. 尽量节省存储空间
616 1
|
5月前
|
数据采集 SQL Oracle
助力工业物联网,工业大数据之DWD层构建:数据抽取分析【十一】
助力工业物联网,工业大数据之DWD层构建:数据抽取分析【十一】
54 0
|
5月前
|
人工智能 数据可视化 安全
Java带可视化数据大屏的物联网智慧工地系统源码
通过现场AI智能视频监控、临时设施动态管理,实时检测场地空间、资源、设施的运行状况,及时发现场地安全隐患,确保为工人营造一个安全、文明的场地作业环境。
62 0
|
6月前
|
存储 安全 物联网安全
物联网安全数据泄漏如何防范
物联网安全数据泄漏如何防范
107 0
|
9月前
|
物联网 数据处理 数据安全/隐私保护
物联网隐私保护:守护用户数据的数字隐私
本篇深入探讨了物联网中的隐私保护问题,包括用户数据隐私权利,匿名化与数据脱敏,以及法规与合规性。我们强调了保护用户数据隐私的重要性,并通过代码示例演示了如何使用加密库保护用户敏感数据。此外,我们介绍了匿名化和数据脱敏的概念,以及如何在数据处理中实施这些技术。最后,我们讨论了隐私保护法规,如GDPR和CCPA,以及确保物联网应用合规性的方法。通过这些内容,读者将更深入了解如何在物联网环境中有效地保护用户隐私,为构建可信赖的物联网生态系统提供了有益的指导。
194 0
EMQ
|
9月前
|
存储 监控 网络协议
工业物联网数据桥接教程:Modbus 桥接到 MQTT
通过将 Modbus RTU 或 TCP 转换为 MQTT 消息,可以轻松地将设备数据发送到云端,并在需要时进行远程控制和监控。
EMQ
527 0
工业物联网数据桥接教程:Modbus 桥接到 MQTT
|
物联网 数据管理 数据挖掘
阿里云物联网数据的统计分析(三)|学习笔记
快速学习阿里云物联网数据的统计分析(三)
332 0
阿里云物联网数据的统计分析(三)|学习笔记
|
11月前
|
存储 人工智能 达摩院
带你读《云存储应用白皮书》之29:2. 物联网大数据存储解决方案
带你读《云存储应用白皮书》之29:2. 物联网大数据存储解决方案
275 1
|
12月前
|
存储 JavaScript 前端开发
TDengine极简实战:从采集到入库,从前端到后端,体验物联网设备数据流转
TDengine极简实战:从采集到入库,从前端到后端,体验物联网设备数据流转
1071 1
|
12月前
|
消息中间件 网络协议 物联网
「物联网架构」HiveMQ和Apache Kafka流式处理IoT数据和MQTT消息
「物联网架构」HiveMQ和Apache Kafka流式处理IoT数据和MQTT消息

相关产品

  • 物联网平台