- 什么是消息回调
消息回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功) - 为什么要进行消息确认
经常会听到丢消息的字眼, 对于程序来说,发送者没法确认是否发送成功,消费者处理失败也无法反馈,
没有消息确认机制,就会出现消息莫名其妙的没了,也不知道什么情况 生产者推送消息[确认]
0.前提:使用直连交换机完成消息的发送和接收1.在rabbitmq-provider项目的application.yml文件上,添加消息确认的配置项
#1.开启 confirm 确认机制 spring.rabbitmq.publisher-confirms=true #2.开启 return 确认机制 spring.rabbitmq.publisher-returns=true #3.设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 spring.rabbitmq.template.mandatory=true 注1:设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 spring.rabbitmq.template.mandatory=true//此行配置与正面代码效果一样 rabbitTemplate.setMandatory(true);//此行代码也配置项3的效果一致
2.创建RabbitTemplateConfig,在里面创建自定义RabbitTemplate并添加2个相关的回调函数
RabbitTemplate.ConfirmCallback 通过实现ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中 还需要在配置文件添加配置spring.rabbitmq.publisher-returns=true RabbitTemplate.ReturnCallback 通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调 还需要在配置文件添加配置spring.rabbitmq.publisher-returns=true 见:资料/RabbitTemplateConfig.java
3.ConfirmCallback和ReturnCallback这2个回调函数在什么情况会触发
先从总体的情况分析,推送消息存在四种情况 A.消息推送到server,找不到交换机 B.消息推送到server,找到交换机了,但是没找到队列 C.消息推送到server,交换机和队列啥都没找到(C和A效果是一样,交换机都没找到,更不用说队列了) D.消息推送成功
4.小结:
流程:由生产者 -------> 交换机(Exchange) ConfirmCallBack: 如果消息没有到exchange,则confirm回调,ack=false 如果消息到达exchange,则confirm回调,ack=true 流程:由交换机(Exchange)--------> 队列(Queue) ReturnCallBack: exchange到queue成功,则不回调return exchange到queue失败,则回调return(需设置mandatory=true,否则不执行回调函数,消息就丢了)
5.注意:spring-rabbit和原生的rabbit-client ,表现是不一样的.测试的时候,原生的client,exchange错误的话,直接就报错了,
是不会到confirmListener和returnListener的
消费者接收消息[确认]
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式
1.自动确认这也是默认的消息确认情况。AcknowledgeMode.NONE,RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即 认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递 当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除
2.不确认(不介绍)
3.手动确认(多数选择的模式)
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功 basic.ack用于肯定确认 basic.nack用于否定确认 basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息 消费者端以上的3个方法都表示[消息已经被正确投递],但是basic.ack表示消息已经被正确处理, 但是basic.nack,basic.reject表示没有被正确处理,但是RabbitMQ中仍然需要删除这条消息
## 消息接收手动确认编码开始
4.修改FirstQueueReceiver代码,继承AbstractAdaptableMessageListener并重写onMessage(Message message, Channel channel)方法1.消费者消息确认相关类和接口的体系结构 MessageListener ChannelAwareMessageListener//在此接口中,注入了通道Channel,Aware英文意思为“意识到、感知到、察觉到” AbstractAdaptableMessageListener public void onMessage(Message message, Channel channel) throws Exception ## 请将之前的handlerMessage方法注释掉,因为只需要一个处理方法 ## //@RabbitHandler ## //public void handlerMessage(Map<String, Object> map) {} 2.重写onMessage(Message message, Channel channel)方法,进行消息接收[确认] 关键代码: long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息唯一标识ID boolean multiple = false; channel.basicAck(deliveryTag, multiple); channel.basicReject(deliveryTag,true);//为true会重新放回队列 channel.basicNack(deliveryTag, multiple,true) 注1: basicAck(long deliveryTag, boolean multiple)方法的两个参数 deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
5.新建MessageListenerConfig,添加消息接收确认机制相关代码
源码见:资料/MessageListenerConfig 关键代码: container.setConnectionFactory(connectionFactory);//设置mq连接工厂对象 container.setConcurrentConsumers(1);//设置并发消费者 container.setMaxConcurrentConsumers(1);//设置最多的并发消费者 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 //注意:此处不能使用Autowired根据类型自动注入队列,必须调用rabbitmqDirectConfig.firstQueue()获得,why? // 因为项目中可能存在多个队列,它们的类型都是Queue,自动注入会报错 container.setQueues(rabbitmqDirectConfig.firstQueue()); container.setMessageListener(directReceiver); 注1:SimpleMessageListenerContainer即简单消息监听容器 这个类非常的强大,我们可以对他进行很多的设置,用对于消费者的配置项,这个类都可以满足 1.它可以设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。但是我们在实际生产中,很少使用事务,基本都是采用补偿机制 2.它可以设置消费者数量、最小最大数量、批量消费 3.它可以设置消息确认和自动确认模式、是否重回队列、异常捕获 Handler 函数 4.它可以设置消费者标签生成策略、是否独占模式、消费者属性等 5.它还可以设置具体的监听器、消息转换器等等 另外,SimpleMessageListenerContainer 还可以进行动态设置,比如在运行中的应用可以动态的修改 其消费者数量的大小、接收消息的模式等。 注2:消息确认模式 AcknowledgeMode.NONE:自动确认 AcknowledgeMode.AUTO:根据情况确认 AcknowledgeMode.MANUAL:手动确认
- Rabbitmq的消息确认的两种方式
Rabbitmq为了确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。