抽一根烟的时间学会.NET Core 操作RabbitMQ

简介: 抽一根烟的时间学会.NET Core 操作RabbitMQ

什么是RabbitMQ?

RabbitMQ是由erlang语言开发的一个基于AMQP(Advanced Message Queuing Protocol)协议的企业级消息队列中间件。可实现队列,订阅/发布,路由,通配符等工作模式。

为什么要使用RabbitMQ?

  • 异步处理:比如发送邮件,发送短信等不需要等待处理结果的操作
  • 应用解耦:比如下单成功后,通知仓库发货,不需要等待仓库回应,通过消息队列去通知仓库,降低应用间耦合程序,可并行开发两个功能模块
  • 流量削锋:在抢购或者其他的活动页,服务处于爆发式请求状态,如果直连数据库,数据库容易被拖垮。抢购商品也容易出现库存超卖的情况。通过队列可有效解决该问题。
  • 日志处理:在单机中,日志直接写入到文件目录中,但是在分布式应用中,日志需要有统一的处理机制,可通过消息队列统一由某个消费端做处理。
  • 消息通信:如生产端和消费端可通过队列进行异步通信

如何安装RabbitMQ?

Windows端

  1. 安装erlang语言运行环境
    https://erlang.org/download/otp_win64_23.2.exe
    下载后直接下一步即可
  2. 安装RabbitMQ
    https://www.rabbitmq.com/install-windows.html#installer
    直接点击安装下一步即可按章
  3. 安装RabbitMQ的Web管理平台

RabbitMQ的管理平台是通过插件的形式使用,需要手动启用管理平台

在Windows下,RabbitMQ默认被安装到C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.14 下。

打开sbin ,在cmd或者powershell中执行

rabbitmq-plugins.bat enable rabbitmq_management

安装完成后,浏览器打开 http://localhost:15672/#/ 即可看到RabbitMQ的管理界面。输入默认账号密码 guest 成功登录。

Linux环境安装

  1. Ubuntu:https://www.rabbitmq.com/install-debian.html
  2. Centos:https://www.rabbitmq.com/install-rpm.html

RabbitMQ的基本概念了解一下?

生产者

发送消息的端

消费者

获取消息并处理的端

Connection

一个终端连接。每一个Connection都可以在RabbitMQ后台看到

Channel

Channel是建立在Connection上的一个虚拟通信管道。一般情况下,往消息队列中写入多条消息,为了不每条消息都建立一个TCP连接,所以RabbitMQ的做法是多条消息可以公用一个Connection,大大提高MQ的负载能力。

Exchange

Exchange是一个虚拟交换机。每一条消息都必须要通过交换机才能能进入对应的队列,可以理解为网络设备中的交换机,是一个意思。

Queue

Queue是一个存储消息的内部对象,所有的Rabbit MQ消息都存储在Queue中。生产者所生产的消息会存储在Queue中,消费者获取的消息也是从Queue中获取。

如何在.NET Core中使用RabbitMQ?

nuget安装

dotnet add package RabbitMQ.Client

创建生产者

