译MassTransit 创建消息消费者

简介: 创建消息消费者一个消息消费者是一个 可以消费一个或多个消息类型的类,指定IConsumer接口,T为消息类型 public class UpdateCustomerConsumer : IConsumer { public async Task Consume(ConsumeContext context) { await Console.

创建消息消费者
一个消息消费者是一个 可以消费一个或多个消息类型的类,指定IConsumer<T>接口,T为消息类型

public class UpdateCustomerConsumer :
    IConsumer<UpdateCustomerAddress>
{
    public async Task Consume(ConsumeContext<UpdateCustomerAddress> context)
    {
        await Console.Out.WriteLineAsync($"Updating customer: {context.Message.CustomerId}");

        // update the customer address
    }
}

当消费者订阅接收端点时,由端点接收消费者所消费的消息。创建一个消费者实例(using a consumer factory, which is covered(覆盖) --> here <--)然后,通过Consume方法将消息(包裹在 ConsumeContext)传递给消费者。

Consume方法是异步的,并返回一个task。。MassTransit 等待该任务, 在此期间消息对其他接收端点不可用。如果consume 方法成功完成 (RanToCompletion 的task状态),则消息将被确认并从队列中删除。

注意:如果消费者错误(例如抛出异常,导致Faulted的任务状态),或者以某种方式被取消cancelled (被取消的Canceled任务状态),则异常被传播回管道,在那里它可以最终被重试或移动到错误队列。

连接消息消费者

对于消费者接收消息,消费者必须连接到接收端点。这是在总线配置期间进行的,尤其是在接收端点的配置中。

下面显示了将消费者连接到接收端点的示例。 

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
    {
        e.Consumer<UpdateCustomerConsumer>();
    });
});

该示例创建一个总线,该总线使用使用默认的用户名和密码(Guest/Guest)连接到在本地机器上运行的RabbitMQ,.在那个总线上.使用命名customer_update_queue创建单个接收端点。使用最简单的方法连接消费者,该方法接受具有默认构造函数的消费者类。

注意:当一个消费者连接到一个接收端点时,由连接到同一个接收端点的所有消费者所消费的组合消息类型被*subscribed*订阅到队列中。(in the case of RabbitMQ exchange bindings are created for the message types to the exchange/queue for the receive endpoint.)订阅方法因broker代理而异,在RabbitMQ Exchange绑定的情况下,将消息类型创建为接收端点的Exchange /queue。这些订阅是持久的,并在进程退出后保持不变。这样可以确保发布或发送的消息交付到接收端点消费者之一,即使进程终止。当进程启动时,队列中等待的消息将交付给消费者。

上面的示例使用默认构造函数消费者工厂来连接消费者。有几个其他的消费工厂支持,如下所示。

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = ...;

    cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
    {
        // an anonymous factory method
        e.Consumer(() => new YourConsumer());

        // an existing consumer factory for the consumer type
        e.Consumer(consumerFactory);

        // a type-based factory that returns an object (container friendly)
        e.Consumer(consumerType, type => container.Resolve(type));

        // an anonymous factory method, with some middleware goodness
        e.Consumer(() => new YourConsumer(), x =>
        {
            // add middleware to the consumer pipeline
            x.UseLog(ConsoleOut, async context => "Consumer created");
        });
    });
});

 连接到现有总线
一旦创建了总线,就会创建接收端点,无法修改。然而,总线本身提供了一个临时的(自动删除)队列,可以用来接收消息。为了将消费者连接到总线临时队列,可以使用一系列连接方法。

警告:临时队列将不接收已发布的消息。由于队列是临时的,当消费者连接时,不会创建绑定或订阅。这使得它对于临时消费者非常快,并且避免用临时绑定来击乱消息代理。


临时队列对于接收请求响应和故障(via the response/fault address header)和路由滑移事件(via an event subscription in the routing slip)非常有用。
 

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = ...;
});

busControl.Start();

ConnectHandle handle = busControl.ConnectConsumer<FaultConsumer>();
...
handle.Disconnect(); // disconnect the consumer from the bus pipeline

除了 ConnectConsumer 方法之外, 还包括每个消费者类型的方法 (ConnectHandler、ConnectInstance、ConnectSaga 和 ConnectStateMachineSaga)。

连接现有的消费者实例
虽然强烈建议使用每个消息的消费者实例,但可以连接一个现有的消费者实例,每个实例都将调用该实例。消费者必须是线程安全的,因为消费方法将同时从多个线程调用。若要连接现有实例,请参见下面的示例。 

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = ...;

    cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
    {
        e.Instance(existingConsumer);
    });
});

 处理无法传递的消息

如果端点的配置改变,或者如果消息被错误地发送到端点,则可能接收到没有任何连接的消费者的消息类型。如果发生这种情况,则将消息移动到_skipped队列(由原始队列名称前缀)。保留原始消息内容,并添加附加标题来指示移动消息的主机。

处理没有消费者的消息

