.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ 工作队列和交换机)--学习笔记

简介: WorkQueuePublish/SubscribeRoutingEmitLog

2.6.4 RabbitMQ -- 工作队列和交换机

  • WorkQueue
  • Publish/Subscribe
  • Routing
  • EmitLog

WorkQueue

WorkQueue:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

  • 一个消息生产者,多个消息消费者
  • exchange 交换机自动恢复
  • 对消息进行持久化
  • 手动确认消息

对消息进行持久化

var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);

手动确认消息

autoAck: false

channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

手动调用 BasicAck

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

修改接收端为手动确认消息

autoAck: false

channel.BasicConsume(queue: "hello",
    autoAck: false,
    consumer: consumer);

BasicAck

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Thread.Sleep(2000);// 演示多个接收端
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine(" [x] Received {0}", message);
};

启动多个接收端

010.jpg

Publish/Subscribe

Publish/Subscribe:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

Fanout 交换机,每个队列都会收到

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

Routing

Routing:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

Bindings

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

Direct exchange

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

EmitLog

新建控制台项目 EmitLogDirect,ReceiveLogsDirect

发送端

namespace EmitLogDirect
{
    class EmitLogDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机
                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1)
                    ? string.Join(" ", args.Skip( 1 ).ToArray())
                    : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "direct_logs",
                    routingKey: severity,// 路由 Key 自动带上严重级别
                    basicProperties: null,
                    body: body);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

011.jpg

error 级别单独发送到一个队列

接收端

namespace ReceiveLogsDirect
{
    class ReceiveLogsDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机
                var queueName = channel.QueueDeclare().QueueName;
                if (args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                        Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName,
                        exchange: "direct_logs",
                        routingKey: severity);// 路由 Key 自动带上严重级别
                }
                Console.WriteLine(" [*] Waiting for messages.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                        routingKey, message);
                };
                channel.BasicConsume(queue: queueName,
                    autoAck: true,
                    consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

替换发送端,接收端的 localhost 为服务器地址

接收端控制台启动

dotnet run info waring error

发送端控制台启动

dotnet run info
dotnet run error
dotnet run waring test

接收端输出

[x] Received 'info':'Hello World!'
 [x] Received 'error':'Hello World!'
 [x] Received 'waring':'test'

GitHub源码链接:

https://github.com/MINGSON666/Personal-Learning-Library/tree/main/ArchitectTrainingCamp

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 存储 缓存
RabbitMQ中的交换机
RabbitMQ中的交换机
70 2
|
7月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
87 2
|
1月前
|
敏捷开发 缓存 中间件
.NET技术的高效开发模式,涵盖面向对象编程、良好架构设计及高效代码编写与管理三大关键要素
本文深入探讨了.NET技术的高效开发模式,涵盖面向对象编程、良好架构设计及高效代码编写与管理三大关键要素,并通过企业级应用和Web应用开发的实践案例,展示了如何在实际项目中应用这些模式,旨在为开发者提供有益的参考和指导。
24 3
|
3月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
291 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
2月前
|
存储 消息中间件 前端开发
.NET常见的几种项目架构模式,你知道几种?
.NET常见的几种项目架构模式,你知道几种?
102 0
|
4月前
|
设计模式 存储 前端开发
揭秘.NET架构设计模式:如何构建坚不可摧的系统?掌握这些,让你的项目无懈可击!
【8月更文挑战第28天】在软件开发中,设计模式是解决常见问题的经典方案,助力构建可维护、可扩展的系统。本文探讨了.NET中三种关键架构设计模式:MVC、依赖注入与仓储模式,并提供了示例代码。MVC通过模型、视图和控制器分离关注点;依赖注入则通过外部管理组件依赖提升复用性和可测性;仓储模式则统一数据访问接口,分离数据逻辑与业务逻辑。掌握这些模式有助于开发者优化系统架构,提升软件质量。
61 5
|
4月前
|
XML 开发框架 .NET
.NET框架:软件开发领域的瑞士军刀,如何让初学者变身代码艺术家——从基础架构到独特优势,一篇不可错过的深度解读。
【8月更文挑战第28天】.NET框架是由微软推出的统一开发平台,支持多种编程语言,简化应用程序的开发与部署。其核心组件包括公共语言运行库(CLR)和类库(FCL)。CLR负责内存管理、线程管理和异常处理等任务,确保代码稳定运行;FCL则提供了丰富的类和接口,涵盖网络、数据访问、安全性等多个领域,提高开发效率。此外,.NET框架还支持跨语言互操作,允许开发者使用C#、VB.NET等语言编写代码并无缝集成。这一框架凭借其强大的功能和广泛的社区支持,已成为软件开发领域的重要工具,适合初学者深入学习以奠定职业生涯基础。
112 1
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
101 0
|
3月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
72 0
|
6月前
|
消息中间件
02.交换机RabbitMQ交换机
02.交换机RabbitMQ交换机
87 0