生活物联网平台 - 数据AMQP方式推送

简介: AMQP转储功能适用于生活物联网平台与企业服务器之间的消息流转,且为推荐方式。通过集成和使用AMQP SDK,即可实现身份认证、消息接收的能力。我们推荐使用AMQP的方式推送设备数据(如设备状态数据、设备控制记录等),用户信息数据等

Step By Step

  1. 开启设备数据同步开关
  2. 配置AMQP SDK、


一.开启设备数据同步开关

API服务 -- 数据同步

image.pngimage.png

二.配置AMQP SDK

  1. 导入依赖
  2. 更换EndPoint、AppKey和AppSecret;consumerGroupId 与 AppKey 保持一致。
<!--amqp1.0qpidclient--><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.47.0</version></dependency><!--utilforbase64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency>
//EndPoint是连接节点,具体取值如下表所示中国内地:amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671新加坡amqps://ilop.iot-amqp.ap-southeast-1.aliyuncs.com:5671美国(弗吉尼亚):amqps://ilop.iot-amqp.us-east-1.aliyuncs.com:5671德国(法兰克福):amqps://ilop.iot-amqp.eu-central-1.aliyuncs.com:5671
importjava.net.URI;
importjava.util.Hashtable;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.LinkedBlockingQueue;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
importjavax.crypto.Mac;
importjavax.crypto.spec.SecretKeySpec;
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.MessageListener;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.naming.Context;
importjavax.naming.InitialContext;
importorg.apache.commons.codec.binary.Base64;
importorg.apache.qpid.jms.JmsConnection;
importorg.apache.qpid.jms.JmsConnectionListener;
importorg.apache.qpid.jms.message.JmsInboundMessageDispatch;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
publicclassAmqpJavaClientDemo {
privatefinalstaticLoggerlogger=LoggerFactory.getLogger(AmqpJavaClientDemo.class);
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。privatefinalstaticExecutorServiceexecutorService=newThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() *2, 60, TimeUnit.SECONDS,
newLinkedBlockingQueue<>(50000));
publicstaticvoidmain(String[] args) throwsException {
StringappKey="${YourAppkey}";
StringappSecret="${YourAppSecret}";
StringconsumerGroupId="${YourAppkey}";
longrandom=xxxxx;
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。StringclientId="${YourClientId}";
StringuserName=clientId+"|authMode=appkey"+",signMethod="+"SHA256"+",random="+random+",appKey="+appKey+",groupId="+consumerGroupId+"|";
StringsignContent="random="+random;
Stringpassword=doSign(signContent, appSecret, "HmacSHA256");
StringconnectionUrlTemplate="failover:(${AMQPEndPointUrl}?amqp.idleTimeout=80000)"+"?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
Hashtable<String, String>hashtable=newHashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrlTemplate);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Contextcontext=newInitialContext(hashtable);
ConnectionFactorycf= (ConnectionFactory)context.lookup("SBCF");
Destinationqueue= (Destination)context.lookup("QUEUE");
// 创建连接。Connectionconnection=cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Sessionsession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建Receiver连接。MessageConsumerconsumer=session.createConsumer(queue);
consumer.setMessageListener(messageListener);
    }
privatestaticMessageListenermessageListener=newMessageListener() {
@OverridepublicvoidonMessage(Messagemessage) {
try {
//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() ->processMessage(message));
            } catch (Exceptione) {
logger.error("submit task occurs exception ", e);
            }
        }
    };
/*** 在这里处理您收到消息后的具体业务逻辑。*/privatestaticvoidprocessMessage(Messagemessage) {
try {
byte[] body=message.getBody(byte[].class);
Stringcontent=newString(body);
Stringtopic=message.getStringProperty("topic");
StringmessageId=message.getStringProperty("messageId");
logger.info("receive message"+", topic = "+topic+", messageId = "+messageId+", content = "+content);
        } catch (Exceptione) {
logger.error("processMessage occurs error ", e);
        }
    }
