.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ 工作队列和交换机)--学习笔记

简介: WorkQueuePublish/SubscribeRoutingEmitLog

2.6.4 RabbitMQ -- 工作队列和交换机

  • WorkQueue
  • Publish/Subscribe
  • Routing
  • EmitLog

WorkQueue

WorkQueue:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

  • 一个消息生产者,多个消息消费者
  • exchange 交换机自动恢复
  • 对消息进行持久化
  • 手动确认消息

对消息进行持久化

var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);

手动确认消息

autoAck: false

channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

手动调用 BasicAck

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

修改接收端为手动确认消息

autoAck: false

channel.BasicConsume(queue: "hello",
    autoAck: false,
    consumer: consumer);

BasicAck

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Thread.Sleep(2000);// 演示多个接收端
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine(" [x] Received {0}", message);
};

启动多个接收端

010.jpg

Publish/Subscribe

Publish/Subscribe:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

Fanout 交换机,每个队列都会收到

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

Routing

Routing:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

Bindings

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

Direct exchange

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

EmitLog

新建控制台项目 EmitLogDirect,ReceiveLogsDirect

发送端

namespace EmitLogDirect
{
    class EmitLogDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机
                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1)
                    ? string.Join(" ", args.Skip( 1 ).ToArray())
                    : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "direct_logs",
                    routingKey: severity,// 路由 Key 自动带上严重级别
                    basicProperties: null,
                    body: body);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

011.jpg

error 级别单独发送到一个队列

接收端

namespace ReceiveLogsDirect
{
    class ReceiveLogsDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机
                var queueName = channel.QueueDeclare().QueueName;
                if (args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                        Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName,
                        exchange: "direct_logs",
                        routingKey: severity);// 路由 Key 自动带上严重级别
                }
                Console.WriteLine(" [*] Waiting for messages.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                        routingKey, message);
                };
                channel.BasicConsume(queue: queueName,
                    autoAck: true,
                    consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

替换发送端,接收端的 localhost 为服务器地址

接收端控制台启动

dotnet run info waring error

发送端控制台启动

dotnet run info
dotnet run error
dotnet run waring test

接收端输出

[x] Received 'info':'Hello World!'
 [x] Received 'error':'Hello World!'
 [x] Received 'waring':'test'

GitHub源码链接:

https://github.com/MINGSON666/Personal-Learning-Library/tree/main/ArchitectTrainingCamp

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
运维 Cloud Native 测试技术
极氪汽车云原生架构落地实践
随着极氪数字业务的飞速发展,背后的 IT 技术也在不断更新迭代。极氪极为重视客户对服务的体验,并将系统稳定性、业务功能的迭代效率、问题的快速定位和解决视为构建核心竞争力的基石。
|
9月前
|
运维 Cloud Native 持续交付
深入理解云原生架构及其在现代企业中的应用
随着数字化转型的浪潮席卷全球,企业正面临着前所未有的挑战与机遇。云计算技术的迅猛发展,特别是云原生架构的兴起,正在重塑企业的IT基础设施和软件开发模式。本文将深入探讨云原生的核心概念、关键技术以及如何在企业中实施云原生策略,以实现更高效的资源利用和更快的市场响应速度。通过分析云原生架构的优势和面临的挑战,我们将揭示它如何助力企业在激烈的市场竞争中保持领先地位。
208 13
|
4月前
|
人工智能 Cloud Native 容灾
深圳农商银行三代核心系统全面投产 以云原生架构筑牢数字化转型基石
深圳农商银行完成第三代核心系统全面上云,日均交易超3000万笔,峰值处理效率提升2倍以上。扎根深圳70余年,与阿里云共建“两地三中心”分布式云平台,实现高可用体系及全栈护航。此次云原生转型为行业提供可复制样本,未来将深化云计算与AI合作,推动普惠金融服务升级。
351 17
|
4月前
|
存储 Cloud Native 关系型数据库
PolarDB开源:云原生数据库的架构革命
本文围绕开源核心价值、社区运营实践和技术演进路线展开。首先解读存算分离架构的三大突破,包括基于RDMA的分布式存储、计算节点扩展及存储池扩容机制,并强调与MySQL的高兼容性。其次分享阿里巴巴开源治理模式,涵盖技术决策、版本发布和贡献者成长体系,同时展示企业应用案例。最后展望技术路线图,如3.0版本的多写多读架构、智能调优引擎等特性,以及开发者生态建设举措,推荐使用PolarDB-Operator实现高效部署。
253 3
|
9月前
|
Kubernetes Cloud Native 微服务
探索云原生技术:容器化与微服务架构的融合之旅
本文将带领读者深入了解云原生技术的核心概念,特别是容器化和微服务架构如何相辅相成,共同构建现代软件系统。我们将通过实际代码示例,探讨如何在云平台上部署和管理微服务,以及如何使用容器编排工具来自动化这一过程。文章旨在为开发者和技术决策者提供实用的指导,帮助他们在云原生时代中更好地设计、部署和维护应用。
|
10月前
|
Cloud Native Devops 云计算
云计算的未来:云原生架构与微服务的革命####
【10月更文挑战第21天】 随着企业数字化转型的加速,云原生技术正迅速成为IT行业的新宠。本文深入探讨了云原生架构的核心理念、关键技术如容器化和微服务的优势,以及如何通过这些技术实现高效、灵活且可扩展的现代应用开发。我们将揭示云原生如何重塑软件开发流程,提升业务敏捷性,并探索其对企业IT架构的深远影响。 ####
237 3
|
5月前
|
Cloud Native Serverless 流计算
云原生时代的应用架构演进:从微服务到 Serverless 的阿里云实践
云原生技术正重塑企业数字化转型路径。阿里云作为亚太领先云服务商,提供完整云原生产品矩阵:容器服务ACK优化启动速度与镜像分发效率;MSE微服务引擎保障高可用性;ASM服务网格降低资源消耗;函数计算FC突破冷启动瓶颈;SAE重新定义PaaS边界;PolarDB数据库实现存储计算分离;DataWorks简化数据湖构建;Flink实时计算助力风控系统。这些技术已在多行业落地,推动效率提升与商业模式创新,助力企业在数字化浪潮中占据先机。
329 12
|
8月前
|
网络协议 安全 网络性能优化
三层24千兆+4万兆光电可选网管型嵌入式交换机核心模块SW-24G4F-301EM
SW-24G4F-301EM是一款高性能管理型交换机核心模块,支持24个千兆端口和4个万兆端口,具备L3路由、VLAN、QoS、ACL等丰富功能。该模块采用DIP-110封装,尺寸为77.5x76.0x8.0mm,支持12V供电,功耗仅6W,适用于煤矿、轨交、工业控制、教学设备等行业。模块提供2路QSGMII、2路XSGMII及4路10GBase-R接口,具备强大的数据处理能力和灵活的接口配置,便于嵌入式集成。
|
9月前
|
弹性计算 运维 Cloud Native
云原生架构的崛起与未来展望
在数字化转型的浪潮中,云原生架构凭借其高效、灵活和可扩展的特性,正逐渐成为企业IT战略的核心。本文旨在探讨云原生架构的定义、关键特性、实施优势以及面临的挑战,同时展望未来的发展趋势。通过深入分析,我们期望为读者提供一个关于云原生架构全面而深入的视角,助力企业在云计算时代做出更明智的决策。
196 30

热门文章

最新文章