一文带大家快速掌握RabbitMQ!(二)https://developer.aliyun.com/article/1624434
Return消息机制
用于处理一些不可路由的消息。
基础API
有一个关键配置项:Mandatory
:true,则监听器会接收到路由不可达的消息,然后进行处理;false,Broker会自动删除该消息
默认为false,当我们使用Return 消息机制的时候,我们需要将它设置为true
消息的生产者通过制定Exchange和RoutingKey
,把消息投递到某一个队列中,消费者监听队列,进行消费。
但在一些情况下,发送消息时,Exchange不存在或RoutingKey
路由不到,Return Listener就会监听这种不可达的消息,然后进行处理。
Return Listener代码
public class ReturnProducer { private static final String EXCHANGE_NAME = "return_exchange"; private static final String ROUTING_KEY = "return.key"; private static final String ROUTING_KEY_ERROR = "wrong.key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.58.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 消息 String msg = "Send message of return demo"; // 添加并设置Return监听器 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("============ handleReturn ============"); System.err.println("replyCode —— " + replyCode); System.err.println("replyText —— " + replyText); System.err.println("exchange —— " + exchange); System.err.println("routingKey —— " + routingKey); System.err.println("properties —— " + properties); System.err.println("body —— " + new String(body)); } }); // 设置Mandatory为true, 可以进行后续处理, 不会删除消息。 // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,null, msg.getBytes()); // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, true, null, msg.getBytes()); } }
消费端自定义监听
自定义监听的原因:
- 我们一般就是在代码中编写while循环,进行
consumer.nextDelivery
方法进行获取下一条消息,然后进行消费处理! - 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!
- 非常简单,消费者只需要继承DefaultConsumer类,然后重写handleDelivery方法即可;
继承DefaultConsumer的此类被写出后,需要进行绑定。(在交换机绑定时绑定自定义的Consumer);
public class ReturnConsumer { private static final String EXCHANGE_NAME = "return_exchange"; private static final String ROUTING_KEY = "return.#"; private static final String QUEUE_NAME = "return_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.58.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机与队列, 指定路由键 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Receive Message —— " + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
消费端限流
当巨量消息瞬间全部推送时,单个客户端无法同时处理这些数据,服务器容易故障。因此要进行消费端限流
RabbitMQ提供了一种Qos(服务质量保证)功能,即在非自动确认前提下,如果一定数目的消息未被确认前(通过consume或者channel设置Qos值),不进行消费新消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize:消息限制大小,一般为0,不做限制。
prefetchCount:一次处理消息的个数,一般设置为1
global:一般为false。true,在channel级别做限制;false,在consumer级别做限制
public class QosConsumer { private static final String EXCHANGE_NAME = "qos_exchange"; private static final String ROUTING_KEY = "qos.#"; private static final String QUEUE_NAME = "qos_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.58.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("orcas"); connectionFactory.setPassword("1224"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机与队列, 指定路由键 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Receive Message —— " + new String(body)); // 手动ack签收 channel.basicAck(envelope.getDeliveryTag(), false); // 不批量签收 } }; /** * prefetchSize: 0 不限制消息大小 * prefetchCount: 一次处理消息的个数, ack后继续推送 * global: false 应用在consumer级别 */ channel.basicQos(0, 1, false); //限流:autoAck需设置为false, 关闭自动签收 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
限流需要设置channel.basicQos(0, 1, false);
关闭autoAck,且需要手动签收。
在重写的handleDelivery方法中,如果没有进行手动签收channel.basicAck()
,那么消费端在接收消息时,因为prefetchCount设置为1,只会接收1条消息,剩下的消息的等待中,并不会被推送,直到手动ack后。
队列
消费端ACK与重回队列机制
消费端的手工ACK和NACK:
消费端进行消费时,可能由于业务异常,会调用NACK拒绝确认,而到了一定次数,就直接ACK,将异常消息进行日志的记录,然后进行补偿。
由于服务器宕机等严重问题,消费端没消费成功,重发消息后,需要手工ACK保障消费端消费成功。
消费端的重回队列:
将没有处理成功的消息重新回递给Broker。
一般在实际应用中,会关闭重回队列。
TTL队列
TTL:Time To Live,生存时间。
可以指定消息的过期时间。
可以指定队列的过期时间,从消息入队列开始计算,超过了队列的超时时间设置,那么消息会自动清除
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .expiration("10000") .build();
死信队列
DLX:Dead-Letter-Exchange
当消息在队列中变成死信时,能被重新publish到另一个Exchange,该Exchange就是DLX
发生死信队列的情况:
- 消息被拒绝(
basic.reject/ basic.nack
)并且requeue=false
(没有重回队列) - 消息TTL过期
- 队列达到最大长度
死信队列的设置:
- 正常声明交换机,队列并绑定,需要在队列上设置一个参数:
arguments.put("x-dead-letter-exchange", "dlx.exchange");
- 声明死信队列的Exchange和Queue,然后进行绑定:
Exchange: dlx.exchange``Queue: dlx.queue``RoutingKey: #
- 在消息过期、requeue、队列达到最大长度时(即为死信),消息会发送到指定的
dlx.exchange
交换机上,消费者会监听该交换机所绑定的死信队列。
public class DlxConsumer { private static final String EXCHANGE_NAME = "dlx_exchange"; private static final String ROUTING_KEY = "dlx.#"; private static final String QUEUE_NAME = "dlx_queue"; // DLX private static final String DLX_EXCHANGE = "dlx.exchange"; private static final String DLX_QUEUE = "dlx.queue"; private static final String DLX_ROUTING_KEY = "#"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.58.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("orcas"); connectionFactory.setPassword("1224"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); // 1. 设置死信队列的参数 Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 2. 声明死信队列 channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null); channel.queueDeclare(DLX_QUEUE, true, false, false, null); channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Receive Message —— " + new String(body)); // 手动ack签收 channel.basicAck(envelope.getDeliveryTag(), false); // false 不批量签收 } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
最后
文章内容收录到个人网站,方便阅读:hardyfish.top/
参考和鸣谢:
视频:RabbitMQ消息中间件技术精讲