.net平台的rabbitmq使用封装(一)

简介: .net平台的rabbitmq使用封装(一)

前言


RabbitMq大家再熟悉不过,这篇文章主要整对rabbitmq学习后封装RabbitMQ.Client的一个分享。文章最后,我会把封装组件和demo奉上。


Rabbitmq的关键术语


  1、绑定器(Binding):根据路由规则绑定Queue和Exchange。

  2、路由键(Routing Key):Exchange根据关键字进行消息投递。

  3、交换机(Exchange):指定消息按照路由规则进入指定队列

  4、消息队列(Queue):消息的存储载体

  5、生产者(Producer):消息发布者。

  6、消费者(Consumer):消息接收者。


Rabbitmq的运作


从下图可以看出,发布者(Publisher)是把消息先发送到交换器(Exchange),再从交换器发送到指定队列(Queue),而先前已经声明交换器与队列绑定关系,最后消费者(Customer)通过订阅或者主动取指定队列消息进行消费。


image.png


那么刚刚提到的订阅和主动取可以理解成,推(被动),拉(主动)。


推,只要队列增加一条消息,就会通知空闲的消费者进行消费。(我不找你,就等你找我,观察者模式)


拉,不会通知消费者,而是由消费者主动轮循或者定时去取队列消息。(我需要才去找你)


使用场景我举个例子,假如有两套系统 订单系统和发货系统,从订单系统发起发货消息指令,为了及时发货,发货系统需要订阅队列,只要有指令就处理。


可是程序偶尔会出异常,例如网络或者DB超时了,把消息丢到失败队列,这个时候需要重发机制。但是我又不想while(IsPostSuccess == True),因为只要出异常了,会在某个时间段内都会有异常,这样的重试是没意义的。


这个时候不需要及时的去处理消息,有个JOB定时或者每隔几分钟(失败次数*间隔分钟)去取失败队列消息,进行重发。


Publish(发布)的封装


步骤:初始化链接->声明交换器->声明队列->换机器与队列绑定->发布消息。注意的是,我将Model存到了ConcurrentDictionary里面,因为声明与绑定是非常耗时的,其次,往重复的队列发送消息是不需要重新初始化的。


1         /// <summary>
 2         /// 交换器声明
 3         /// </summary>
 4         /// <param name="iModel"></param>
 5         /// <param name="exchange">交换器</param>
 6         /// <param name="type">交换器类型:
 7         /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
 8         /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
 9         /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
10         /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
11         /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
12         /// 交换机转发消息是最快的。
13         /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
14         /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
15         /// 只会匹配到“audit.irs”。</param>
16         /// <param name="durable">持久化</param>
17         /// <param name="autoDelete">自动删除</param>
18         /// <param name="arguments">参数</param>
19         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,
20             bool durable = true,
21             bool autoDelete = false, IDictionary<string, object> arguments = null)
22         {
23             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();
24             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
25         }
26
27         /// <summary>
28         /// 队列声明
29         /// </summary>
30         /// <param name="channel"></param>
31         /// <param name="queue">队列</param>
32         /// <param name="durable">持久化</param>
33         /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
34         /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
35         /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
36         /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
37         /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>
38         /// <param name="autoDelete">自动删除</param>
39         /// <param name="arguments">参数</param>
40         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
41             bool autoDelete = false, IDictionary<string, object> arguments = null)
42         {
43             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();
44             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
45         }
46
47         /// <summary>
48         /// 获取Model
49         /// </summary>
50         /// <param name="exchange">交换机名称</param>
51         /// <param name="queue">队列名称</param>
52         /// <param name="routingKey"></param>
53         /// <param name="isProperties">是否持久化</param>
54         /// <returns></returns>
55         private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
56         {
57             return ModelDic.GetOrAdd(queue, key =>
58             {
59                 var model = _conn.CreateModel();
60                 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
61                 QueueDeclare(model, queue, isProperties);
62                 model.QueueBind(queue, exchange, routingKey);
63                 ModelDic[queue] = model;
64                 return model;
65             });
66         }
67
68         /// <summary>
69         /// 发布消息
70         /// </summary>
71         /// <param name="routingKey">路由键</param>
72         /// <param name="body">队列信息</param>
73         /// <param name="exchange">交换机名称</param>
74         /// <param name="queue">队列名</param>
75         /// <param name="isProperties">是否持久化</param>
76         /// <returns></returns>
77         public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
78         {
79             var channel = GetModel(exchange, queue, routingKey, isProperties);
80
81             try
82             {
83                 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());
84             }
85             catch (Exception ex)
86             {
87                 throw ex.GetInnestException();
88             }
89         }


下次是本机测试的发布速度截图:

image.png


4.2W/S属于稳定速度,把反序列化(ToJson)会稍微快一些。

 

Subscribe(订阅)的封装


