十五、.net core(.NET 6)搭建RabbitMQ消息队列生产者和消费者的简单方法

简介: 搭建RabbitMQ简单通用的直连方法 如果还没有MQ环境,可以参考上一篇的博客:https://www.cnblogs.com/weskynet/p/14877932.html

搭建RabbitMQ简单通用的直连方法

 

如果还没有MQ环境,可以参考上一篇的博客

https://www.cnblogs.com/weskynet/p/14877932.html

 

接下来开始.net core操作Rabbitmq有关的内容。我打算使用比较简单的单机的direct直连模式,来演示一下有关操作,基本套路差不多。

首先,我在我的package包项目上面,添加对RabbitMQ.Client的引用:

1995789-20210612150748600-763333678.png


Common文件夹下,新建类库项目 Wsk.Core.RabbitMQ,并且引用package项目:

1995789-20210612150800140-1506975961.png

 

在启动项目下的appsettings配置文件里面,新增一个访问RabbitMQ的配置信息:

1995789-20210612150815024-1043581205.png

 

配置部分代码:


"MQ": [
    {
      "Host": "127.0.0.1", // MQ安装的实际服务器IP地址
      "Port": 5672, // 服务端口号
      "User": "wesky", // 用户名
      "Password": "wesky123", // 密码
      "ExchangeName": "WeskyExchange", // 设定一个Exchange名称,
      "Durable": true // 是否启用持久化
    }
  ]


然后,在实体类项目下,新建实体类MqConfigInfo,用于把读取的配置信息赋值到该实体类下:


1995789-20210612150922207-392651674.png

实体类代码:


public class MqConfigInfo
    {
        public string Host { get; set; }
        public int Port { get; set; }
        public string User { get; set; }
        public string Password { get; set; }
        public string ExchangeName { get; set; }
        public bool Durable { get; set; }
    }

 

在刚刚新建的RabbitMQ类库项目下面,引用该实体类库项目,以及APppSettings项目。然后新建一个类,叫做ReadMqConfigHelper,以及它的interface接口,并且提供一个方法,叫ReadMqConfig,用来进行读取配置信息使用:


1995789-20210612151019787-1050807286.png


读取配置信息类代码:


public class ReadMqConfigHelper:IReadMqConfigHelper
    {
        private readonly ILogger<ReadMqConfigHelper> _logger;
        public ReadMqConfigHelper(ILogger<ReadMqConfigHelper>  logger)
        {
            _logger = logger;
        }
        public List<MqConfigInfo> ReadMqConfig()
        {
            try
            {
                List<MqConfigInfo> config = AppHelper.ReadAppSettings<MqConfigInfo>(new string[] { "MQ" }); // 读取MQ配置信息
                if (config.Any())
                {
                    return config;
                }
                _logger.LogError($"获取MQ配置信息失败:没有可用数据集");
                return null;
            }
            catch (Exception ex)
            {
                _logger.LogError($"获取MQ配置信息失败:{ex.Message}");
                return null;
            }
        }
    }

接着,新建类MqConnectionHelper以及接口IMqConnectionHelper,用于做MQ连接、创建生产者和消费者等有关操作:

1995789-20210612151118690-806829290.png

 

然后,新增一系列创建连接所需要的静态变量:

1995789-20210612151135445-1802991494.png

 

然后,设置两个消费者队列,用来测试。以及添加生产者连接有关的配置和操作:

1995789-20210612151149641-1250848334.png

 

然后,创建消费者连接方法:

1995789-20210612151223734-1684810079.png

1995789-20210612151236324-996061919.png


其中,StartListener下面提供了事件,用于手动确认消息接收。如果设置为自动,有可能导致消息丢失:

1995789-20210612151305302-1172146994.png


然后,添加消息发布方法:

1995789-20210612151316513-1521337055.png


interface接口里面,添加有关的接口,用于等下依赖注入使用:

1995789-20210612151332595-1766248494.png