虽然创建消费者是首选的消息消费方式,但也有可能创建一个简单的消息处理程序。通过指定方法、匿名方法或lambda方法,消息可以在接收端点上被消耗。若配置简单的消息处理程序, 请参阅下面的示例。

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = ...;

    cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
    {
        e.Handler<UpdateCustomerAddress>(context =>
            return Console.Out.WriteLineAsync($"Update customer address received: {context.Message.CustomerId}"));
    });
});

在这种情况下,对接收到的每个消息都调用该方法。没有创建消费者,也不执行生命周期管理。

通过 IObserver 观察消息
通过添加IObserver 接口,将观察者的概念添加到.NET Framework中。MassTransit 支持观察者直接连接接收端点。

可惜,观察者不是异步的。因此,当使用观测者时,无法对编译程序提供的异步支持进行良好的运行。

一个观察者使用内置的IObserver<T>接扣定义,如下所示。 

public class CustomerAddressUpdatedObserver :
    IObserver<ConsumeContext<CustomerAddressUpdated>>
{
    public void OnNext(ConsumeContext<CustomerAddressUpdated> context)
    {
        Console.WriteLine("Customer address was updated: {0}", context.Message.CustomerId);
    }

    public void OnError(Exception error)
    {
    }

    public void OnCompleted()
    {
    }
}

一旦创建,观察者连接到接收端点,类似于消费者。 

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = ...;

    cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
    {
        e.Observer<CustomerAddressUpdatedObserver>();
    });
});

 

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
安全 物联网 5G
5g技术的优缺点是什么
5g技术的优缺点是什么
1185 0
|
安全 Linux
Linux内核OverlayFS子系统权限提升漏洞(CVE-2023-0386)
Linux内核OverlayFS子系统权限提升漏洞,在Linux内核的 OverlayFS子系统中,当用户将一个具有权限的文件从一个nosuid挂载点复制到另一个挂载点时,未经授权的攻击者可以执行setuid文件,导致权限提升。
341 1
|
人工智能 算法 PyTorch
TorchAcc:基于 TorchXLA 的分布式训练框架
阿里云研究员、阿里云人工智能平台 PAI 技术负责人--林伟在GTC 2024 大会 China AI Day 线上中文演讲专场上介绍了TorchAcc,这是一个基于 PyTorch/XLA 的大模型分布式训练框架。
|
网络安全 开发者 iOS开发
iOS技术博客:App备案指南
本文介绍了移动应用程序(App)备案的重要性和流程。备案是规范App开发和运营的必要手段,有助于保护用户权益、维护网络安全和社会秩序。为了帮助开发者更好地了解备案流程,本文提供了一份最新、最全、最详的备案指南,包括备案目的、好处、对象、时间、流程、条件和注意事项等内容。
iOS技术博客:App备案指南
|
机器学习/深度学习 计算机视觉
YOLOv8改进 | Conv篇 | 在线重参数化卷积OREPA助力二次创新(提高推理速度 + FPS)
YOLOv8改进 | Conv篇 | 在线重参数化卷积OREPA助力二次创新(提高推理速度 + FPS)
481 0
|
12月前
|
前端开发 JavaScript 定位技术
三、前端高德地图、测量两个点之前的距离
文章介绍了如何在前端使用高德地图API实现测量两个点之间的距离,包括开启和关闭测量工具的方法,以及如何清除地图上的测量点、连线和文字。
452 1
三、前端高德地图、测量两个点之前的距离
|
7月前
|
人工智能 小程序 数据挖掘
2025年企业CRM选型指南:销售易、金蝶、纷享销客对比
销售易、金蝶和纷享销客是国内知名的CRM解决方案,各自具备独特优势。销售易功能全面,涵盖销售、客户、营销管理及AI赋能,适合中大型企业;金蝶与ERP无缝集成,财务管理强大,适合传统企业;纷享销客连接能力强,用户体验佳,性价比高,适合中小企业。本文从功能、体验、价格、评价及适用场景对比三者,助力企业选择合适的CRM系统,推动数字化转型。
|
10月前
|
SQL druid 数据库
如何进行数据库连接池的参数优化?
数据库连接池参数优化包括:1) 确定合适的初始连接数,考虑数据库规模和应用需求;2) 调整最大连接数,依据并发量和资源状况;3) 设置最小空闲连接数,平衡资源利用和响应速度;4) 优化连接超时时间,确保系统响应和资源利用合理;5) 配置连接有效性检测,定期检查连接状态;6) 调整空闲连接回收时间,适应访问模式并配合数据库超时设置。
java中Stream流中的forEach、filter、map、count、limit、skip、concat
java中Stream流中的forEach、filter、map、count、limit、skip、concat
517 0
|
存储 关系型数据库 MySQL
深入OceanBase内部机制:高性能分布式(实时HTAP)关系数据库概述
深入OceanBase内部机制:高性能分布式(实时HTAP)关系数据库概述