RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

简介: RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

我们知道,消息在RabbitMQ的整个生命周期是生产者投递消息ExchangeExchange根据路由键消息路由到合适的QueueQueue再将消息推(或消费者主动拉)给消费者

在这个过程当中,Exchange根据路由键将消息路由到合适的Queue的过程,可能发生诸如


  1. Exchange没有任何Queue与其绑定,
  2. 或者根据消息的路由键,没有任何一个合适的Queue来投递消息,

从而导致消息路由失败。

对于这些路由失败的消息应该如何处理呢?

有两种方式:

  1. 将消息返回给投递该条消息的生产者。
  2. 使用备份交换机 alternate-exchange(AE)。


方式1:将消息返回给投递该条消息的生产者


  • 配置

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing
# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=true
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=true
  • 交换机定义与消息发送

@Slf4j
@Component
public class NoMatchQueue {
    /**
     * 交换机名称
     */
    public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void send() {
        log.info("发送消息");
        Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
        Message message = MessageBuilder
                .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
                .setContentEncoding(StandardCharsets.UTF_8.displayName())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
    }
}
@Configuration
class ExchangeDeclare {
    /**
     * 只定义一个交换机,但是不绑定任何Queue,所以发送到该Exchange的消息都会路由失败
     *
     * @return
     */
    @Bean
    public Exchange noMatchQueueExchange() {
        return ExchangeBuilder
                .topicExchange(NoMatchQueue.EXCHANGE_NAME)
                .durable(true)
                .build();
    }
}
  • 设置回调函数

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息被退回:{}", returnedMessage);
    }
});


  • 消息被退回:且可以看到原因是无法路由


image.png


方式2:使用备份交换机



使用方式1需要我们在程序中进行编码设置回调函数监听,增加了生产者代码的复杂性,那么为了消息不丢失还有没有其他方式来处理路由失败的消息呢: 答案是使用备份交换机

  • 相较于使用回调函数,使用备份交换机只需要给交换机绑定一个备份交换机即可,当消息路由失败之后,消息将投递到备份交换机,再由备份交换机路由消息到备份队列。这样我们只需要关注这个备份队列就能知道/获取到路由失败的消息。通常情况下备份交换的Type应该设置为fanout
  • 配置

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing
# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=false
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=false


  • 注意: 使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。
  • 正常业务交换机(不绑定队列,使得消息一定会路由失败)

/**
 * 业务交换机
 *
 * @return
 */
@Bean
public Exchange noMatchQueueExchange() {
    return ExchangeBuilder
            .topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
            .durable(true)
            // 绑定备份交换机
            .alternate(X_ALTERNATE)
            .build();
}


  • 备份交换机/队列/绑定

/**
 * 备份队列
 *
 * @return
 */
@Bean
public Queue alternateQueue() {
    return QueueBuilder
            .durable("Q_ALTERNATE")
            .build();
}
/**
 * 备份交换机
 *
 * @return
 */
@Bean
public Exchange alternateExchange() {
    return ExchangeBuilder
            .fanoutExchange(X_ALTERNATE)
            .durable(true)
            .build();
}
/**
 * 备份绑定
 *
 * @param alternateExchange
 * @param alternateQueue
 * @return
 */
@Bean
public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) {
    return BindingBuilder
            .bind(alternateQueue)
            .to(alternateExchange)
            .with("")
            .noargs();
}


  • 消息投递

/**
 * 正常业务交换机
 */
public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE";
@Autowired
private RabbitTemplate rabbitTemplate;
/**
 * 发送消息
 */
@PostConstruct
public void send() {
    log.info("发送消息");
    Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
    Message message = MessageBuilder
            .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
            .setContentEncoding(StandardCharsets.UTF_8.displayName())
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}


  • 结果是消息被路由到备份交换机的备份队列


image.png


  • 且:如果你同时使用了两种方式,即(mandatory为true+Listener监听)和(备份交换机AlternateExchange),消息将只会路由到备份交换机,不会Return回生产者。
  • 在原生RabbitMQ-client中演示这一过程:

