Step By Step
1、相关参数获取:
2、SDK:AMQPNetLite 安装:
3、Code Sample
using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;
namespace IOTAmqpDemo
{
class Program
{
//接入域名,请参见AMQP客户端接入说明文档。
// 注意不用带 amqp://
static string Host = "18482178********.iot-amqp.cn-shanghai.aliyuncs.com";
static int Port = 5671;
static string AccessKey = "LTAIOZZg********";
static string AccessSecret = "v7CjUJCMk7j9aK****************";
static string consumerGroupId = "R45CIUGUE3****************";
static string clientId = "demoClientID";
//iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
static string iotInstanceId = "";
static int Count = 0;
static int IntervalTime = 10000;
static Address address;
static void Main(string[] args)
{
long timestamp = GetCurrentMilliseconds();
string param = "authId=" + AccessKey + "×tamp=" + timestamp;
//userName组装方法,请参见AMQP客户端接入说明文档。
string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
+ ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
string password = doSign(param, AccessSecret, "HmacMD5");
DoConnectAmqp(userName, password);
ManualResetEvent resetEvent = new ManualResetEvent(false);
resetEvent.WaitOne();
}
static void DoConnectAmqp(string userName, string password)
{
address = new Address(Host, Port, userName, password);
//Create Connection
ConnectionFactory cf = new ConnectionFactory();
//use local tls if neccessary
//cf.SSL.ClientCertificates.Add(GetCert());
//cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
cf.SASL.Profile = SaslProfile.External;
cf.AMQP.IdleTimeout = 120000;
//cf.AMQP.
//cf.AMQP.ContainerId、cf.AMQP.HostName请自定义。
cf.AMQP.ContainerId = "client.1.2";
cf.AMQP.HostName = "contoso.com";
cf.AMQP.MaxFrameSize = 8 * 1024;
var connection = cf.CreateAsync(address).Result;
//Connection Exception Closed
connection.AddClosedCallback(ConnClosed);
//Receive Message
DoReceive(connection);
}
static void DoReceive(Connection connection)
{
//Create Session
var session = new Session(connection);
//Create Link and Receive Message
var receiver = new ReceiverLink(session, "queueName", null);
receiver.Start(20, (link, message) =>
{
object messageId = message.ApplicationProperties["messageId"];
object topic = message.ApplicationProperties["topic"];
string body = Encoding.UTF8.GetString((Byte[])message.Body);
//注意:此处不要有耗时的逻辑,如果这里要进行业务处理,请另开线程,否则会堵塞消费。如果消费一直延时,会增加消息重发的概率。
Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);
//Acknowledge Message
link.Accept(message);
});
}
//连接发生异常后,进入重连模式。
//这里只是一个简单重试的示例,您可以采用指数退避方式,来完善异常场景,重连策略。
static void ConnClosed(IAmqpObject _, Error e)
{
Console.WriteLine("ocurr error: " + e);
if (Count < 3)
{
Count += 1;
Thread.Sleep(IntervalTime * Count);
}
else
{
Thread.Sleep(120000);
}
//Reconnection
DoConnectAmqp(address.User, address.Password);
}
static X509Certificate GetCert()
{
string certPath = Environment.CurrentDirectory + "/root.crt";
X509Certificate crt = new X509Certificate(certPath);
return crt;
}
static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
return true;
}
static long GetCurrentMilliseconds()
{
DateTime dt1970 = new DateTime(1970, 1, 1);
DateTime current = DateTime.Now;
return (long)(current - dt1970).TotalMilliseconds;
}
//签名方法:支持hmacmd5,hmacsha1和hmacsha256。
static string doSign(string param, string accessSecret, string signMethod)
{
//signMethod = HmacMD5
byte[] key = Encoding.UTF8.GetBytes(accessSecret);
byte[] signContent = Encoding.UTF8.GetBytes(param);
var hmac = new HMACMD5(key);
byte[] hashBytes = hmac.ComputeHash(signContent);
return Convert.ToBase64String(hashBytes);
}
}
}
4、测试效果
- 4.1 设备上报消息
- 4.2 服务端订阅