发布的时候是申明了交换器和队列并绑定,然而订阅的时候只需要声明队列就可。从下面代码能看到,捕获到异常的时候,会把消息送到自定义的“死信队列”里,由另外的JOB进行定时重发,因此,finally是应答成功的。


/// <summary>
        /// 获取Model
        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        private static IModel GetModel(string queue, bool isProperties = false)
        {
            return ModelDic.GetOrAdd(queue, value =>
             {
                 var model = _conn.CreateModel();
                 QueueDeclare(model, queue, isProperties);
                 //每次消费的消息数
                 model.BasicQos(0, 1, false);
                 ModelDic[queue] = model;
                 return model;
             });
        }    
        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <param name="handler">消费处理</param>
        /// <param name="isDeadLetter"></param>
        public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
        {
            //队列声明
            var channel = GetModel(queue, isProperties);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var msgStr = body.DeserializeUtf8();
                var msg = msgStr.FromJson<T>();
                try
                {
                    handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                    if (!isDeadLetter)
                        PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                }
                finally
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }


下次是本机测试的发布速度截图:


image.png


快的时候有1.9K/S,慢的时候也有1.7K/S

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7天前
|
存储 NoSQL MongoDB
.NET MongoDB数据仓储和工作单元模式封装
.NET MongoDB数据仓储和工作单元模式封装
41 15
|
5天前
|
Linux API C#
基于 .NET 开发的多功能流媒体管理控制平台
基于 .NET 开发的多功能流媒体管理控制平台
|
30天前
|
消息中间件 运维 安全
C5GAME 游戏饰品交易平台借助 RocketMQ Serverless 保障千万级玩家流畅体验
游戏行业蓬勃发展,作为国内领先的 STEAM 游戏饰品交易的服务平台,看 C5GAME 如何利用 RocketMQ Serverless 技术,为千万级玩家提供流畅的游戏体验,同时降低成本并提升运维效率。
107 10
C5GAME 游戏饰品交易平台借助 RocketMQ Serverless 保障千万级玩家流畅体验
|
24天前
|
消息中间件 存储 JSON
Net使用EasyNetQ简化与RabbitMQ的交互
EasyNetQ是专为.NET环境设计的RabbitMQ客户端API,简化了与RabbitMQ的交互过程。通过NuGet安装EasyNetQ,可轻松实现消息的发布与订阅,支持多种消息模式及高级特性。文中提供了详细的安装步骤、代码示例及基础知识介绍,帮助开发者快速上手。关注公众号“Net分享”获取更多技术文章。
34 1
Net使用EasyNetQ简化与RabbitMQ的交互
|
10天前
|
JSON 数据格式
.net HTTP请求类封装
`HttpRequestHelper` 是一个用于简化 HTTP 请求的辅助类,支持发送 GET 和 POST 请求。它使用 `HttpClient` 发起请求,并通过 `Newtonsoft.Json` 处理 JSON 数据。示例展示了如何使用该类发送请求并处理响应。注意事项包括:简单的错误处理、需安装 `Newtonsoft.Json` 依赖,以及建议重用 `HttpClient` 实例以优化性能。
54 2
|
2月前
|
机器学习/深度学习 人工智能 Cloud Native
在数字化时代,.NET 技术凭借其跨平台兼容性、丰富的类库和工具集以及卓越的性能与效率,成为软件开发的重要平台
在数字化时代,.NET 技术凭借其跨平台兼容性、丰富的类库和工具集以及卓越的性能与效率,成为软件开发的重要平台。本文深入解析 .NET 的核心优势,探讨其在企业级应用、Web 开发及移动应用等领域的应用案例,并展望未来在人工智能、云原生等方面的发展趋势。
45 3
|
6月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
存储 设计模式 编解码
.NET 8.0 通用管理平台,支持模块化、WinForms 和 WPF
【11月更文挑战第5天】本文分析了.NET 8.0 通用管理平台在模块化、WinForms 和 WPF 方面的优势。模块化设计提升了系统的可维护性和可扩展性,提高了代码复用性;WinForms 提供了丰富的控件库和简单易用的开发模式,技术成熟稳定;WPF 支持强大的数据绑定和 MVVM 模式,具备丰富的图形和动画功能,以及灵活的布局系统。
EMQ
|
6月前
|
物联网 Linux C语言
在 Windows 平台搭建 MQTT 服务
NanoMQ 有着强大的跨平台和可兼容能力,不仅可以用于以 Linux 为基础的各类平台,也为 Windows 平台提供了 MQTT 服务的新选择。
EMQ
125 9
在 Windows 平台搭建 MQTT 服务
|
5月前
|
缓存 程序员
封装一个给 .NET Framework 用的内存缓存帮助类
封装一个给 .NET Framework 用的内存缓存帮助类