.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ 工作队列和交换机)--学习笔记

简介: WorkQueuePublish/SubscribeRoutingEmitLog

2.6.4 RabbitMQ -- 工作队列和交换机

  • WorkQueue
  • Publish/Subscribe
  • Routing
  • EmitLog

WorkQueue

WorkQueue:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

  • 一个消息生产者,多个消息消费者
  • exchange 交换机自动恢复
  • 对消息进行持久化
  • 手动确认消息

对消息进行持久化

var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);

手动确认消息

autoAck: false

channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

手动调用 BasicAck

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

修改接收端为手动确认消息

autoAck: false

channel.BasicConsume(queue: "hello",
    autoAck: false,
    consumer: consumer);

BasicAck

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Thread.Sleep(2000);// 演示多个接收端
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine(" [x] Received {0}", message);
};

启动多个接收端

010.jpg

Publish/Subscribe

Publish/Subscribe:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

Fanout 交换机,每个队列都会收到

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

Routing

Routing:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

Bindings

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

Direct exchange

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

EmitLog

新建控制台项目 EmitLogDirect,ReceiveLogsDirect

发送端

namespace EmitLogDirect
{
    class EmitLogDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机
                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1)
                    ? string.Join(" ", args.Skip( 1 ).ToArray())
                    : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "direct_logs",
                    routingKey: severity,// 路由 Key 自动带上严重级别
                    basicProperties: null,
                    body: body);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

011.jpg

error 级别单独发送到一个队列

接收端

namespace ReceiveLogsDirect
{
    class ReceiveLogsDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机
                var queueName = channel.QueueDeclare().QueueName;
                if (args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                        Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName,
                        exchange: "direct_logs",
                        routingKey: severity);// 路由 Key 自动带上严重级别
                }
                Console.WriteLine(" [*] Waiting for messages.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                        routingKey, message);
                };
                channel.BasicConsume(queue: queueName,
                    autoAck: true,
                    consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

替换发送端,接收端的 localhost 为服务器地址

接收端控制台启动

dotnet run info waring error

发送端控制台启动

dotnet run info
dotnet run error
dotnet run waring test

接收端输出

[x] Received 'info':'Hello World!'
 [x] Received 'error':'Hello World!'
 [x] Received 'waring':'test'

GitHub源码链接:

https://github.com/MINGSON666/Personal-Learning-Library/tree/main/ArchitectTrainingCamp

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
245 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
3月前
|
运维 监控 Cloud Native
从本土到全球,云原生架构护航灵犀互娱游戏出海
本文内容整理自「 2025 中企出海大会·游戏与互娱出海分论坛」,灵犀互娱基础架构负责人朱晓靖的演讲内容,从技术层面分享云原生架构护航灵犀互娱游戏出海经验。
387 16
|
3月前
|
运维 监控 Cloud Native
从本土到全球,云原生架构护航灵犀互娱游戏出海
内容整理自「 2025 中企出海大会·游戏与互娱出海分论坛」,灵犀互娱基础架构负责人朱晓靖的演讲内容,从技术层面分享云原生架构护航灵犀互娱游戏出海经验。
|
1月前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路
|
1月前
|
Java Linux 虚拟化
【Docker】(1)Docker的概述与架构,手把手带你安装Docker,云原生路上不可缺少的一门技术!
1. Docker简介 1.1 Docker是什么 为什么docker会出现? 假定您在开发一款平台项目,您的开发环境具有特定的配置。其他开发人员身处的环境配置也各有不同。 您正在开发的应用依赖于您当前的配置且还要依赖于某些配置文件。 您的企业还拥有标准化的测试和生产环境,且具有自身的配置和一系列支持文件。 **要求:**希望尽可能多在本地模拟这些环境而不产生重新创建服务器环境的开销 问题: 要如何确保应用能够在这些环境中运行和通过质量检测? 在部署过程中不出现令人头疼的版本、配置问题 无需重新编写代码和进行故障修复
296 2
|
1月前
|
人工智能 Kubernetes Cloud Native
Higress(云原生AI网关) 架构学习指南
Higress 架构学习指南 🚀写在前面: 嘿,欢迎你来到 Higress 的学习之旅!
369 0
|
7月前
|
运维 Cloud Native 测试技术
极氪汽车云原生架构落地实践
随着极氪数字业务的飞速发展,背后的 IT 技术也在不断更新迭代。极氪极为重视客户对服务的体验,并将系统稳定性、业务功能的迭代效率、问题的快速定位和解决视为构建核心竞争力的基石。
|
4月前
|
运维 监控 Cloud Native
从“守机器”到“写策略”——云原生架构把运维逼成了架构师
从“守机器”到“写策略”——云原生架构把运维逼成了架构师
91 1
|
4月前
|
缓存 Cloud Native Java
Java 面试微服务架构与云原生技术实操内容及核心考点梳理 Java 面试
本内容涵盖Java面试核心技术实操,包括微服务架构(Spring Cloud Alibaba)、响应式编程(WebFlux)、容器化(Docker+K8s)、函数式编程、多级缓存、分库分表、链路追踪(Skywalking)等大厂高频考点,助你系统提升面试能力。
217 0

热门文章

最新文章