.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)--学习笔记

简介: Consumer 消费者Producer 生产者Request-Response 请求-响应

2.6.7 RabbitMQ -- Masstransit 详解

  • Consumer 消费者
  • Producer 生产者
  • Request-Response 请求-响应

Consumer 消费者

在 MassTransit 中,一个消费者可以消费一种或多种消息

消费者的类型包括:普通消费者,saga,saga 状态机,路由活动(分布式追踪),处理器 handlers,工作消费者 job comsumers

  • Consumer
  • Instance
  • Handler
  • Others

Consumer

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Consumer<SubmitOrderConsumer>();
            });
        });
    }
}

继承 IConsumer,实现 Consume 方法

class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}

三个原则:

  • 拥抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
  • Consume 方法是一个被等待的方法,在执行中时其他消费者无法接收到这个消息,当这个方法完成的时候,消息被 ack,并且从队列中移除
  • Task 方法异常会导致消息触发 retry,如果没有配置重试,消息将被投递到失败队列

Instance

public class Program
{
    public static async Task Main()
    {
        var submitOrderConsumer = new SubmitOrderConsumer();
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Instance(submitOrderConsumer);
            });
        });
    }
}

所有接收到的消息都由一个消费者来实例来处理(请确保这个消费者类是线程安全)

Consumer 每次接收到消息都会 new 一个实例

Handler

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Handler<SubmitOrder>(async context =>
                {
                    await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                });
            });
        });
    }
}

通过一个委托 Lambda 方法,来消费消息

Others

  • Saga<>
  • StateMachineSaga<>

Producer 生产者

消息的生产可以通过两种方式产生:发送和发布

发送的时候需要指定一个具体的地址 DestinationAddress,发布的时候消息会被广播给所有订阅了这个消息类型的消费者

基于这两种规则,消息被定义为:命令 command 和事件 event

  • send
  • publish

send

可以调用以下对象的 send 方法来发送 command:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • ISendEndpointProvider(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

ConsumeContext

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;
    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;
    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);
        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

ISendEndpointProvider

public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
    await endpoint.Send(new SubmitOrder { OrderId = "123" });
}

publish

  • 发送地址
  • 短地址
  • Convention Map

发送地址

  • rabbitmq://localhost/input-queue
  • rabbitmq://localhost/input-queue?durable=false

短地址

  • GetSendEndpoint(new Uri("queue:input-queue"))

016.jpg

Convention Map

在配置文件中指定 map 规则

EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

直接发送

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;
    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;
    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);
        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

可以调用以下对象的 publish 方法来发送 event:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • IPublishEndpoint(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

IPublishEndpoint

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.Publish<OrderSubmitted>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
    });
}

Request-Response 请求-响应

Request-Response 模式让应用程序之间解耦之后,依然采用同步的方式

  • Consumer
  • IClientFactory
  • IRequestClient
  • Send a request

Consumer

public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
    var order = await _orderRepository.Get(context.Message.OrderId);
    if (order == null)
        throw new InvalidOperationException("Order not found");
    await context.RespondAsync<OrderStatusResult>(new 
    {
        OrderId = order.Id,
        order.Timestamp,
        order.StatusCode,
        order.StatusText
    });
}

需要处理返回类型 OrderStatusResult,异步方式模拟同步,实际上同样有消息队列,消费者处理过程

IClientFactory

public interface IClientFactory 
{
    IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
    IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
    RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
    RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}

通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

IRequestClient

public interface IRequestClient<TRequest>
    where TRequest : class
{
    RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
    Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}

RequestClient 可以创建请求,或者直接获得响应

Send a request