const string QUEUENAME = "HELLO_MQ";
            //创建连接对象工厂
            var factory = new ConnectionFactory()
            {
                UserName = "guest",
                Password = "guest",
                HostName = "localhost",
                Port = 5672,  //RabbitMQ默认的端口
            };
            while (true)
            {
                using var conn = factory.CreateConnection();
                var chanel = conn.CreateModel();
                chanel.QueueDeclare(QUEUENAME, true, false, false);
                Console.WriteLine("输入生产内容:");
                var input = Console.ReadLine();
                chanel.BasicPublish("", QUEUENAME, null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
            }

在循环中,输入一个值,按下enter,即可推送一条消息到队列。

也可以直接在RabbitMQ的管理后台查看

可以看到我们发送的消息已经被RabbitMQ存储在Queue中了。只等某个幸运的消费者前来消费。

创建消费者

const string QUEUENAME = "HELLO_MQ";
            var factory = new ConnectionFactory()
            {
                UserName = "guest",
                Password = "guest",
                HostName = "localhost",
                Port = 5672,
            };
            var conn = factory.CreateConnection();
            var chanel = conn.CreateModel();
            chanel.QueueDeclare(QUEUENAME, true, false, false);
            EventingBasicConsumer consumer = new EventingBasicConsumer(chanel);
            consumer.Received += (a, e) =>
            {
                Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
                chanel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
            };
            chanel.BasicConsume(QUEUENAME, false, consumer);
            Console.WriteLine("启动成功");
            Console.ReadLine();

启动成功后,consumer的Received方法,会收到一条来自MQ的消息,

如果处理完成后,不调用chennel的BasicAck方法,那么这条消息依然会存在,下次有消费者出现,会再次推送给消费者。

简单的RabbitMQ Hello World到这里就算完成了。接下来就是稍微高级一点的应用

RabbitMQ的工作模式

Work Queue 工作队列模式

工作队列模式的意思就是一个生产者对应多个消费者。RabbitMQ会使用轮询去给每个消费者发送消息。

publish/subscribe

发布订阅模式是属于比较用多的一种。

发布订阅,是由交换机发布消息给多个队列。多个队列再对应多个消费者。

发布订阅模式对应的交换机类型的fanout。

消费者

A

const string QUEUENAME = "HELLO_MQ_B";
            const string TESTEXCHANGE = "TESTEXCHANGE";
            var factory = new ConnectionFactory()
            {
                UserName = "guest",
                Password = "guest",
                HostName = "localhost",
                Port = 5672,
            };
            var conn = factory.CreateConnection();
            var channel = conn.CreateModel();
            //定义队列
            channel.QueueDeclare(QUEUENAME, true, false, false);
            //定义交换机
            channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
            //绑定队列到交换机
            channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (a, e) =>
            {
                Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
                channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
            };
            channel.BasicConsume(QUEUENAME, false, consumer);
            Console.WriteLine("启动成功");
            Console.ReadLine();

B

const string QUEUENAME = "HELLO_MQ";
            const string TESTEXCHANGE = "TESTEXCHANGE";
            var factory = new ConnectionFactory()
            {
                UserName = "guest",
                Password = "guest",
                HostName = "localhost",
                Port = 5672,
            };
            var conn = factory.CreateConnection();
            var channel = conn.CreateModel();
            //定义队列
            channel.QueueDeclare(QUEUENAME, true, false, false);
            //定义交换机
            channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
            //绑定队列到交换机
            channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (a, e) =>
            {
                Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
                channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
            };
            channel.BasicConsume(QUEUENAME, false, consumer);
            Console.WriteLine("启动成功");
            Console.ReadLine();

生产者

const string QUEUENAME = "HELLO_MQ";
            const string QUEUENAME_B = "HELLO_MQ_B";
            const string TESTEXCHANGE = "TESTEXCHANGE";
            //创建连接对象工厂
            var factory = new ConnectionFactory()
            {
                UserName = "guest",
                Password = "guest",
                HostName = "localhost",
                Port = 5672,  //RabbitMQ默认的端口
            };
            using var conn = factory.CreateConnection();
            while (true)
            {
                var channel = conn.CreateModel();
                //定义交换机
                channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
                Console.WriteLine("输入生产内容:");
                var input = Console.ReadLine();
                channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
            }

在生产者运行成功后,RabbitMQ后台会出现一个交换机,点击交换机会看到交换机下绑定了两个队列

从生产者发送消息到队列,两个消费者会同时收到消息

routing模式

routing模式对应的交换机类型是direct,和发布订阅模式的区别在于:routing模式下,可以指定一个routingkey,用于区分消息

生产者

var channel = conn.CreateModel();
                //定义交换机
                channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
                //绑定队列到交换机
                Console.WriteLine("输入生产内容:");
                var input = Console.ReadLine();
                channel.BasicPublish(TESTEXCHANGE, "INFO", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));

消费者 A

//定义队列
            channel.QueueDeclare(QUEUENAME, true, false, false);
            //定义交换机
            channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
            //绑定队列到交换机
            channel.QueueBind(QUEUENAME, TESTEXCHANGE, "INFO");

消费者 B

//定义队列
            channel.QueueDeclare(QUEUENAME, true, false, false);
            //定义交换机
            channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
            //绑定队列到交换机
            channel.QueueBind(QUEUENAME, TESTEXCHANGE, "ERROR");

