生活物联网平台 - 数据AMQP方式推送

简介: AMQP转储功能适用于生活物联网平台与企业服务器之间的消息流转,且为推荐方式。通过集成和使用AMQP SDK,即可实现身份认证、消息接收的能力。我们推荐使用AMQP的方式推送设备数据(如设备状态数据、设备控制记录等),用户信息数据等

Step By Step

  1. 开启设备数据同步开关
  2. 配置AMQP SDK、


一.开启设备数据同步开关

API服务 -- 数据同步

image.pngimage.png

二.配置AMQP SDK

  1. 导入依赖
  2. 更换EndPoint、AppKey和AppSecret;consumerGroupId 与 AppKey 保持一致。
<!--amqp1.0qpidclient--><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.47.0</version></dependency><!--utilforbase64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency>
//EndPoint是连接节点,具体取值如下表所示中国内地:amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671新加坡amqps://ilop.iot-amqp.ap-southeast-1.aliyuncs.com:5671美国(弗吉尼亚):amqps://ilop.iot-amqp.us-east-1.aliyuncs.com:5671德国(法兰克福):amqps://ilop.iot-amqp.eu-central-1.aliyuncs.com:5671
importjava.net.URI;
importjava.util.Hashtable;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.LinkedBlockingQueue;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
importjavax.crypto.Mac;
importjavax.crypto.spec.SecretKeySpec;
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.MessageListener;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.naming.Context;
importjavax.naming.InitialContext;
importorg.apache.commons.codec.binary.Base64;
importorg.apache.qpid.jms.JmsConnection;
importorg.apache.qpid.jms.JmsConnectionListener;
importorg.apache.qpid.jms.message.JmsInboundMessageDispatch;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
publicclassAmqpJavaClientDemo {
privatefinalstaticLoggerlogger=LoggerFactory.getLogger(AmqpJavaClientDemo.class);
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。privatefinalstaticExecutorServiceexecutorService=newThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() *2, 60, TimeUnit.SECONDS,
newLinkedBlockingQueue<>(50000));
publicstaticvoidmain(String[] args) throwsException {
StringappKey="${YourAppkey}";
StringappSecret="${YourAppSecret}";
StringconsumerGroupId="${YourAppkey}";
longrandom=xxxxx;
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。StringclientId="${YourClientId}";
StringuserName=clientId+"|authMode=appkey"+",signMethod="+"SHA256"+",random="+random+",appKey="+appKey+",groupId="+consumerGroupId+"|";
StringsignContent="random="+random;
Stringpassword=doSign(signContent, appSecret, "HmacSHA256");
StringconnectionUrlTemplate="failover:(${AMQPEndPointUrl}?amqp.idleTimeout=80000)"+"?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
Hashtable<String, String>hashtable=newHashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrlTemplate);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Contextcontext=newInitialContext(hashtable);
ConnectionFactorycf= (ConnectionFactory)context.lookup("SBCF");
Destinationqueue= (Destination)context.lookup("QUEUE");
// 创建连接。Connectionconnection=cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Sessionsession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建Receiver连接。MessageConsumerconsumer=session.createConsumer(queue);
consumer.setMessageListener(messageListener);
    }
privatestaticMessageListenermessageListener=newMessageListener() {
@OverridepublicvoidonMessage(Messagemessage) {
try {
//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() ->processMessage(message));
            } catch (Exceptione) {
logger.error("submit task occurs exception ", e);
            }
        }
    };
/*** 在这里处理您收到消息后的具体业务逻辑。*/privatestaticvoidprocessMessage(Messagemessage) {
try {
byte[] body=message.getBody(byte[].class);
Stringcontent=newString(body);
Stringtopic=message.getStringProperty("topic");
StringmessageId=message.getStringProperty("messageId");
logger.info("receive message"+", topic = "+topic+", messageId = "+messageId+", content = "+content);
        } catch (Exceptione) {
logger.error("processMessage occurs error ", e);
        }
    }
