.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)--学习笔记

简介: Consumer 消费者Producer 生产者Request-Response 请求-响应

2.6.7 RabbitMQ -- Masstransit 详解

  • Consumer 消费者
  • Producer 生产者
  • Request-Response 请求-响应

Consumer 消费者

在 MassTransit 中,一个消费者可以消费一种或多种消息

消费者的类型包括:普通消费者,saga,saga 状态机,路由活动(分布式追踪),处理器 handlers,工作消费者 job comsumers

  • Consumer
  • Instance
  • Handler
  • Others

Consumer

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Consumer<SubmitOrderConsumer>();
            });
        });
    }
}

继承 IConsumer,实现 Consume 方法

class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}

三个原则:

  • 拥抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
  • Consume 方法是一个被等待的方法,在执行中时其他消费者无法接收到这个消息,当这个方法完成的时候,消息被 ack,并且从队列中移除
  • Task 方法异常会导致消息触发 retry,如果没有配置重试,消息将被投递到失败队列

Instance

public class Program
{
    public static async Task Main()
    {
        var submitOrderConsumer = new SubmitOrderConsumer();
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Instance(submitOrderConsumer);
            });
        });
    }
}

所有接收到的消息都由一个消费者来实例来处理(请确保这个消费者类是线程安全)

Consumer 每次接收到消息都会 new 一个实例

Handler

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Handler<SubmitOrder>(async context =>
                {
                    await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                });
            });
        });
    }
}

通过一个委托 Lambda 方法,来消费消息

Others

  • Saga<>
  • StateMachineSaga<>

Producer 生产者

消息的生产可以通过两种方式产生:发送和发布

发送的时候需要指定一个具体的地址 DestinationAddress,发布的时候消息会被广播给所有订阅了这个消息类型的消费者

基于这两种规则,消息被定义为:命令 command 和事件 event

  • send
  • publish

send

可以调用以下对象的 send 方法来发送 command:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • ISendEndpointProvider(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

ConsumeContext

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;
    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;
    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);
        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

ISendEndpointProvider

public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
    await endpoint.Send(new SubmitOrder { OrderId = "123" });
}

publish

  • 发送地址
  • 短地址
  • Convention Map

发送地址

  • rabbitmq://localhost/input-queue
  • rabbitmq://localhost/input-queue?durable=false

短地址

  • GetSendEndpoint(new Uri("queue:input-queue"))

016.jpg

Convention Map

在配置文件中指定 map 规则

EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

直接发送

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;
    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;
    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);
        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

可以调用以下对象的 publish 方法来发送 event:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • IPublishEndpoint(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

IPublishEndpoint

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.Publish<OrderSubmitted>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
    });
}

Request-Response 请求-响应

Request-Response 模式让应用程序之间解耦之后,依然采用同步的方式

  • Consumer
  • IClientFactory
  • IRequestClient
  • Send a request

Consumer

public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
    var order = await _orderRepository.Get(context.Message.OrderId);
    if (order == null)
        throw new InvalidOperationException("Order not found");
    await context.RespondAsync<OrderStatusResult>(new 
    {
        OrderId = order.Id,
        order.Timestamp,
        order.StatusCode,
        order.StatusText
    });
}

需要处理返回类型 OrderStatusResult,异步方式模拟同步,实际上同样有消息队列,消费者处理过程

IClientFactory

public interface IClientFactory 
{
    IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
    IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
    RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
    RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}

通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

IRequestClient

public interface IRequestClient<TRequest>
    where TRequest : class
{
    RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
    Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}

RequestClient 可以创建请求,或者直接获得响应

Send a request

