.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)--学习笔记

简介: Consumer 消费者Producer 生产者Request-Response 请求-响应

2.6.7 RabbitMQ -- Masstransit 详解

  • Consumer 消费者
  • Producer 生产者
  • Request-Response 请求-响应

Consumer 消费者

在 MassTransit 中,一个消费者可以消费一种或多种消息

消费者的类型包括:普通消费者,saga,saga 状态机,路由活动(分布式追踪),处理器 handlers,工作消费者 job comsumers

  • Consumer
  • Instance
  • Handler
  • Others

Consumer

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Consumer<SubmitOrderConsumer>();
            });
        });
    }
}

继承 IConsumer,实现 Consume 方法

class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}

三个原则:

  • 拥抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
  • Consume 方法是一个被等待的方法,在执行中时其他消费者无法接收到这个消息,当这个方法完成的时候,消息被 ack,并且从队列中移除
  • Task 方法异常会导致消息触发 retry,如果没有配置重试,消息将被投递到失败队列

Instance

public class Program
{
    public static async Task Main()
    {
        var submitOrderConsumer = new SubmitOrderConsumer();
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Instance(submitOrderConsumer);
            });
        });
    }
}

所有接收到的消息都由一个消费者来实例来处理(请确保这个消费者类是线程安全)

Consumer 每次接收到消息都会 new 一个实例

Handler

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Handler<SubmitOrder>(async context =>
                {
                    await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                });
            });
        });
    }
}

通过一个委托 Lambda 方法,来消费消息

Others

  • Saga<>
  • StateMachineSaga<>

Producer 生产者

消息的生产可以通过两种方式产生:发送和发布

发送的时候需要指定一个具体的地址 DestinationAddress,发布的时候消息会被广播给所有订阅了这个消息类型的消费者

基于这两种规则,消息被定义为:命令 command 和事件 event

  • send
  • publish

send

可以调用以下对象的 send 方法来发送 command:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • ISendEndpointProvider(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

ConsumeContext

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;
    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;
    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);
        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

ISendEndpointProvider

public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
    await endpoint.Send(new SubmitOrder { OrderId = "123" });
}

publish

  • 发送地址
  • 短地址
  • Convention Map

发送地址

  • rabbitmq://localhost/input-queue
  • rabbitmq://localhost/input-queue?durable=false

短地址

  • GetSendEndpoint(new Uri("queue:input-queue"))

016.jpg

Convention Map

在配置文件中指定 map 规则

EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

直接发送

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;
    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;
    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);
        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

可以调用以下对象的 publish 方法来发送 event:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • IPublishEndpoint(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

IPublishEndpoint

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.Publish<OrderSubmitted>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
    });
}

Request-Response 请求-响应

Request-Response 模式让应用程序之间解耦之后,依然采用同步的方式

  • Consumer
  • IClientFactory
  • IRequestClient
  • Send a request

Consumer

public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
    var order = await _orderRepository.Get(context.Message.OrderId);
    if (order == null)
        throw new InvalidOperationException("Order not found");
    await context.RespondAsync<OrderStatusResult>(new 
    {
        OrderId = order.Id,
        order.Timestamp,
        order.StatusCode,
        order.StatusText
    });
}

需要处理返回类型 OrderStatusResult,异步方式模拟同步,实际上同样有消息队列,消费者处理过程

IClientFactory

public interface IClientFactory 
{
    IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
    IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
    RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
    RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}

通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

IRequestClient

public interface IRequestClient<TRequest>
    where TRequest : class
{
    RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
    Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}

RequestClient 可以创建请求,或者直接获得响应

Send a request

var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
存储 开发框架 前端开发
前端框架EXT.NET Dotnet 3.5开发的实验室信息管理系统(LIMS)成品源码 B/S架构
发展历史:实验室信息管理系统(LIMS),就是指通过计算机网络技术对实验的各种信息进行管理的计算机软、硬件系统。也就是将计算机网络技术与现代的管理思想有机结合,利用数据处理技术、海量数据存储技术、宽带传输网络技术、自动化仪器分析技术,来对实验室的信息管理和质量控制等进行全方位管理的计算机软、硬件系统,以满足实验室管理上的各种目标(计划、控制、执行)。
61 1
|
5月前
|
机器学习/深度学习 算法 安全
隐私计算训练营第三讲-详解隐私计算的架构和技术要点
SecretFlow 是一个隐私保护的统一框架,用于数据分析和机器学习,支持MPC、HE、TEE等隐私计算技术。它提供设备抽象、计算图表示和基于图的ML/DL能力,适应数据水平、垂直和混合分割场景。产品层包括SecretPad(快速体验核心能力)和SecretNote(开发工具)。算法层涉及PSI、PIR、数据分析和联邦学习(水平、垂直、混合)。此外,SecretFlow还有YACL密码库和Kusica任务调度框架,Kusica提供轻量化部署、跨域通信和统一API接口。
200 0
|
2月前
|
设计模式 存储 前端开发
揭秘.NET架构设计模式:如何构建坚不可摧的系统?掌握这些,让你的项目无懈可击!
【8月更文挑战第28天】在软件开发中,设计模式是解决常见问题的经典方案,助力构建可维护、可扩展的系统。本文探讨了.NET中三种关键架构设计模式:MVC、依赖注入与仓储模式,并提供了示例代码。MVC通过模型、视图和控制器分离关注点;依赖注入则通过外部管理组件依赖提升复用性和可测性;仓储模式则统一数据访问接口,分离数据逻辑与业务逻辑。掌握这些模式有助于开发者优化系统架构,提升软件质量。
41 5
|
2月前
|
XML 开发框架 .NET
.NET框架:软件开发领域的瑞士军刀,如何让初学者变身代码艺术家——从基础架构到独特优势,一篇不可错过的深度解读。
【8月更文挑战第28天】.NET框架是由微软推出的统一开发平台,支持多种编程语言,简化应用程序的开发与部署。其核心组件包括公共语言运行库(CLR)和类库(FCL)。CLR负责内存管理、线程管理和异常处理等任务,确保代码稳定运行;FCL则提供了丰富的类和接口,涵盖网络、数据访问、安全性等多个领域,提高开发效率。此外,.NET框架还支持跨语言互操作,允许开发者使用C#、VB.NET等语言编写代码并无缝集成。这一框架凭借其强大的功能和广泛的社区支持,已成为软件开发领域的重要工具,适合初学者深入学习以奠定职业生涯基础。
90 1
|
5月前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
240 0
|
5月前
|
架构师 网络协议 算法
Android高级架构师整理面试经历发现?(大厂面经+学习笔记(1)
Android高级架构师整理面试经历发现?(大厂面经+学习笔记(1)
|
5月前
|
SpringCloudAlibaba 负载均衡 Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(目录大纲)
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(目录大纲)
136 1
|
5月前
|
SpringCloudAlibaba Java 测试技术
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(六)Hystrix(豪猪哥)的使用
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(六)Hystrix(豪猪哥)的使用
136 1
|
5月前
|
消息中间件 存储 Cloud Native
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」打造新一代云原生"消息、事件、流"统一消息引擎的融合处理平台
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」打造新一代云原生"消息、事件、流"统一消息引擎的融合处理平台
79 0
|
5月前
|
数据安全/隐私保护 Windows
.net三层架构开发步骤
.net三层架构开发步骤
下一篇
无影云桌面