1.springboot整合RabbitMQ
1.1springboot整合生产者
新建项目rabbitmqdemo02,新建模块producer-springboot
修改改模块的pom.xml,引入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
配置下application.yml
spring: rabbitmq: host: localhost username: guest password: guest virtual-host: / port: 5672
启动类com.wangzhou.ProducerApplication。
@SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } }
按如下结构新建配置类RabbitMQConfig
编写配置类。
@Configuration public class RabbitMQConfig { public static final String QUEUE_NAME = "boot_queue"; public static final String EXCHANGE_NAME = "boot_exchange"; @Bean("bootExchange") public Exchange bootExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } @Bean("bootQueue") public Queue bootQueue() { return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding bindQueueExchange(@Qualifier("bootQueue")Queue queue, @Qualifier("bootExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
编写测试类。
@SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest { // 1.注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSend() { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.haha", "boot mq haha~~~~"); } }
运行,rabbitmq管控台就有我们创建的队列了。
点进去还可以看到具体的消息详情。
1.2 springboot整合消费者
步骤与生产者极其类似。
创建工程consumer-springboot
编写pom.xml。引入依赖。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
编写yml配置
spring: rabbitmq: host: localhost username: guest password: guest virtual-host: / port: 5672
新建主启动类。
@SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }
新建监听类。
@Component public class RabbitMQListener { @RabbitListener(queues = "boot_queue") public void ListenQueue(Message message) { System.out.println(message); } }
启动类运行。完美!
小结下。
2.异常消息的浪浪山
2.1 消息可靠性问题
上面问题的答案是:发送时丢失(未到交换机或者到交换机未到队列),MQ丢失,消费者丢失。
针对这些可能性,我们将介绍如下高级特性。
基于这些问题,我们需要进一步学习MQ的一些高级特性。
2.2 生产者确认机制
2.2.1 初始化代码
新建一个工程,mq-advanced-demo。项目的架构如下图。
2.2.2 实现生产者确认
(1)生产者配置
完整代码。
logging: pattern: dateformat: HH:mm:ss:SSS level: cn.itcast: debug spring: rabbitmq: host: localhost # rabbitMQ的ip地址 port: 5672 # 端口 username: guest password: guest virtual-host: / publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
(2)实现ReturnCallback回调
当消息到交换器,但是路由过程中出现问题,通过ReturnCallback回调。
每个RabbitTemplate只能配置一个ReturnCallback,而RabbitTemplate是由spring容器创建的,是单例实例。因此ReturnCallback必须在全局进行配置,即在项目启动过程进行配置。
因此,CommonConfig实现了ApplicationContextAware接口。我们知道,Aware是通知接口,而ApplicationContext是一个bean容器,管理spring项目中的bean。因此,实现了ApplicationContextAware接口即意味着可以在项目启动所有bean(当然包括rabbitTemplate)加载以后调用回调,获取rabbitTemplate,设置全局的ReturnCallback。具体细节可以看接口的方法实现setApplicationContext。
代码如下。
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 记录日志 log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的话,重发消息 }); } }