绑定成功后,发送消息,消费者A可以收到消息,消费者B无法收到消息。

如果遇到指定routingKey生产一条消息,结果 AB消费者都收到的情况。建议在RabbitMQ后台的交换机下看一下绑定的Queue是否重复绑定了多个routingKey.

topic通配符模式

在通配符模式下,RabbitMQ使用模糊匹配来决定把消息推送给哪个生产者。通配符有两个符号来匹配routingKey

  1. *匹配一个字符 如:*.qq.com 可匹配 1.qq.com
  2. #匹配一个或者多个字符。 如:*.qq.com 可匹配 1.qq.com或者1111.qq.com

其他的操作基本和routing模式一样。

header模式

header模式是把routingkey放到header中.取消掉了routingKey。并使用一个字典传递 K、V的方式来匹配。

比如同时要给用户发送邮件和短信,可直接通过header的键值对来匹配绑定的值,把消息传递给发短信和邮件的生产者.

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4天前
|
开发框架 .NET 开发者
简化 ASP.NET Core 依赖注入(DI)注册-Scrutor
Scrutor 是一个简化 ASP.NET Core 应用程序中依赖注入(DI)注册过程的开源库,支持自动扫描和注册服务。通过简单的配置,开发者可以轻松地从指定程序集中筛选、注册服务,并设置其生命周期,同时支持服务装饰等高级功能。适用于大型项目,提高代码的可维护性和简洁性。仓库地址:<https://github.com/khellang/Scrutor>
22 5
|
2月前
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
|
6天前
|
消息中间件 存储 JSON
Net使用EasyNetQ简化与RabbitMQ的交互
EasyNetQ是专为.NET环境设计的RabbitMQ客户端API,简化了与RabbitMQ的交互过程。通过NuGet安装EasyNetQ,可轻松实现消息的发布与订阅,支持多种消息模式及高级特性。文中提供了详细的安装步骤、代码示例及基础知识介绍,帮助开发者快速上手。关注公众号“Net分享”获取更多技术文章。
17 1
Net使用EasyNetQ简化与RabbitMQ的交互
|
23天前
|
开发框架 .NET C#
在 ASP.NET Core 中创建 gRPC 客户端和服务器
本文介绍了如何使用 gRPC 框架搭建一个简单的“Hello World”示例。首先创建了一个名为 GrpcDemo 的解决方案,其中包含一个 gRPC 服务端项目 GrpcServer 和一个客户端项目 GrpcClient。服务端通过定义 `greeter.proto` 文件中的服务和消息类型,实现了一个简单的问候服务 `GreeterService`。客户端则通过 gRPC 客户端库连接到服务端并调用其 `SayHello` 方法,展示了 gRPC 在 C# 中的基本使用方法。
35 5
在 ASP.NET Core 中创建 gRPC 客户端和服务器
|
12天前
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
22 3
|
3月前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
3月前
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
109 3
|
2月前
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架
|
3月前
|
开发框架 NoSQL .NET
利用分布式锁在ASP.NET Core中实现防抖
【9月更文挑战第5天】在 ASP.NET Core 中,可通过分布式锁实现防抖功能,仅处理连续相同请求中的首个请求,其余请求返回 204 No Content,直至锁释放。具体步骤包括:安装分布式锁库如 `StackExchange.Redis`;创建分布式锁服务接口及其实现;构建防抖中间件;并在 `Startup.cs` 中注册相关服务和中间件。这一机制有效避免了短时间内重复操作的问题。
|
4月前
|
开发框架 监控 .NET
开发者的革新利器:ASP.NET Core实战指南,构建未来Web应用的高效之道
【8月更文挑战第28天】本文探讨了如何利用ASP.NET Core构建高效、可扩展的Web应用。ASP.NET Core是一个开源、跨平台的框架,具有依赖注入、配置管理等特性。文章详细介绍了项目结构规划、依赖注入配置、中间件使用及性能优化方法,并讨论了安全性、可扩展性以及容器化的重要性。通过这些技术要点,开发者能够快速构建出符合现代Web应用需求的应用程序。
64 0