连接类部分的代码:


 public class MqConnectionHelper:IMqConnectionHelper
    {
        private readonly ILogger<MqConnectionHelper> _logger;
        public MqConnectionHelper(ILogger<MqConnectionHelper> logger)
        {
            _logger = logger;
            _connectionReceiveFactory = new IConnectionFactory[_costomerCount];
            _connectionReceive = new IConnection[_costomerCount];
            _modelReceive = new IModel[_costomerCount];
            _basicConsumer = new EventingBasicConsumer[_costomerCount];
        }
        /*
         备注:使用数组的部分,是给消费端用的。目前生产者只设置了一个,消费者可能存在多个。
                     当然,有条件的还可以上RabbitMQ集群进行处理,会更好玩一点。
         */
        private static IConnectionFactory _connectionSendFactory;  //RabbitMQ工厂 发送端
        private static IConnectionFactory[] _connectionReceiveFactory; //RabbitMQ工厂 接收端  
        private static IConnection _connectionSend; //连接 发送端
        private static IConnection[] _connectionReceive; //连接 消费端
        public static List<MqConfigInfo> _mqConfig; // 配置信息
        private static IModel _modelSend;  //通道  发送端
        private static IModel[] _modelReceive; //通道  消费端
        private static EventingBasicConsumer[] _basicConsumer;  // 事件
        /* 设置两个routingKey 和 队列名称,用来做测试使用*/
        public static int _costomerCount = 2;
        public static string[] _routingKey = new string[] {"WeskyNet001","WeskyNet002" };
        public static string[] _queueName = new string[] { "Queue001", "Queue002" };
        /// <summary>
        /// 生产者初始化连接配置
        /// </summary>
        public void SendFactoryConnectionInit()
        {
            _connectionSendFactory = new ConnectionFactory
            {
                HostName = _mqConfig.FirstOrDefault().Host,
                Port = _mqConfig.FirstOrDefault().Port,
                UserName = _mqConfig.FirstOrDefault().User,
                Password = _mqConfig.FirstOrDefault().Password
            };
        }
        /// <summary>
        /// 生产者连接
        /// </summary>
        public void SendFactoryConnection()
        {
            if (null != _connectionSend && _connectionSend.IsOpen)
            {
                return; // 已有连接
            }
            _connectionSend = _connectionSendFactory.CreateConnection(); // 创建生产者连接
            if (null != _modelSend && _modelSend.IsOpen)
            {
                return; // 已有通道
            }
            _modelSend = _connectionSend.CreateModel(); // 创建生产者通道
            _modelSend.ExchangeDeclare(_mqConfig.FirstOrDefault().ExchangeName, ExchangeType.Direct); // 定义交换机名称和类型(direct)
        }
        /// <summary>
        /// 消费者初始化连接配置
        /// </summary>
        public void ReceiveFactoryConnectionInit()
        {
            var factories = new ConnectionFactory
            {
                HostName = _mqConfig.FirstOrDefault().Host,
                Port = _mqConfig.FirstOrDefault().Port,
                UserName = _mqConfig.FirstOrDefault().User,
                Password = _mqConfig.FirstOrDefault().Password
            };
            for (int i = 0; i < _costomerCount; i++)
            {
                _connectionReceiveFactory[i] = factories;  // 给每个消费者绑定一个连接工厂
            }
        }
        /// <summary>
        /// 消费者连接
        /// </summary>
        /// <param name="consumeIndex"></param>
        /// <param name="exchangeName"></param>
        /// <param name="routeKey"></param>
        /// <param name="queueName"></param>
        public void ConnectionReceive(int consumeIndex, string exchangeName, string routeKey, string queueName)
        {
            _logger.LogInformation($"开始连接RabbitMQ消费者:{routeKey}");
            if (null != _connectionReceive[consumeIndex] && _connectionReceive[consumeIndex].IsOpen)
            {
                return;
            }
            _connectionReceive[consumeIndex] = _connectionReceiveFactory[consumeIndex].CreateConnection(); // 创建消费者连接
            if (null != _modelReceive[consumeIndex] && _modelReceive[consumeIndex].IsOpen)
            {
                return;
            }
            _modelReceive[consumeIndex] = _connectionReceive[consumeIndex].CreateModel();  // 创建消费者通道
            _basicConsumer[consumeIndex] = new EventingBasicConsumer(_modelReceive[consumeIndex]);
            _modelReceive[consumeIndex].ExchangeDeclare(exchangeName, ExchangeType.Direct); // 定义交换机名称和类型  与生产者保持一致
            _modelReceive[consumeIndex].QueueDeclare(
                         queue: queueName, //消息队列名称
                         durable: _mqConfig.FirstOrDefault().Durable, // 是否可持久化,此处配置在文件中,默认全局持久化(true),也可以自定义更改
                         exclusive: false,
                         autoDelete: false,
                         arguments: null
           );  // 定义消费者队列
            _modelReceive[consumeIndex].QueueBind(queueName, exchangeName, routeKey); // 队列绑定给指定的交换机
            _modelReceive[consumeIndex].BasicQos(0, 1, false); // 设置消费者每次只接收一条消息
            StartListener((model, ea) =>
            {
                byte[] message = ea.Body.ToArray(); // 接收到的消息
                string msg = Encoding.UTF8.GetString(message);
                _logger.LogInformation($"队列{queueName}接收到消息:{msg}");
                Thread.Sleep(2000);
                _modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true);
            }, queueName, consumeIndex);
        }
        /// <summary>
        /// 消费者接收消息的确认机制
        /// </summary>
        /// <param name="basicDeliverEventArgs"></param>
        /// <param name="queueName"></param>
        /// <param name="consumeIndex"></param>
        private static void StartListener(EventHandler<BasicDeliverEventArgs> basicDeliverEventArgs, string queueName, int consumeIndex)
        {
            _basicConsumer[consumeIndex].Received += basicDeliverEventArgs;
            _modelReceive[consumeIndex].BasicConsume(queue: queueName, autoAck: false, consumer: _basicConsumer[consumeIndex]); // 设置手动确认。
        }
        /// <summary>
        /// 消息发布
        /// </summary>
        /// <param name="message"></param>
        /// <param name="exchangeName"></param>
        /// <param name="routingKey"></param>
        public static void PublishExchange(string message, string exchangeName, string routingKey = "")
        {
            byte[] body = Encoding.UTF8.GetBytes(message);
            _modelSend.BasicPublish(exchangeName, routingKey, null, body);
        }
    }

 

