阿里云物联网平台AMQP服务端订阅NetSDK Demo

简介: 服务端可以直接订阅产品下所有类型的消息:设备上报消息、设备状态变化通知、网关发现子设备上报、设备生命周期变更、设备拓扑关系变更。配置服务端订阅后,物联网平台会将产品下所有设备的已订阅类型的消息转发至您的服务端。本文主要演示如果使用NET SDK进行AMQP服务端订阅。

Step By Step

1、相关参数获取:

参考链接:阿里云物联网平台AMQP服务端订阅NodeJS Demo

2、SDK:AMQPNetLite 安装:
图片.png

3、Code Sample

using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;

namespace IOTAmqpDemo
{
    class Program
    {
        //接入域名,请参见AMQP客户端接入说明文档。
       // 注意不用带 amqp://
        static string Host = "18482178********.iot-amqp.cn-shanghai.aliyuncs.com";
        static int Port = 5671;
        static string AccessKey = "LTAIOZZg********";
        static string AccessSecret = "v7CjUJCMk7j9aK****************";
        static string consumerGroupId = "R45CIUGUE3****************";
        static string clientId = "demoClientID";
        //iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
        static string iotInstanceId = "";
        static int Count = 0;
        static int IntervalTime = 10000;

        static Address address;
        static void Main(string[] args)
        {
            long timestamp = GetCurrentMilliseconds();
            string param = "authId=" + AccessKey + "&timestamp=" + timestamp;
            //userName组装方法,请参见AMQP客户端接入说明文档。
            string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
               + ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
            //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
            string password = doSign(param, AccessSecret, "HmacMD5");

            DoConnectAmqp(userName, password);

            ManualResetEvent resetEvent = new ManualResetEvent(false);
            resetEvent.WaitOne();
        }

        static void DoConnectAmqp(string userName, string password)
        {
            address = new Address(Host, Port, userName, password);
            //Create Connection
            ConnectionFactory cf = new ConnectionFactory();
            //use local tls if neccessary
            //cf.SSL.ClientCertificates.Add(GetCert());
            //cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
            cf.SASL.Profile = SaslProfile.External;
            cf.AMQP.IdleTimeout = 120000;
            //cf.AMQP.
            //cf.AMQP.ContainerId、cf.AMQP.HostName请自定义。
            cf.AMQP.ContainerId = "client.1.2";
            cf.AMQP.HostName = "contoso.com";

            cf.AMQP.MaxFrameSize = 8 * 1024;
            var connection = cf.CreateAsync(address).Result;

            //Connection Exception Closed
            connection.AddClosedCallback(ConnClosed);

            //Receive Message
            DoReceive(connection);
        }

        static void DoReceive(Connection connection)
        {
            //Create Session
            var session = new Session(connection);

            //Create Link and Receive Message
            var receiver = new ReceiverLink(session, "queueName", null);


            receiver.Start(20, (link, message) =>
            {
                object messageId = message.ApplicationProperties["messageId"];
                object topic = message.ApplicationProperties["topic"];
                string body = Encoding.UTF8.GetString((Byte[])message.Body);
                //注意:此处不要有耗时的逻辑,如果这里要进行业务处理,请另开线程,否则会堵塞消费。如果消费一直延时,会增加消息重发的概率。
                Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);

                //Acknowledge Message
                link.Accept(message);
            });
        }

        //连接发生异常后,进入重连模式。
        //这里只是一个简单重试的示例,您可以采用指数退避方式,来完善异常场景,重连策略。
        static void ConnClosed(IAmqpObject _, Error e)
        {
            Console.WriteLine("ocurr error: " + e);
            if (Count < 3)
            {
                Count += 1;
                Thread.Sleep(IntervalTime * Count);
            }
            else
            {
                Thread.Sleep(120000);
            }

            //Reconnection
            DoConnectAmqp(address.User, address.Password);
        }

        static X509Certificate GetCert()
        {
            string certPath = Environment.CurrentDirectory + "/root.crt";
            X509Certificate crt = new X509Certificate(certPath);

            return crt;
        }

        static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            return true;
        }

        static long GetCurrentMilliseconds()
        {
            DateTime dt1970 = new DateTime(1970, 1, 1);
            DateTime current = DateTime.Now;
            return (long)(current - dt1970).TotalMilliseconds;
        }

        //签名方法:支持hmacmd5,hmacsha1和hmacsha256。
        static string doSign(string param, string accessSecret, string signMethod)
        {
            //signMethod = HmacMD5
            byte[] key = Encoding.UTF8.GetBytes(accessSecret);
            byte[] signContent = Encoding.UTF8.GetBytes(param);
            var hmac = new HMACMD5(key);
            byte[] hashBytes = hmac.ComputeHash(signContent);
            return Convert.ToBase64String(hashBytes);
        }
    }
}

