Net使用EasyNetQ简化与RabbitMQ的交互

本文涉及的产品
交互式建模 PAI-DSW,每月250计算时 3个月
模型训练 PAI-DLC,5000CU*H 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
简介: EasyNetQ是专为.NET环境设计的RabbitMQ客户端API,简化了与RabbitMQ的交互过程。通过NuGet安装EasyNetQ,可轻松实现消息的发布与订阅,支持多种消息模式及高级特性。文中提供了详细的安装步骤、代码示例及基础知识介绍,帮助开发者快速上手。关注公众号“Net分享”获取更多技术文章。

Net使用EasyNetQ简化与RabbitMQ的交互

EasyNetQ是一个为.NET环境设计的RabbitMQ客户端API,旨在简化与RabbitMQ的交互。

关于RabbitMq的更多知识点在:https://www.dotnetshare.com

公众号:Net分享,欢迎关注

安装EasyNetQ

你可以通过NuGet包管理器来安装EasyNetQ。在Package Manager Console中运行以下命令:

PM> Install-Package EasyNetQ

这将同时安装EasyNetQ和其依赖的RabbitMQ.Client库。
建议使用DI安装,EasyNetQ.DI.Microsof包含EasyNetQ,同时依赖Newtonsoft.Json

<PackageReference Include="EasyNetQ.DI.Microsoft" Version="7.8.0" /> 
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />

注册连接RabbitMQ

var connectionString = "host=111.111.11.111;virtualHost=/;username=admin;password=123456;timeout=60";
 //链接注册
builder.Services.RegisterEasyNetQ("host=8.153.70.182;virtualHost=/;username=zhaoke;password=123123;publisherConfirms=true");
//发布注册
builder.Services.AddTransient<MQPublish>();
//订阅注册
builder.Services.AddTransient<MQSubscribe>(); 
//添加消息处理
builder.Services.AddHostedService<SubscribeWorker>();

发布消息

EasyNetQ支持发布/订阅模式,你可以通过创建一个.NET类来定义消息,然后使用Publish方法发布消息。例如:

public class TextMessage
{
   
    public string Text {
    get; set; }
}

bus.Publish(new TextMessage {
    Text = "Hello World" });

EasyNetQ会根据消息类型自动创建交换机和队列,并使用Newtonsoft.Json序列化消息为JSON格式。

MQPublish的封装

using EasyNetQ.Topology;
using EasyNetQ;

/// <summary>
/// 发布消息
/// </summary>
public class MQPublish
{
    private readonly IBus bus;

    public MQPublish(IBus bus)
    {
        this.bus = bus;
    }
    /// <summary>
    /// 发布消息
    /// </summary>
    /// <param name="routingKey"></param>
    /// <param name="data"></param>
    public async Task PublishMessageAsync(string routingKey, object data)
    {
        Console.WriteLine($"MQ消息推送,routingKey :{routingKey} , 推送数据 :{System.Text.Json.JsonSerializer.Serialize(data)}");

        var message = new Message<object>(data);
        var advancedBus = bus.Advanced;
        advancedBus.QueueDeclare(routingKey);
        await advancedBus.PublishAsync(Exchange.Default, routingKey, false, message);
    }

    /// <summary>
    /// 发布延迟消息
    /// </summary>
    /// <param name="routingKey"></param>
    /// <param name="data"></param>
    /// <param name="timeout">毫秒</param>
    public void PublishDelayMessage(string routingKey, object data, int timeout)
    {
        var advancedBus = bus.Advanced;
        var message = new Message<object>(data);
        var properties = new MessageProperties();
        properties.Headers.Add("x-delay", timeout);
        var messageData = new Message<object>(message, properties);
        // 建立延时 exchange
        var exDelay = advancedBus.ExchangeDeclare($"{routingKey}_delay_exchange", cfg => cfg.AsDelayedExchange(ExchangeType.Direct));
        // 申明队列
        var qNormal = advancedBus.QueueDeclare($"{routingKey}_delay_queue");
        // 绑定,设置好 RoutingKey
        advancedBus.Bind(exDelay, qNormal, routingKey);
        bus.Advanced.Publish(exDelay, routingKey, false, messageData);
    }
}

订阅消息

订阅消息时,你需要指定一个订阅ID和一个处理消息的委托。例如:

bus.Subscribe<TextMessage>("subscriptionId", message =>
{
   
    Console.WriteLine("Received message: " + message.Text);
});

这样,当有消息发布到对应的交换机和队列时,你的订阅就会收到消息。

封装MQSubscribe

public class MQSubscribe
{
    // MQ消息总线
    private readonly IBus bus;
    public MQSubscribe(IBus bus)
    {
        this.bus = bus;
    }

    /// <summary>
    /// 处理消息的总入口
    /// </summary>
    /// <returns></returns>
    public Task Init()
    { 
        SubscribeTSysLogVis();
        //程序不结束,等待输入
        Console.WriteLine($"已启动(处理消息) {DateTime.UtcNow}");

        return Task.CompletedTask;
    }

