.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 异常处理)--学习笔记

简介: 异常处理其他高级功能

2.6.8 RabbitMQ -- Masstransit 异常处理

  • 异常处理
  • 其他
  • 高级功能

异常处理

  • 异常与重试
  • 重试配置
  • 重试条件
  • 重新投递信息
  • 信箱

异常与重试

Exception

public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        throw new Exception("Very bad things happened");
    }
}

UseMessageRetry

var sessionFactory = CreateSessionFactory();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("rabbitmq://localhost/");
    cfg.ReceiveEndpoint("submit-order", e =>
    {
        e.UseMessageRetry(r => r.Immediate(5));
        e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
    });
});

重试配置

017.jpg

// 立即重试:一共连续重试10次
ep.UseMessageRetry(r => r.Immediate(10));
// 间隔重试:一共重试10次,每次间隔10秒
ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10)));
// 多个间隔重试:5秒后第一次,5+10秒后第二次,5+10+15秒后第三次
ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15)));
// 指数级间隔重试:共10次,每次间隔:当前重试次数 * 60秒
ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60)));
// 每次叠加50秒
ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));

重试条件

e.UseMessageRetry(r => 
{
    r.Handle<ArgumentNullException>();
    r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
    r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
});

过滤某些异常类型不进行重试

重新投递信息

cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
    e.UseMessageRetry(r => r.Immediate(5));
    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

消息冲队列移除之后,在一定时间之后重新投入消息队列。需要配置调度模块(scheduling)

信箱

cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
    e.UseMessageRetry(r => r.Immediate(5));
    e.UseInMemoryOutbox();
    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

有些消息是在 consume 方法中发送或发布的,如果在发送之后 consume 中产生了异常,那原来发出去的消息就需要撤回,如果使用信箱之后,在 consume 中要发布/发送的消息就会先暂存在内存中直到 consume 方法成功之后才真正发出去

其他

  • Fault
  • Consuming Faults
  • Error Pipe
  • Dead-Letter Pipe

Fault

public interface Fault<T>
    where T : class
{
    Guid FaultId { get; }
    Guid? FaultedMessageId { get; }
    DateTime Timestamp { get; }
    ExceptionInfo[] Exceptions { get; }
    HostInfo Host { get; }
    T Message { get; }
}

Fault 消息在异常的时候会发布出来

Consuming Faults

public class DashboardFaultConsumer :
    IConsumer<Fault<SubmitOrder>>
{
    public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
    {
        // update the dashboard
    }
}

Fault 消息也是可以进行订阅的

Error Pipe

cfg.ReceiveEndpoint("input-queue", ec =>
{
    ec.DiscardFaultedMessages();
});

默认情况下错误的消息会被投递到了 _error 队列,可以配置直接抛弃错误信息

Dead-Letter Pipe

cfg.ReceiveEndpoint("input-queue", ec =>
{
    ec.DiscardSkippedMessages();
});

死信队列:没有消费者的消息会被移到 _skipped 队列,但可以配置为不移到 _skipped 队列

高级功能

  • 持久化
  • Saga 事件串
  • 调度
  • Courier 最终一致性
  • 监控
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
人工智能 缓存 Kubernetes
.NET 9 首个预览版发布:瞄准云原生和智能应用开发
.NET 9 首个预览版发布:瞄准云原生和智能应用开发
|
4月前
|
开发框架 缓存 Cloud Native
微软发布 .NET 云原生开发框架—— .NET Aspire
微软于 2023-11-14日 发布了 .NET 8 的正式版。伴随着这个重要 .NET 版本的发布,微软也发布了一个全新的 .NET云原生开发框架 —— .NET Aspire,它提供了如下 3 个方面的能力,来帮助我们使用 .NET 开发分层、云就绪的可观测、本地与生产环境一致的分布式云原生应用程序。
198 0
|
6月前
|
Cloud Native 架构师 Java
谷歌架构师分享gRPC与云原生应用开发Go和Java为例文档
随着微服务和云原生相关技术的发展,应用程序的架构模式已从传统的单体架构或分层架构转向了分布式的计算架构。尽管分布式架构本身有一定的开发成本和运维成本,但它所带来的收益是显而易见的。
|
1月前
|
机器学习/深度学习 算法 安全
隐私计算训练营第三讲-详解隐私计算的架构和技术要点
SecretFlow 是一个隐私保护的统一框架,用于数据分析和机器学习,支持MPC、HE、TEE等隐私计算技术。它提供设备抽象、计算图表示和基于图的ML/DL能力,适应数据水平、垂直和混合分割场景。产品层包括SecretPad(快速体验核心能力)和SecretNote(开发工具)。算法层涉及PSI、PIR、数据分析和联邦学习(水平、垂直、混合)。此外,SecretFlow还有YACL密码库和Kusica任务调度框架,Kusica提供轻量化部署、跨域通信和统一API接口。
41 0
|
1月前
|
消息中间件 存储 Cloud Native
深度剖析 RocketMQ 5.0,架构解析:云原生架构如何支撑多元化场景?
了解 RocketMQ 5.0 的核心概念和架构概览;然后我们会从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;学习 RocketMQ 如何实现数据的存储,数据的高可用,如何利用云原生存储进一步提升竞争力。
140031 1
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
7月前
|
消息中间件 缓存 NoSQL
|
8月前
|
SQL 分布式计算 架构师
极客时间架构师训练营 - week12 - 作业 2
极客时间架构师训练营 - week12 - 作业 2
73 0
|
8月前
|
SQL 运维 安全
极客时间架构师训练营 - week11 - 作业 2
极客时间架构师训练营 - week11 - 作业 2
28 0
|
8月前
|
算法 架构师 网络协议
极客时间架构师训练营 - week8 - 作业 2
极客时间架构师训练营 - week8 - 作业 2
50 0