4、测试效果

  • 4.1 设备上报消息
    图片.png
  • 4.2 服务端订阅
    图片.png

更多参考

NET SDK接入示例
阿里云物联网平台AMQP服务端订阅NodeJS Demo

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
5月前
|
消息中间件 DataWorks 物联网
MQTT问题之接入阿里云物联网平台如何解决
MQTT接入是指将设备或应用通过MQTT协议接入到消息服务器,以实现数据的发布和订阅;本合集着眼于MQTT接入的流程、配置指导以及常见接入问题的解决方法,帮助用户实现稳定可靠的消息交换。
372 1
|
2月前
|
消息中间件 传感器 监控
AMQP 与物联网 (IoT) 应用的结合
【8月更文第28天】高级消息队列协议 (AMQP) 是一种开放标准的应用层协议,特别适合于物联网 (IoT) 场景中的消息传递。AMQP 提供了可靠的、可扩展的消息传输机制,能够处理来自大量设备的数据流。本文将探讨 AMQP 在 IoT 应用中的优势,并提供使用不同编程语言构建 AMQP 客户端的具体示例。
24 0
|
3月前
|
存储 运维 监控
阿里云物联网平台的优势
【7月更文挑战第19天】阿里云物联网平台的优势
59 1
|
3月前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
Cloud Native 安全 物联网
【阿里云云原生专栏】云边端一体化:阿里云如何利用云原生技术赋能物联网
【5月更文挑战第22天】阿里云借助云原生技术赋能物联网,实现云边端一体化,提升系统弹性与敏捷性。通过容器化部署,保证高可用性与可靠性。在智能交通等领域,阿里云提供高效解决方案,实现实时数据分析与决策。代码示例展示如何使用阿里云服务处理物联网数据。同时,阿里云重视数据安全,采用加密和访问控制保障数据隐私。丰富的工具和服务支持开发者构建物联网应用,推动技术广泛应用与发展。
308 1
|
5月前
|
消息中间件 存储 JavaScript
阿里云IOC物联网异步处理基础概念
该内容介绍了异步处理基础和消息队列的相关概念。首先,同步处理指任务完成后才能执行其他操作,而异步则允许任务并行执行,不阻塞程序。异步能提高系统并发性和响应性,但也增加复杂性和资源消耗。接着,提到了消息队列,包括生产者、消费者、队列、broker和topic等概念,并指出在IoT中,设备作为生产者发送消息到特定topic,消费者从队列获取数据。最后,简要介绍了AMQP协议,它是用于应用程序间消息传递的开放标准,常用于分布式系统和物联网,如RabbitMQ和Apache Qpid。课程将以Apache Qpid为例接收IoT数据。
204 6
阿里云IOC物联网异步处理基础概念
|
5月前
【开源视频联动物联网平台】vertx写一个mqtt服务端
【开源视频联动物联网平台】vertx写一个mqtt服务端
120 1
|
5月前
|
消息中间件 网络协议 物联网
MQTT协议问题之阿里云物联网服务器断开如何解决
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
462 1
|
3天前
|
供应链 安全 物联网
未来已来:区块链技术在物联网与虚拟现实中的融合创新
【9月更文挑战第30天】随着科技的飞速进步,区块链、物联网(IoT)和虚拟现实(VR)技术不断突破旧有边界,相互交织形成新的技术生态。本文将深入探讨这些技术的发展趋势,并分析它们如何在实际应用中相互促进,共同塑造我们的未来。我们将看到,通过智能合约和去中心化的特性,区块链为物联网设备提供了安全的数据交换平台;同时,物联网的广泛部署又为区块链技术带来了丰富的应用场景。而在虚拟现实领域,区块链不仅能够确保数字资产的安全交易,还能增强用户的沉浸式体验。这些技术的融合预示着一个更加互联、高效和可信的未来。
19 8
|
1天前
|
传感器 安全 物联网
新技术趋势与应用随着科技的不断进步,新兴技术如区块链、物联网和虚拟现实等正迅速改变我们的世界。这些技术不仅在各自领域内展现出强大的潜力,还在相互融合中催生出更多创新应用场景。本文将探讨这些新兴技术的发展趋势及其在各行业中的应用前景,通过通俗易懂的语言和清晰的条理,带领读者了解其内涵和意义。
本文旨在探讨区块链技术、物联网和虚拟现实等新兴技术的发展趋势及其在各个行业的应用场景。通过分析这些技术的独特优势和潜在缺陷,揭示它们对未来社会和经济可能带来的深远影响。同时,结合实际案例,展示这些技术如何解决现实问题,为各行各业提供新的发展机遇。
11 4

相关产品

  • 物联网平台
  • 下一篇
    无影云桌面