privatestaticJmsConnectionListenermyJmsConnectionListener=newJmsConnectionListener() {
/*** 连接成功建立。*/@OverridepublicvoidonConnectionEstablished(URIremoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }
/*** 尝试过最大重试次数之后,最终连接失败。*/@OverridepublicvoidonConnectionFailure(Throwableerror) {
logger.error("onConnectionFailure, {}", error.getMessage());
        }
/*** 连接中断。*/@OverridepublicvoidonConnectionInterrupted(URIremoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }
/*** 连接中断后又自动重连上。*/@OverridepublicvoidonConnectionRestored(URIremoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }
@OverridepublicvoidonInboundMessage(JmsInboundMessageDispatchenvelope) {}
@OverridepublicvoidonSessionClosed(Sessionsession, Throwablecause) {}
@OverridepublicvoidonConsumerClosed(MessageConsumerconsumer, Throwablecause) {}
@OverridepublicvoidonProducerClosed(MessageProducerproducer, Throwablecause) {}
    };
/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/privatestaticStringdoSign(StringtoSignString, Stringsecret, StringsignMethod) throwsException {
SecretKeySpecsigningKey=newSecretKeySpec(secret.getBytes(), signMethod);
Macmac=Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac=mac.doFinal(toSignString.getBytes());
returnHex.encodeHexString(rawHmac);
    }
}

参考文档:

数据AMQP推送

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
11月前
|
关系型数据库 物联网 PostgreSQL
沉浸式学习PostgreSQL|PolarDB 11: 物联网(IoT)、监控系统、应用日志、用户行为记录等场景 - 时序数据高吞吐存取分析
物联网场景, 通常有大量的传感器(例如水质监控、气象监测、新能源汽车上的大量传感器)不断探测最新数据并上报到数据库. 监控系统, 通常也会有采集程序不断的读取被监控指标(例如CPU、网络数据包转发、磁盘的IOPS和BW占用情况、内存的使用率等等), 同时将监控数据上报到数据库. 应用日志、用户行为日志, 也就有同样的特征, 不断产生并上报到数据库. 以上数据具有时序特征, 对数据库的关键能力要求如下: 数据高速写入 高速按时间区间读取和分析, 目的是发现异常, 分析规律. 尽量节省存储空间
690 1
|
11月前
|
消息中间件 传感器 监控
IoT企业物联网平台,数据服务开发实战
IoT企业物联网平台开发实战
378 0
|
9天前
|
存储 传感器 监控
理解并利用物联网(IoT)数据的技术探索
【8月更文挑战第11天】物联网数据是数字化转型的重要资源。通过深入理解物联网数据的特性和价值,并采取有效的收集、处理和分析策略,我们可以更好地利用这些数据为企业决策提供支持、优化运营效率、创造新的商业模式并推动数字化转型的深入发展。
|
21天前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
物联网
好的资源链接,gitee全糖咖啡,B站视频转成mp4,全糖咖啡 / 物联网网关数据上传,,全糖咖啡 / springboot+百度智能车牌检测
好的资源链接,gitee全糖咖啡,B站视频转成mp4,全糖咖啡 / 物联网网关数据上传,,全糖咖啡 / springboot+百度智能车牌检测
|
2月前
|
机器学习/深度学习 传感器 算法
物联网(IoT)数据与机器学习的结合
【6月更文挑战第6天】物联网和机器学习加速融合,驱动数据收集与智能分析。通过机器学习算法处理 IoT 数据,实现智能家居、工业生产的智能化。示例代码展示如何用线性回归预测温度。结合带来的优势包括实时监测、预警、资源优化,但也面临数据质量、隐私安全、算法选择等挑战。未来需强化技术创新,应对挑战,推动社会智能化发展。
97 0
|
2月前
|
存储 安全 算法
物联网中的数据加密技术
【6月更文挑战第1天】物联网中的数据加密技术
312 0
|
3月前
|
数据采集 SQL Oracle
助力工业物联网,工业大数据之DWD层构建:数据抽取分析【十一】
助力工业物联网,工业大数据之DWD层构建:数据抽取分析【十一】
72 0
|
3月前
|
人工智能 数据可视化 安全
Java带可视化数据大屏的物联网智慧工地系统源码
通过现场AI智能视频监控、临时设施动态管理,实时检测场地空间、资源、设施的运行状况,及时发现场地安全隐患,确保为工人营造一个安全、文明的场地作业环境。
79 0
|
9月前
|
存储 安全 物联网安全
物联网安全数据泄漏如何防范
物联网安全数据泄漏如何防范
145 0

相关产品

  • 物联网平台