现在,我把整个Wsk.Core.RabbitMQ项目进行添加到依赖注入:

1995789-20210612151459466-191138761.png


然后,在启动项目里面的初始化服务里面,添加对MQ连接的初始化以及连接,并且发送两条消息进行测试:

1995789-20210612151516868-465927927.png

 

启用程序,提示发送成功:

1995789-20210612151530803-7572458.png

 

打开RabbitMQ页面客户端,可以看见新增了一个交换机WeskyExchange

1995789-20210612151544268-822619651.png


点进去可以看见对应的流量走势:

1995789-20210612151559916-1375147519.png

 

关闭程序,现在添加消费者的初始化和连接,然后重新发送:

1995789-20210612151613195-141887633.png

 

可见发送消息成功,并且消费者也成功接收到了消息。打开客户端查看一下:

1995789-20210612151632172-1474116099.png

 

WeskyExchange交换机下,多了两个队列,以及队列归属的RoutingKey分别是WeskyNet001WeskyNet002


1995789-20210612151649442-369674273.png


以及在Queue目录下,多了两个队列的监控信息:

为了看出点效果,我们批量发消息试一下:

1995789-20210612151708056-587452585.png


然后启动项目,我们看一下监控效果。先是交换机页面的监控:

1995789-20210612151725778-1891739168.png

 

然后是队列1的监控:

1995789-20210612151736931-446857784.png

 

现在换一种写法,在消费者那边加个延迟:

1995789-20210612151756235-462650035.png

 

并且生产者的延迟解除:

1995789-20210612151806018-136009502.png

 

再启动一下看看效果:

1995789-20210612151814778-1302810135.png

 

会发现队列消息被堵塞,必须在执行完成以后,才可以解锁。而且生产者这边并不需要等待,可以看见消息一次性全发出去了,可以继续执行后续操作:

1995789-20210612151824090-1967075649.png


 

以上就是关于使用Direct模式进行RabbitMQ收发消息的内容,发送消息可以在其他类里面或者方法里面,直接通过静态方法进行发送;接收消息,启动了监听,就可以一直存活。如果有兴趣,也可以自己尝试FanoutTopic等不同的模式进行测试,以及可以根据不同的机器,进行配置成收发到不同服务器上面进行通信。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4天前
|
开发框架 .NET 开发者
简化 ASP.NET Core 依赖注入(DI)注册-Scrutor
Scrutor 是一个简化 ASP.NET Core 应用程序中依赖注入(DI)注册过程的开源库,支持自动扫描和注册服务。通过简单的配置,开发者可以轻松地从指定程序集中筛选、注册服务,并设置其生命周期,同时支持服务装饰等高级功能。适用于大型项目,提高代码的可维护性和简洁性。仓库地址:&lt;https://github.com/khellang/Scrutor&gt;
22 5
|
2月前
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
|
6天前
|
消息中间件 存储 JSON
Net使用EasyNetQ简化与RabbitMQ的交互
EasyNetQ是专为.NET环境设计的RabbitMQ客户端API,简化了与RabbitMQ的交互过程。通过NuGet安装EasyNetQ,可轻松实现消息的发布与订阅,支持多种消息模式及高级特性。文中提供了详细的安装步骤、代码示例及基础知识介绍,帮助开发者快速上手。关注公众号“Net分享”获取更多技术文章。
17 1
Net使用EasyNetQ简化与RabbitMQ的交互
|
23天前
|
开发框架 .NET C#
在 ASP.NET Core 中创建 gRPC 客户端和服务器
本文介绍了如何使用 gRPC 框架搭建一个简单的“Hello World”示例。首先创建了一个名为 GrpcDemo 的解决方案,其中包含一个 gRPC 服务端项目 GrpcServer 和一个客户端项目 GrpcClient。服务端通过定义 `greeter.proto` 文件中的服务和消息类型,实现了一个简单的问候服务 `GreeterService`。客户端则通过 gRPC 客户端库连接到服务端并调用其 `SayHello` 方法,展示了 gRPC 在 C# 中的基本使用方法。
35 5
在 ASP.NET Core 中创建 gRPC 客户端和服务器
|
12天前
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
22 3
|
3月前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
3月前
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
110 3
|
2月前
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架
|
5月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 MQ