阿里云物联网平台AMQP服务端订阅NetSDK Demo

简介: 服务端可以直接订阅产品下所有类型的消息:设备上报消息、设备状态变化通知、网关发现子设备上报、设备生命周期变更、设备拓扑关系变更。配置服务端订阅后,物联网平台会将产品下所有设备的已订阅类型的消息转发至您的服务端。本文主要演示如果使用NET SDK进行AMQP服务端订阅。

Step By Step

1、相关参数获取:

参考链接:阿里云物联网平台AMQP服务端订阅NodeJS Demo

2、SDK:AMQPNetLite 安装:
图片.png

3、Code Sample

using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;

namespace IOTAmqpDemo
{
    class Program
    {
        //接入域名,请参见AMQP客户端接入说明文档。
       // 注意不用带 amqp://
        static string Host = "18482178********.iot-amqp.cn-shanghai.aliyuncs.com";
        static int Port = 5671;
        static string AccessKey = "LTAIOZZg********";
        static string AccessSecret = "v7CjUJCMk7j9aK****************";
        static string consumerGroupId = "R45CIUGUE3****************";
        static string clientId = "demoClientID";
        //iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
        static string iotInstanceId = "";
        static int Count = 0;
        static int IntervalTime = 10000;

        static Address address;
        static void Main(string[] args)
        {
            long timestamp = GetCurrentMilliseconds();
            string param = "authId=" + AccessKey + "&timestamp=" + timestamp;
            //userName组装方法,请参见AMQP客户端接入说明文档。
            string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
               + ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
            //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
            string password = doSign(param, AccessSecret, "HmacMD5");

            DoConnectAmqp(userName, password);

            ManualResetEvent resetEvent = new ManualResetEvent(false);
            resetEvent.WaitOne();
        }

        static void DoConnectAmqp(string userName, string password)
        {
            address = new Address(Host, Port, userName, password);
            //Create Connection
            ConnectionFactory cf = new ConnectionFactory();
            //use local tls if neccessary
            //cf.SSL.ClientCertificates.Add(GetCert());
            //cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
            cf.SASL.Profile = SaslProfile.External;
            cf.AMQP.IdleTimeout = 120000;
            //cf.AMQP.
            //cf.AMQP.ContainerId、cf.AMQP.HostName请自定义。
            cf.AMQP.ContainerId = "client.1.2";
            cf.AMQP.HostName = "contoso.com";

            cf.AMQP.MaxFrameSize = 8 * 1024;
            var connection = cf.CreateAsync(address).Result;

            //Connection Exception Closed
            connection.AddClosedCallback(ConnClosed);

            //Receive Message
            DoReceive(connection);
        }

        static void DoReceive(Connection connection)
        {
            //Create Session
            var session = new Session(connection);

            //Create Link and Receive Message
            var receiver = new ReceiverLink(session, "queueName", null);


            receiver.Start(20, (link, message) =>
            {
                object messageId = message.ApplicationProperties["messageId"];
                object topic = message.ApplicationProperties["topic"];
                string body = Encoding.UTF8.GetString((Byte[])message.Body);
                //注意:此处不要有耗时的逻辑,如果这里要进行业务处理,请另开线程,否则会堵塞消费。如果消费一直延时,会增加消息重发的概率。
                Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);

                //Acknowledge Message
                link.Accept(message);
            });
        }

        //连接发生异常后,进入重连模式。
        //这里只是一个简单重试的示例,您可以采用指数退避方式,来完善异常场景,重连策略。
        static void ConnClosed(IAmqpObject _, Error e)
        {
            Console.WriteLine("ocurr error: " + e);
            if (Count < 3)
            {
                Count += 1;
                Thread.Sleep(IntervalTime * Count);
            }
            else
            {
                Thread.Sleep(120000);
            }

            //Reconnection
            DoConnectAmqp(address.User, address.Password);
        }

        static X509Certificate GetCert()
        {
            string certPath = Environment.CurrentDirectory + "/root.crt";
            X509Certificate crt = new X509Certificate(certPath);

            return crt;
        }

        static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            return true;
        }

        static long GetCurrentMilliseconds()
        {
            DateTime dt1970 = new DateTime(1970, 1, 1);
            DateTime current = DateTime.Now;
            return (long)(current - dt1970).TotalMilliseconds;
        }

        //签名方法:支持hmacmd5,hmacsha1和hmacsha256。
        static string doSign(string param, string accessSecret, string signMethod)
        {
            //signMethod = HmacMD5
            byte[] key = Encoding.UTF8.GetBytes(accessSecret);
            byte[] signContent = Encoding.UTF8.GetBytes(param);
            var hmac = new HMACMD5(key);
            byte[] hashBytes = hmac.ComputeHash(signContent);
            return Convert.ToBase64String(hashBytes);
        }
    }
}