privatestaticJmsConnectionListenermyJmsConnectionListener=newJmsConnectionListener() {
/*** 连接成功建立。*/@OverridepublicvoidonConnectionEstablished(URIremoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }
/*** 尝试过最大重试次数之后,最终连接失败。*/@OverridepublicvoidonConnectionFailure(Throwableerror) {
logger.error("onConnectionFailure, {}", error.getMessage());
        }
/*** 连接中断。*/@OverridepublicvoidonConnectionInterrupted(URIremoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }
/*** 连接中断后又自动重连上。*/@OverridepublicvoidonConnectionRestored(URIremoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }
@OverridepublicvoidonInboundMessage(JmsInboundMessageDispatchenvelope) {}
@OverridepublicvoidonSessionClosed(Sessionsession, Throwablecause) {}
@OverridepublicvoidonConsumerClosed(MessageConsumerconsumer, Throwablecause) {}
@OverridepublicvoidonProducerClosed(MessageProducerproducer, Throwablecause) {}
    };
/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/privatestaticStringdoSign(StringtoSignString, Stringsecret, StringsignMethod) throwsException {
SecretKeySpecsigningKey=newSecretKeySpec(secret.getBytes(), signMethod);
Macmac=Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac=mac.doFinal(toSignString.getBytes());
returnHex.encodeHexString(rawHmac);
    }
}

参考文档:

数据AMQP推送

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
关系型数据库 物联网 PostgreSQL
沉浸式学习PostgreSQL|PolarDB 11: 物联网(IoT)、监控系统、应用日志、用户行为记录等场景 - 时序数据高吞吐存取分析
物联网场景, 通常有大量的传感器(例如水质监控、气象监测、新能源汽车上的大量传感器)不断探测最新数据并上报到数据库. 监控系统, 通常也会有采集程序不断的读取被监控指标(例如CPU、网络数据包转发、磁盘的IOPS和BW占用情况、内存的使用率等等), 同时将监控数据上报到数据库. 应用日志、用户行为日志, 也就有同样的特征, 不断产生并上报到数据库. 以上数据具有时序特征, 对数据库的关键能力要求如下: 数据高速写入 高速按时间区间读取和分析, 目的是发现异常, 分析规律. 尽量节省存储空间
816 1
|
消息中间件 传感器 监控
IoT企业物联网平台,数据服务开发实战
IoT企业物联网平台开发实战
454 0
|
5月前
|
物联网 数据管理 Apache
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
149 1
|
24天前
|
SQL 存储 运维
从建模到运维:联犀如何完美融入时序数据库 TDengine 实现物联网数据流畅管理
本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。
40 1
|
2月前
|
传感器 安全 算法
物联网发布者在数据传输过程中如何防止数据被篡改
在物联网数据传输中,为防止数据被篡改,可采用加密技术、数字签名、数据完整性校验等方法,确保数据的完整性和安全性。
|
2月前
|
存储 安全 算法
物联网发布者在发送数据时如何保证数据的安全性和完整性
数据加密、密钥管理和数据完整性验证是物联网安全的重要组成部分。对称加密(如AES)和非对称加密(如RSA)分别适用于大量数据和高安全需求的场景。密钥需安全存储并定期更新。数据完整性通过MAC(如HMAC-SHA256)和数字签名(如RSA签名)验证。通信协议如MQTT over TLS/SSL和CoAP over DTLS增强传输安全,确保数据在传输过程中的机密性和完整性。
|
5月前
|
存储 传感器 监控
理解并利用物联网(IoT)数据的技术探索
【8月更文挑战第11天】物联网数据是数字化转型的重要资源。通过深入理解物联网数据的特性和价值,并采取有效的收集、处理和分析策略,我们可以更好地利用这些数据为企业决策提供支持、优化运营效率、创造新的商业模式并推动数字化转型的深入发展。
|
5月前
|
消息中间件 传感器 监控
AMQP 与物联网 (IoT) 应用的结合
【8月更文第28天】高级消息队列协议 (AMQP) 是一种开放标准的应用层协议,特别适合于物联网 (IoT) 场景中的消息传递。AMQP 提供了可靠的、可扩展的消息传输机制,能够处理来自大量设备的数据流。本文将探讨 AMQP 在 IoT 应用中的优势,并提供使用不同编程语言构建 AMQP 客户端的具体示例。
209 0
|
6月前
|
物联网
好的资源链接,gitee全糖咖啡,B站视频转成mp4,全糖咖啡 / 物联网网关数据上传,,全糖咖啡 / springboot+百度智能车牌检测
好的资源链接,gitee全糖咖啡,B站视频转成mp4,全糖咖啡 / 物联网网关数据上传,,全糖咖啡 / springboot+百度智能车牌检测
|
6月前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 物联网平台