1. 概述
mandatory和immediate是AMQP协议中basic.publish方法中的两个标识位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。对于刚开始接触RabbitMQ的朋友特别容易被这两个参数搞混,这里博主整理了写资料,简单讲解下这两个标识位。
mandatory
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
immediate
当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
2. mandatory
在生产者通过channle的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看channel接口,会发现存在3个重载的basicPublish方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
mandatory和immediate上面已经解释过了,其余的参数分别是:
exchange:交换机名称
routingkey:路由键
props:消息属性字段,比如消息头部信息等等
body:消息主体部分
本节主要讲述mandatory, 下面我们写一个demo,在RabbitMQ broker中有:
exchange : exchange.mandatory.test
queue: queue.mandatory.test
exchange路由到queue的routingkey是mandatory
这里先不讲当前的exchange绑定到queue中,即:
channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
详细代码如下:
package com.vms.test.zzh.rabbitmq.self;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Created by hidden on 2017/2/7.
*/
public class RBmandatoryTest {
public static final String ip = "10.198.197.73";
public static final int port = 5672;
public static final String username = "root";
public static final String password = "root";
public static final String queueName = "queue.mandatory.test";
public static final String exchangeName = "exchange.mandatory.test";
public static final String routingKey = "mandatory";
public static final Boolean mandatory = true;
public static final Boolean immediate = false;
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
// channel.close();
// connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行,之后通过wireshark抓包工具可以看到如下图所示:
这里可以看到最后执行了basic.return方法,将发布者发出的消息返回给了发布者,查看协议的arguments参数部分可以看到:reply-text字段值为NO_ROUTE,表示消息并没有路由到合适的队列中;
那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为channel信道设置ReturnListener监听器来实现,具体代码(main函数部分):
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.return返回的结果是:"+message);
}
});
// channel.close();
// connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
运行结果:
Basic.return返回的结果是:===mandatory===
下面我们来看一下,设置mandatory标志且exchange路由到queue中,代码部分只需要将:
channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
改为
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
即可。
通过wireshark抓包如下:
可以看到并不会有basic.return方法被调用。查看RabbitMQ管理界面发现消息已经到达了队列。
3. immediate
在RabbitMQ3.0以后的版本里,去掉了immediate参数的支持,发送带immediate标记的publish会返回如下错误:
“{amqp_error,not_implemented,”immediate=true”,’basic.publish’}”
为什么移除immediate标记,参见如下版本变化描述:
Removal of “immediate” flag
What changed? We removed support for the rarely-used “immediate” flag on AMQP’s basic.publish.
Why on earth did you do that? Support for “immediate” made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues.
What do I need to do? If you just want to be able to publish messages that will be dropped if they are not consumed immediately, you can publish to a queue with a TTL of 0.
If you also need your publisher to be able to determine that this has happened, you can also use the DLX feature to route such messages to another queue, from which the publisher can consume them.
这段解释的大概意思是:immediate标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。