4.编写生产者
SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息。
package com.zj; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; @SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testProducer(){ /* * 参数一:交换机名称 * 参数二:路由关键字 * 参数三:要发送的消息 */ rabbitTemplate.convertAndSend("boot_topic_exchange","message","hello MQ"); } }
5.编写消费者
我们编写另一个SpringBoot项目作为RabbitMQ的消费者,因为在同一个项目中的话直接方法调用就可以。
1、创建项目导入依赖。
2、编写配置文件,和生产者的相同
3、编写消费者,监听队列
@Component public class Consumer { // 监听队列 @RabbitListener(queues = "boot_queue") public void listen_message(String message){ System.out.println("发送短信:"+message); } }
4、运行项目。观察管控台队列和控制台
五、消息的可靠性投递
5.1 概念
RabbitMQ消息投递的路径为:
生产者
--->交换机
--->队列
--->消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?
- 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
- 退回模式(return)可以监听消息是否从交换机成功传递到队列。
- 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。
首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:
spring: rabbitmq: host: 192.168.66.100 port: 5672 username: MQzhang password: MQzhang virtual-host: / #日志格式 logging: pattern: console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
在生产者的配置类创建交换机和队列:
package com.zj.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { private final String EXCHANGE_NAME = "boot_topic_exchange"; private final String QUEUE_NAME = "boot_queue"; // 创建交换机 @Bean(EXCHANGE_NAME) public Exchange getExchange() { return ExchangeBuilder .topicExchange(EXCHANGE_NAME) // 交换机类型和名称 .durable(true) // 是否持久化 .build(); } // 创建队列 @Bean(QUEUE_NAME) public Queue getMessageQueue() { return new Queue(QUEUE_NAME); // 队列名 } // 交换机绑定队列 @Bean public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("#.message.#") .noargs(); } }
创建生产者
@Component public class Consumer { // 监听队列 @RabbitListener(queues = "boot_queue") public void listen_message(String message){ System.out.println("发送短信:"+message); } }
5.2 确认模式
确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:
1、生产者配置文件开启确认模式
spring: rabbitmq: host: 192.168.66.100 port: 5672 username: MQzhang password: MQzhang virtual-host: / # 开启确认模式 publisher-confirm-type: correlated
2、生产者定义确认模式的回调方法,并模拟向不存在的交换机aaa发送消息。
package com.zj; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; @SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testProducer(){ //定义确认模式的回调方法,当消息向交换机发送后会调用confirm方法。 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 相关配置信息 * @param ack 交换机是否收到消息 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息接受成功"); }else { System.out.println("消息接受失败:"+cause); //做一些处理让消息再次发送 } } }); rabbitTemplate.convertAndSend("aaa","message","hello MQ"); } }
3、运行结果
消息接受失败:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aaa' in vhost '/', class-id=60, method-id=40)
5.3 退回模式
退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:
1、生产者配置文件开启退回模式
spring: rabbitmq: host: 192.168.66.100 port: 5672 username: MQzhang password: MQzhang virtual-host: / # 开启确认模式 publisher-confirm-type: correlated # 开启回退模式 publisher-returns: true
2、生产者定义退回模式的回调方法,模拟向不存在的队列bbb发送消息。
@SpringBootTest class DemoApplicationTests { @Resource public RabbitTemplate rabbitTemplate; @Test public void testProducer(){ //定义退回模式的回调方法,只有交换机将消息发送到队列失败后才会执行该方法。 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { /** * * @param returnedMessage 失败后将失败信息封装到该参数 */ @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("消息对象:"+returnedMessage); System.out.println("错误码:"+returnedMessage.getReplyCode()); System.out.println("错误信息:"+returnedMessage.getReplyText()); System.out.println("交换机:"+returnedMessage.getExchange()); System.out.println("路由键:"+returnedMessage.getRoutingKey()); //处理消息…… } }); rabbitTemplate.convertAndSend("boot_topic_exchange","bbb","hello MQ"); } }
消息对象:ReturnedMessage [message=(Body:'hello MQ' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=boot_topic_exchange, routingKey=bbb] 错误码:312 错误信息:NO_ROUTE 交换机:boot_topic_exchange 路由键:bbb
5.4 Ack手动签收
在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。
消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。
- 自动确认:spring.rabbitmq.listener.simple.acknowledge="none"
- 手动确认:spring.rabbitmq.listener.simple.acknowledge="manual"
1.消费者配置开启手动签收
spring: rabbitmq: host: 192.168.66.100 port: 5672 username: MQzhang password: MQzhang virtual-host: / # 开启手动签收 listener: simple: acknowledge-mode: manual
2、消费者处理消息时定义手动签收和拒绝签收的情况
package com.zj.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class ACKConsumer { // 监听队列 /** * * @param message 消息对象 * @param channel 信道对象,用于手动接受消息 */ @RabbitListener(queues = "boot_queue") public void listen_message(Message message, Channel channel) throws IOException { //deliveryTag:消息投递序号,每次投递该值都会+1. long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //签收消息 /* * 参数一:消息投递序号 * 参数二:一次是否可以签收多条消息 */ channel.basicAck(deliveryTag,true); }catch (Exception e){ //拒签消息 /* * 参数一:消息投递序号 * 参数二:一次是否可以签收多条消息 * 参数三:拒签后消息是否重回队列(处在队列中的消息会不断的再向消费者发送消息) */ channel.basicNack(deliveryTag,true,true); System.out.println("消息消费失败"); } } }