var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
21天前
|
运维 Cloud Native 持续交付
深入理解云原生架构及其在现代企业中的应用
随着数字化转型的浪潮席卷全球,企业正面临着前所未有的挑战与机遇。云计算技术的迅猛发展,特别是云原生架构的兴起,正在重塑企业的IT基础设施和软件开发模式。本文将深入探讨云原生的核心概念、关键技术以及如何在企业中实施云原生策略,以实现更高效的资源利用和更快的市场响应速度。通过分析云原生架构的优势和面临的挑战,我们将揭示它如何助力企业在激烈的市场竞争中保持领先地位。
|
19天前
|
Kubernetes Cloud Native 微服务
探索云原生技术:容器化与微服务架构的融合之旅
本文将带领读者深入了解云原生技术的核心概念,特别是容器化和微服务架构如何相辅相成,共同构建现代软件系统。我们将通过实际代码示例,探讨如何在云平台上部署和管理微服务,以及如何使用容器编排工具来自动化这一过程。文章旨在为开发者和技术决策者提供实用的指导,帮助他们在云原生时代中更好地设计、部署和维护应用。
|
1月前
|
Kubernetes Cloud Native 安全
云原生架构的演进与实践
随着云计算技术的不断发展,云原生架构已成为现代软件开发的核心趋势。本文旨在探讨云原生架构的演变历程、核心理念及在实际项目中的应用案例。通过对Kubernetes、Docker等关键技术的分析,结合微服务架构的设计原则,本文将揭示如何构建高效、可扩展且易于维护的云原生应用。
50 10
|
18天前
|
运维 Cloud Native 持续交付
云原生技术深度探索:重塑现代IT架构的无形之力####
本文深入剖析了云原生技术的核心概念、关键技术组件及其对现代IT架构变革的深远影响。通过实例解析,揭示云原生如何促进企业实现敏捷开发、弹性伸缩与成本优化,为数字化转型提供强有力的技术支撑。不同于传统综述,本摘要直接聚焦于云原生技术的价值本质,旨在为读者构建一个宏观且具体的技术蓝图。 ####
|
1月前
|
Kubernetes Cloud Native Docker
云原生之旅:从传统架构到容器化服务的演变
随着技术的快速发展,云计算已经从简单的虚拟化服务演进到了更加灵活和高效的云原生时代。本文将带你了解云原生的概念、优势以及如何通过容器化技术实现应用的快速部署和扩展。我们将以一个简单的Python Web应用为例,展示如何利用Docker容器进行打包和部署,进而探索Kubernetes如何管理这些容器,确保服务的高可用性和弹性伸缩。
|
25天前
|
Cloud Native 持续交付 云计算
云原生技术在现代IT架构中的转型力量####
本文深入剖析了云原生技术的精髓,探讨其在现代IT架构转型中的关键作用与实践路径。通过具体案例分析,展示了云原生如何赋能企业实现更高效的资源利用、更快的迭代速度以及更强的系统稳定性,为读者提供了一套可借鉴的实施框架与策略。 ####
24 0
|
28天前
|
消息中间件 运维 Cloud Native
云原生架构下的微服务优化策略####
本文深入探讨了云原生环境下微服务架构的优化路径,针对服务拆分、通信效率、资源管理及自动化运维等核心环节提出了具体的优化策略。通过案例分析与最佳实践分享,旨在为开发者提供一套系统性的解决方案,以应对日益复杂的业务需求和快速变化的技术挑战,助力企业在云端实现更高效、更稳定的服务部署与运营。 ####
|
1月前
|
Cloud Native 持续交付 云计算
深入理解云原生技术及其在现代IT架构中的应用
在数字化浪潮的推动下,云原生技术已成为企业转型的关键。本文将通过浅显易懂的语言和生动的比喻,带领读者探索云原生的核心概念、优势以及如何在企业中实现云原生架构。我们将一起揭开云原生的神秘面纱,了解它如何助力企业快速适应市场变化,提升业务的灵活性和创新能力。
|
1月前
|
Cloud Native Devops 持续交付
云原生架构的演进与实践
本文深入探讨了云原生架构的核心概念、技术组件及其在现代软件开发中的应用。通过分析容器化、微服务、持续集成/持续部署(CI/CD)等关键技术,揭示了这些技术如何共同促进应用程序的灵活性、可扩展性和高可用性。文章还讨论了云原生架构实施过程中面临的挑战和最佳实践,旨在为开发者和企业提供一套实用的指导方针,以便更有效地利用云计算资源,加速数字化转型的步伐。
45 5
|
1月前
|
消息中间件 监控 Cloud Native
云原生架构下的数据一致性挑战与解决方案####
在数字化转型加速的今天,云原生架构以其轻量级、弹性伸缩和高可用性成为企业IT架构的首选。然而,在享受其带来的灵活性的同时,数据一致性问题成为了不可忽视的挑战。本文探讨了云原生环境中数据一致性的复杂性,分析了导致数据不一致的根本原因,并提出了几种有效的解决策略,旨在为开发者和企业提供实践指南,确保在动态变化的云环境中保持数据的完整性和准确性。 ####