RabbitMQ之mandatory和immediate

简介: 1. 概述mandatory和immediate是AMQP协议中basic.publish方法中的两个标识位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。对于刚开始接触RabbitMQ的朋友特别容易被这两个参数搞混,这里博主整理了写资料,简单讲解下这两个标识位。

1. 概述

mandatory和immediate是AMQP协议中basic.publish方法中的两个标识位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。对于刚开始接触RabbitMQ的朋友特别容易被这两个参数搞混,这里博主整理了写资料,简单讲解下这两个标识位。

mandatory
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。

immediate
当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。


2. mandatory

在生产者通过channle的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看channel接口,会发现存在3个重载的basicPublish方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

mandatory和immediate上面已经解释过了,其余的参数分别是:
exchange:交换机名称
routingkey:路由键
props:消息属性字段,比如消息头部信息等等
body:消息主体部分

本节主要讲述mandatory, 下面我们写一个demo,在RabbitMQ broker中有:
exchange : exchange.mandatory.test
queue: queue.mandatory.test
exchange路由到queue的routingkey是mandatory
这里先不讲当前的exchange绑定到queue中,即:

channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

详细代码如下:

package com.vms.test.zzh.rabbitmq.self;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Created by hidden on 2017/2/7.
 */
public class RBmandatoryTest {
    public static final String ip = "10.198.197.73";
    public static final int port = 5672;
    public static final String username = "root";
    public static final String password = "root";

    public static final String queueName = "queue.mandatory.test";
    public static final String exchangeName = "exchange.mandatory.test";
    public static final String routingKey = "mandatory";
    public static final Boolean mandatory = true;
    public static final Boolean immediate = false;

    public static void main(String[] args) {

        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(ip);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.basicQos(1);
            channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
//            channel.close();
//            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

运行,之后通过wireshark抓包工具可以看到如下图所示:
这里写图片描述

这里可以看到最后执行了basic.return方法,将发布者发出的消息返回给了发布者,查看协议的arguments参数部分可以看到:reply-text字段值为NO_ROUTE,表示消息并没有路由到合适的队列中;

那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为channel信道设置ReturnListener监听器来实现,具体代码(main函数部分):

try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(ip);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.basicQos(1);
            channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("Basic.return返回的结果是:"+message);
                }
            });

//            channel.close();
//            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

运行结果:

Basic.return返回的结果是:===mandatory===

下面我们来看一下,设置mandatory标志且exchange路由到queue中,代码部分只需要将:

channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

改为

channel.basicPublish(exchangeName, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

即可。
通过wireshark抓包如下:
这里写图片描述
可以看到并不会有basic.return方法被调用。查看RabbitMQ管理界面发现消息已经到达了队列。


3. immediate

在RabbitMQ3.0以后的版本里,去掉了immediate参数的支持,发送带immediate标记的publish会返回如下错误:
“{amqp_error,not_implemented,”immediate=true”,’basic.publish’}”

为什么移除immediate标记,参见如下版本变化描述:
Removal of “immediate” flag
What changed? We removed support for the rarely-used “immediate” flag on AMQP’s basic.publish.
Why on earth did you do that? Support for “immediate” made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues.
What do I need to do? If you just want to be able to publish messages that will be dropped if they are not consumed immediately, you can publish to a queue with a TTL of 0.
If you also need your publisher to be able to determine that this has happened, you can also use the DLX feature to route such messages to another queue, from which the publisher can consume them.
这段解释的大概意思是:immediate标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。


参考资料

  1. 深入学习RabbitMQ(一):mandatory标志的作用
  2. RabbitMQ(二)AMQP协议mandatory和immediate标志位区别
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件
RabbitMQ之mandatory和immediate
mandatory和immediate是AMQP协议中basic.publish方法中的两个标识位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。对于刚开始接触RabbitMQ的朋友特别容易被这两个参数搞混,这里博主整理了写资料,简单讲解下这两个标识位。
416 0
|
8月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
6月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
413 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
983 93
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
466 92
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
239 1
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
330 16

热门文章

最新文章