四、分析几个回执方法
4.1、确认消息
channel.basicAck(long deliveryTag, boolean multiple);
我们一般使用下列方式:
channel.basicAck( message.getMessageProperties().getDeliveryTag(), false);
4.2、拒绝消息
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ;
我们接下来还是修改消费者二,将这个方法最后个参数更改为false,看现象是什么?
import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component @RabbitListener(queues="directQueueTx") public class Consumer2 { @RabbitHandler public void process(String msg,Channel channel, Message message) throws IOException { //拿到消息延迟消费 try { Thread.sleep(1000*3); } catch (InterruptedException e) { e.printStackTrace(); } try { if(!isNull(msg)){ String numstr = msg.substring(3); Integer num = Integer.parseInt(numstr); if(num >= 3){ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("get msg2 basicNack msg = "+msg); }else{ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("get msg2 basicAck msg = "+msg); } } } catch (Exception e) { //消费者处理出了问题,需要告诉队列信息消费失败 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.err.println("get msg2 failed msg = "+msg); } } public static boolean isNull(Object obj){ return obj == null || obj == ""||obj == "null"; } }
重启项目,重新请求测试接口。
发现,当出现设置参数为false时,也就是如下所示的设置时:
channel.basicNack( message.getMessageProperties().getDeliveryTag(), false, false);
如果此时消费者二出了问题,这条消息不会重新回归队列中重新发送,会丢失这条数据。
并且再消息队列中不会保存:
4.3、拒绝消息
channel.basicReject(long deliveryTag, boolean requeue);
这个和上面的channel.basicNack
又有什么不同呢?我们还是修改消费者二实验下。
请求测试接口,查看日志信息。
发现,此时的日志信息配置
channel.basicReject( message.getMessageProperties().getDeliveryTag(), true);
和
channel.basicNack( message.getMessageProperties().getDeliveryTag(), false, true);
实现的效果是一样的,都是将信息拒绝接收,由于设置的requeue为true,所以都会将拒绝的消息重新入队列中,重新进行消息分配并消费。
五、总结
这一篇博客,我们总结了相关的配置,三个确认(或回执)信息的方法,并区别了他们的各项属性,也知道了当消息再一个消费者中处理失败了,如何不丢失消息重新进行消息的分配消费问题。
但是这个只是队列和消费者之间的消息确认机制,使用手动ACK方式确保消息队列中的消息都能在消费者中成功消费。那么,消息转发器和消息队列之间呢?消息生产者和消息转发器之间呢?
当然,差点忘了一个小问题。
我们思考一个问题,如果消息队列对应的消费者只有一个,并且那个消费者炸了,会出现什么问题呢??