var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4天前
|
Cloud Native 持续交付 云计算
云原生技术:重塑软件开发与架构的未来
在云计算的推动下,云原生技术正逐渐成为软件开发的新标准,强调利用容器、服务网格、微服务等技术实现敏捷开发与高效运维。本文探讨了云原生技术如何重塑软件开发与架构的未来,介绍了其核心概念(如容器化、微服务架构、CI/CD)及优势(如敏捷性、可扩展性、成本效益),并讨论了其在金融服务、电子商务和物联网等领域的实际应用及面临的挑战。尽管存在技术复杂性和人才短缺等问题,云原生技术仍将成为软件开发的主流趋势。
|
1天前
|
Kubernetes Cloud Native 持续交付
深入理解云原生技术及其在现代IT架构中的应用
【9月更文挑战第18天】云原生技术,作为推动企业数字化转型的引擎,正以它独特的魅力重塑着信息技术的未来。本文将带你一探究竟,从云原生的基础概念出发,逐步深入到其核心组件、设计理念以及如何在实际应用中发挥巨大作用。你将了解到容器化、微服务架构、持续集成与持续部署(CI/CD)等关键实践,并见证它们如何帮助企业构建更加灵活、高效和可靠的应用。
|
1天前
|
运维 Kubernetes Cloud Native
探索云原生技术:容器化与微服务架构的融合之道
【9月更文挑战第18天】在数字化转型的浪潮中,云原生技术以其灵活性、可扩展性成为企业创新的强大引擎。本文将深入探讨云原生技术的核心概念,特别是容器化和微服务架构如何相辅相成,共同推动现代应用的开发与部署。通过实际代码示例,我们将揭示这些技术如何简化运维,加速产品上市时间,并提高系统的可靠性和弹性。无论你是开发人员、架构师还是IT决策者,这篇文章都将为你提供宝贵的洞见和实践指导。
11 2
|
3天前
|
运维 Cloud Native 持续交付
云原生之旅:从容器化到微服务架构的探索
【9月更文挑战第16天】在数字化转型的浪潮中,云原生技术成为推动企业创新和效率提升的关键力量。本文将带你深入了解云原生的核心理念,从容器化技术的入门应用到微服务架构的设计实践,揭示如何利用这些先进技术构建更灵活、更可靠的系统。我们将通过具体案例,探讨云原生技术如何帮助企业实现快速迭代与持续交付,以及在这一过程中可能遇到的挑战和解决方案。
|
1天前
|
Kubernetes Cloud Native Java
云原生技术之旅:从容器化到微服务架构
【9月更文挑战第18天】云原生技术正改变着我们构建、部署和管理应用的方式。本文将通过一次虚拟的旅行,带领读者探索云原生的核心概念,如容器化、微服务、持续集成与交付等。我们将以一个实际案例为线索,逐步展开对Kubernetes集群管理、Docker容器创建和Spring Boot微服务开发的讨论。就像在旅途中不断发现新风景一样,您将了解到这些技术如何协同工作,提升开发效率和应用性能。准备好了吗?让我们启航!
|
2天前
|
运维 Cloud Native Devops
云原生技术:重塑现代IT架构的新引擎
在当今数字化转型的浪潮中,云原生技术以其敏捷、高效和可扩展的特性,正引领着一场IT架构的革命。本文旨在深入探讨云原生的概念、核心组件及其在现代企业中的应用价值,揭示其如何助力企业实现更快的创新速度、更高的资源利用率以及更优的用户体验。不同于传统的云计算模式,云原生从一开始就为云环境量身打造,通过容器化、微服务、DevOps等关键技术,解锁了软件开发和运维的新范式。
|
4天前
|
运维 Cloud Native 持续交付
探索云原生架构:企业转型的未来之路
在当今这个数据驱动的时代,企业面临着前所未有的挑战与机遇。随着云计算技术的日益成熟,云原生作为一种新兴的架构理念,正逐步成为推动企业数字化转型的关键力量。本文旨在深入探讨云原生的概念、特点及其对企业转型的重要意义,并通过实例分析展示其在实际应用中的巨大潜力和价值。
|
7天前
|
运维 Cloud Native Devops
探索云原生架构:企业数字化转型的新引擎
在当今数字化浪潮中,云原生架构以其敏捷性、弹性和高可用性成为企业实现高效上云和加速创新的关键。本文将深入探讨云原生的核心理念、关键技术如容器化、微服务和DevOps实践,以及这些技术如何共同推动企业在云平台上的灵活部署和快速迭代。同时,文章还将分析成功案例,展示云原生如何帮助企业构建现代化应用,提高资源利用率,并确保系统稳定运行。通过综合运用这些先进技术策略,企业能够有效应对市场变化,缩短产品上市时间,从而在激烈的市场竞争中脱颖而出。
|
12天前
|
运维 Cloud Native 云计算
云原生之旅:从容器化到微服务架构的演进之路
在数字化浪潮中,云原生技术如同星辰大海中的灯塔,为航船指引方向。本文将带你穿梭于云计算的世界,探索从容器化技术到微服务架构的变革旅程。我们将一窥云原生如何助力企业灵活应对快速变化的市场需求,以及在这一过程中,开发者和运维人员是如何成为时代变革的弄潮儿。让我们一同启航,驶向云原生的广阔天地。
|
7天前
|
Cloud Native Devops 持续交付
探索云原生架构:构建高效、灵活和可扩展的系统
本文将深入探讨云原生架构的核心概念、主要技术以及其带来的优势。我们将从云原生的定义开始,了解其设计理念和技术原则;接着分析容器化、微服务等关键技术在云原生中的应用;最后总结云原生架构如何助力企业实现数字化转型,提升业务敏捷性和创新能力。通过这篇文章,读者可以全面了解云原生架构的价值和应用前景。

热门文章

最新文章