@Slf4j
public class AeTest {
    /**
     * 获取Channel
     */
    private static final Channel CHANNEL = MqChannelUtils.getChannel();
    /**
     * 备份交换机
     */
    private static final String X_AE = "X_AE";
    /**
     * 备份交换机绑定的队列
     */
    private static final String Q_AE = "Q_AE";
    /**
     * 正常业务的交换机
     */
    private static final String X_1 = "X_1";
    public static void main(String[] args) throws IOException {
        // 定义备份交换机-其实也是一个正常的交换机
        CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true);
        // 定义备份队列
        CHANNEL.queueDeclare(Q_AE, true, false, false, null);
        // 绑定备份
        CHANNEL.queueBind(Q_AE, X_AE, "");
        HashMap<String, Object> arguments = new HashMap<>();
        // 绑定的备份交换机
        arguments.put("alternate-exchange", X_AE);
        // 定义交换机
        CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments);
        // 添加监听器,看看是否还会return消息
        CHANNEL.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return returnMessage) {
                log.error("消息被退回{}", returnMessage);
            }
        });
        // 尝试向交换机发送消息(无法路由)- mandatory参数无效
        CHANNEL.basicPublish(X_1, "", false, false,
                new AMQP.BasicProperties(), "阿依古丽".getBytes(StandardCharsets.UTF_8));
    }
}


  • 两个交换机,正常的交换机X_1和备份交换机X_AE


image.png

image.png

image.png


image.png


备份交换机绑定的队列已经接收到了路由失败的消息


image.png


  • 其他要注意的点:
  • 备份交换机的Type设置为fanout比较合适,这样可以忽略RoutingKey,避免备份交换机又路由失败。
  • 被投递到备份交换机的RoutingKey为消息投递到MQ时的原始RoutingKey,不会变,这一点在其他场景下也是一样的。
  • 使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。


# 源代码

https://gitee.com/FutaoSmile/tech-sharing-mq

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
7月前
|
消息中间件 存储 Kafka
RocketMQ实战—4.消息零丢失的方案
本文分析了用户支付完成后未收到红包的问题,深入探讨了RocketMQ事务消息机制的实现原理及其在确保消息零丢失中的作用。首先,通过全链路分析发现消息可能在推送、存储或消费环节丢失。接着,介绍了RocketMQ事务消息机制如何通过half消息、本地事务执行及回调确认来保证消息发送成功,并详细解析了其底层原理,如half消息对消费者不可见、rollback与commit操作等。同时,对比了同步重试方案,指出其在复杂场景下的局限性。
RocketMQ实战—4.消息零丢失的方案
|
11月前
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
2381 2
|
11月前
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
1610 0
|
消息中间件 存储 弹性计算
云消息队列 RabbitMQ 版方案评测
本文评估了阿里云《高弹性,低成本,云消息队列 RabbitMQ 实践》方案,从实践原理理解、部署体验、方案优势展现及业务场景匹配四个方面进行了深入分析。文中指出,该方案在解决消息积压、提高系统稳定性、支持弹性伸缩等方面表现优异,但也提出了在组件功能解释、实战案例提供等方面的改进建议,以期帮助用户更好地理解和应用该技术解决方案。
419 2
|
网络协议 网络虚拟化 网络架构
【网络实验】/主机/路由器/交换机/网关/路由协议/RIP+OSPF/DHCP(上)
【网络实验】/主机/路由器/交换机/网关/路由协议/RIP+OSPF/DHCP(上)
235 1
|
网络安全 数据安全/隐私保护 网络虚拟化
|
安全 网络安全 网络虚拟化
Cisco-三层交换机实现VLAN间路由
Cisco-三层交换机实现VLAN间路由
286 0
|
网络协议 数据安全/隐私保护 网络虚拟化
【网络实验】/主机/路由器/交换机/网关/路由协议/RIP+OSPF/DHCP(下)
【网络实验】/主机/路由器/交换机/网关/路由协议/RIP+OSPF/DHCP(下)
231 0