    private Task SubscribeTSysLogVis()
    {
        var advancedBus = bus.Advanced;
        //订阅TSysLogVis日志 - 请不要在两次发布之间重复使用它
        var queue = advancedBus.QueueDeclare("TSysLogVis");

        advancedBus.Consume(queue, async (body, properties, info) =>
        {
            try
            {
                var message = Encoding.UTF8.GetString(body.ToArray());
                //var data = JsonConvert.DeserializeObject<TSysLogVis>(message);
                Console.WriteLine($"消息处理 {DateTime.Now} : {message}");
                //db.Insertable(data).SplitTable().ExecuteReturnSnowflakeId();
            }
            catch (Exception ex)
            {
                // 处理异常,例如记录日志或重新抛出
                Console.Error.WriteLine($"处理消息时发生异常: {ex}");
            }
        }); 
        return Task.CompletedTask;
    }
}

SubscribeWorker

启用订阅服务即可

    public class SubscribeWorker : BackgroundService
    {
   
        private readonly MQSubscribe _Service;

        public SubscribeWorker(MQSubscribe  Service)
        {
   
            _Service = Service;
        }

        // 执行逻辑
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
   
            await _Service.Init(); 
        }
    }

以上是EasyNetQ的基本使用方法,它还支持请求/响应模式和发送/接收模式,以及其他高级功能。你可以通过阅读官方文档来获取更多信息。

基础知识

  • 消息队列(Message Queue):Rabbit 是基于消息队列的中间件,它允许应用程序通过发送和接收消息来进行异步通信。

  • 生产者(Producer)和消费者(Consumer):Rabbit 中的应用程序可以充当生产者和消费者的角色。生产者负责将消息发送到队列中,消费者则从队列中接收并处理消息。

  • 队列(Queue):Rabbit 使用队列来存储消息。生产者将消息发送到队列中,而消费者从队列中接收消息进行处理。队列可以确保消息的顺序性和可靠性。

  • 交换器(Exchange):交换器负责接收生产者发送的消息,并根据一定的规则将消息路由到特定的队列中。Rabbit 提供了不同类型的交换器,如直连交换器(Direct Exchange)、主题交换器(Topic Exchange)等。

  • 绑定(Binding):绑定将队列与交换器进行关联,定义了消息从交换器路由到队列的规则。一个队列可以绑定到多个交换器上,一个交换器也可以将消息路由到多个队列上。

  • 路由键(Routing Key):生产者在发送消息时需要指定一个路由键,交换器根据路由键来判断将消息路由到哪些队列。不同类型的交换器对路由键的处理方式有所不同。

  • 持久化(Durability):Rabbit 支持消息的持久化,即将消息存储到磁盘中以防止消息丢失。可以将队列、交换器和消息设置为持久化。

  • 可靠性投递(Reliable Delivery):Rabbit 提供了消息可靠性投递的机制,确保消息能够被消费者正确地接收和处理。包括消息的确认机制、消息的重试、消息的死信队列等。

  • 发布/订阅(Publish/Subscribe)模式:Rabbit 支持发布/订阅模式,其中一个生产者可以将消息同时发送给多个消费者,每个消费者都会收到相同的消息副本。

  • ACK 机制:消费者收到消息后需要发送 ACK(确认)给 Rabbit 服务器,告知服务器消息已经被成功接收和处理,服务器可以将消息从队列中删除。

欢迎关注我的公众号“Net分享”,技术文章第一时间推送,随缘更新 , 分享一些你可能注意不到的细节。



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
移动开发 网络协议 NoSQL
.NET Core WebSocket实现简易、高性能、集群即时通讯组件
.NET Core WebSocket实现简易、高性能、集群即时通讯组件
346 0
|
监控 物联网 API
【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示
MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。
1286 0
【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示
|
24天前
|
网络协议 网络安全 Apache
一个整合性、功能丰富的.NET网络通信框架
一个整合性、功能丰富的.NET网络通信框架
|
消息中间件 存储 Docker
.Net Core对于`RabbitMQ`封装分布式事件总线
.Net Core对于`RabbitMQ`封装分布式事件总线
228 1
.Net Core对于`RabbitMQ`封装分布式事件总线
.NET Core - 配置框架:让服务无缝适应各种环境
.NET Core - 配置框架:让服务无缝适应各种环境
|
消息中间件 监控
十五、.net core(.NET 6)搭建RabbitMQ消息队列生产者和消费者的简单方法
搭建RabbitMQ简单通用的直连方法 如果还没有MQ环境,可以参考上一篇的博客: https://www.cnblogs.com/weskynet/p/14877932.html
746 0
十五、.net core(.NET 6)搭建RabbitMQ消息队列生产者和消费者的简单方法
|
开发框架 负载均衡 网络协议
.NET WebSocket 核心原理初体验
本文将利用WebSockets(SignalR的一部分)搭建一个可双向通信的ASP.NETCore5应用。
.NET WebSocket 核心原理初体验
|
负载均衡 Cloud Native Java
.NET gRPC核心功能初体验
gRPC是高性能的RPC框架, 有效地用于服务通信(不管是数据中心内部还是跨数据中心)。
.NET gRPC核心功能初体验
|
消息中间件
一起谈.NET技术,NET下RabbitMQ实践 [WCF发布篇]
  在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写。今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布。  注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵。
1193 0

热门文章

最新文章