Confirm确认消息
Confirm消息确认机制:了消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!
1.在 channel 上开启确认模式: channel . confirmselect ( )
2.在channel上添加监听: addConfirmListener ,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!
生成者:
public class Producer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); //4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.save"; //5 发送一条消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //6 添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } }); } }
Return 消息机制
Return LiStener 用于处理一些不可路由的消息!
我们的消息生产者,通过指定一个 Exchange 和Routingkey ,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由 key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener !
Return 消息机制在基础 Api 中有一个关键的配置项: Mandatory :如果为 true ,则监听器会接收到路由不可达的消息然后进行后续处理,如果为 false ,那么 broker 端自动删除该消息!
消费者:
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费者: " + msg); } } }
生产端:
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_return_exchange"; String routingKey = "return.save"; String routingKeyError = "abc.save"; String msg = "Hello RabbitMQ Return Message"; channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); //第三个参数为mandatory channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); } }
自定义消费者使用
继承DefaultConsumer重写handleDelivery方法。
public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
channel.basicConsume(queueName, true, new MyConsumer(channel));
消费端限流
当我们的RabbitMQ上有成千上万未处理的消息,单个客户端没有办法处理这么多消息。
所有我们一般都不会去使用自动签收(autoAck:true)
RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global)
prefetchSize: 0 消息的大小限制设置为0表示不做限制。
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。(每次消费几条数据,消费完就发送ack,在传过来几条)
global: true\false 是否将上面设置应用于channel
简单点说,就是上面限制是channel级别的还是consumer级别
每次签收一条
//1 限流方式 第一件事就是 autoAck设置为 false channel.basicQos(0, 1, false);
自定义消费者ack
public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //1.消息标签 2.是否批量签收 channel.basicAck(envelope.getDeliveryTag(), false); } }
Dead-Letters-Queue “死信队列”
来自一个队列的消息可以被当做‘死信’,即被重新发布到另外一个“exchange”去,这样的情况有:
- 消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false 参数
- 消息的TTL-存活时间已经过期
- 队列长度限制被超越(队列满)
Dead Letter Pattern
“死信”模式指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预。
这个过程中的exchange和queue就是所谓的"Dead Letter Exchange 和 Queue"。
关键是如何区分“消费失败”和“处理失败”?消费失败需要送到死信队列,而处理失败不需要。大部分情况都属于“处理失败”。
生产端
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange"; String routingKey = "dlx.save"; String msg = "Hello RabbitMQ DLX Message"; for(int i =0; i<1; i ++){ //设置过期时间 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } } }
消费端
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 这就是一个普通的交换机 和 队列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //这个agruments属性,要设置到声明队列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要进行死信队列的声明: channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); channel.queueBind("dlx.queue", "dlx.exchange", "#"); channel.basicConsume(queueName, true, new MyConsumer(channel)); } }