4、测试效果

  • 4.1 设备上报消息
    图片.png
  • 4.2 服务端订阅
    图片.png

更多参考

NET SDK接入示例
阿里云物联网平台AMQP服务端订阅NodeJS Demo

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
2月前
|
消息中间件 DataWorks 物联网
MQTT问题之接入阿里云物联网平台如何解决
MQTT接入是指将设备或应用通过MQTT协议接入到消息服务器,以实现数据的发布和订阅;本合集着眼于MQTT接入的流程、配置指导以及常见接入问题的解决方法,帮助用户实现稳定可靠的消息交换。
260 1
|
2月前
|
Cloud Native 安全 物联网
【阿里云云原生专栏】云边端一体化:阿里云如何利用云原生技术赋能物联网
【5月更文挑战第22天】阿里云借助云原生技术赋能物联网,实现云边端一体化,提升系统弹性与敏捷性。通过容器化部署,保证高可用性与可靠性。在智能交通等领域,阿里云提供高效解决方案,实现实时数据分析与决策。代码示例展示如何使用阿里云服务处理物联网数据。同时,阿里云重视数据安全,采用加密和访问控制保障数据隐私。丰富的工具和服务支持开发者构建物联网应用,推动技术广泛应用与发展。
198 1
|
2月前
|
消息中间件 存储 JavaScript
阿里云IOC物联网异步处理基础概念
该内容介绍了异步处理基础和消息队列的相关概念。首先,同步处理指任务完成后才能执行其他操作,而异步则允许任务并行执行,不阻塞程序。异步能提高系统并发性和响应性,但也增加复杂性和资源消耗。接着,提到了消息队列,包括生产者、消费者、队列、broker和topic等概念,并指出在IoT中,设备作为生产者发送消息到特定topic,消费者从队列获取数据。最后,简要介绍了AMQP协议,它是用于应用程序间消息传递的开放标准,常用于分布式系统和物联网,如RabbitMQ和Apache Qpid。课程将以Apache Qpid为例接收IoT数据。
156 6
阿里云IOC物联网异步处理基础概念
|
2月前
【开源视频联动物联网平台】vertx写一个mqtt服务端
【开源视频联动物联网平台】vertx写一个mqtt服务端
45 1
|
2月前
|
存储 安全 物联网
安防摄像头IPC如何快速接入阿里云Link Visual视频服务(阿里云生活物联网)
Link Visual是生活物联网平台针对视频产品推出的增值服务,提供视频数据上云、存储、转发、AI计算等能力。 大白话就是:通过阿里云的Link Visual视频服务,可以让你的IPC摄像头设备完成上云功能,并快速实现如下功能介绍中的功能。其中可以享受阿里云P2P协议支持,帮助企业节省流量服务器流量带宽。
434 7
|
2月前
|
消息中间件 网络协议 物联网
MQTT协议问题之阿里云物联网服务器断开如何解决
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
372 1
|
9月前
|
消息中间件 Java 物联网
[笔记]阿里云物联网之业务服务端(java、php)接入阿里云平台(二)
[笔记]阿里云物联网之业务服务端(java、php)接入阿里云平台(二)
383 0
|
10天前
|
供应链 监控 物联网
未来技术的潮流:区块链、物联网与虚拟现实的融合与创新
【6月更文挑战第20天】在技术不断进步的时代,新兴技术如区块链、物联网(IoT)和虚拟现实(VR)正在逐渐改变我们的生活和工作方式。本文将深入探讨这些技术的发展趋势和应用场景,以及它们如何相互融合,创造出新的商业模式和用户体验。我们将看到,随着这些技术的成熟和应用,未来的世界将变得更加智能、互联和沉浸。
|
2天前
|
传感器 安全 物联网
物联网(IoT)设备的硬件选型与集成技术博文
【6月更文挑战第28天】物联网设备硬件选型与集成聚焦关键要素:功能匹配、性能稳定性、兼容扩展及成本效益。嵌入式系统、通信协议、数据处理和安全性技术确保集成效果,支撑高效、智能的IoT系统,驱动家居、城市与工业自动化变革。
|
3天前
|
供应链 物联网 区块链
新技术趋势与应用:探讨新兴技术如区块链、物联网、虚拟现实等的发展趋势和应用场景
在科技日新月异的今天,新兴技术的发展趋势和应用成为我们关注的焦点。本文将深入探讨区块链技术、物联网以及虚拟现实等新兴技术的发展趋势和应用场景。我们将从数据导向的角度出发,引用权威的研究和统计数据,科学严谨地分析这些技术的发展现状和未来前景。同时,我们也将逻辑严密地探讨这些技术在不同领域的实际应用,以期为读者提供一个全面而深入的理解。

相关产品

  • 物联网平台