1.概述
要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange
插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。
2.安装RabbitMQ延迟插件
- 检查插件
使用rabbitmq-plugins list
命令用于查看RabbitMQ安装的插件。
rabbitmq-plugins list
检查RabbitMQ插件安装情况
- 下载插件
如果没有安装插件,则直接访问官网进行下载
https://www.rabbitmq.com/community-plugins.html
- 安装插件
下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:
E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins
打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.实现RabbitMQ消息队列延迟功能
- pom.xml配置信息文件中,添加相关依赖文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.olive</groupId> <artifactId>rabbitmq-spring-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.7</version> <relativePath /> </parent> <dependencies> <!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
- application.yml配置文件中配置RabbitMQ信息
server: port: 8080 spring: #给项目来个名字 application: name: rabbitmq-spring-demo #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin123 #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的 virtual-host: /
- RabbitMQ配置类
package com.olive.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * RabbitMQ配置类 **/ @Configuration public class RabbitMqConfig { public static final String DELAY_EXCHANGE_NAME = "delayed_exchange"; public static final String DELAY_QUEUE_NAME = "delay_queue_name"; public static final String DELAY_ROUTING_KEY = "delay_routing_key"; @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Queue queue() { Queue queue = new Queue(DELAY_QUEUE_NAME, true); return queue; } @Bean public Binding binding(Queue queue, CustomExchange delayExchange) { return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs(); } }
- 发送消息
实现消息发送,设置消息延迟5s。
package com.olive.service; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.olive.config.RabbitMqConfig; /** * 消息发送者 **/ @Service public class CustomMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息发送时间:" + sdf.format(new Date())); rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.DELAY_ROUTING_KEY, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 消息延迟5秒 message.getMessageProperties().setHeader("x-delay", 5000); return message; } }); } }
- 接收消息
package com.olive.service; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.olive.config.RabbitMqConfig; /** * 消息接收者 **/ @Component public class CustomMessageReceiver { @RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME) public void receive(String msg) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println(sdf.format(new Date()) + msg); System.out.println("Receiver:执行取消订单"); } }
- 测试验证
package com.olive.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.olive.service.CustomMessageSender; @RestController public class DelayMessageController { @Autowired private CustomMessageSender customMessageSender; @GetMapping("/sendMessage") public String sendMessage() { // 发送消息 customMessageSender.sendMsg("你已经支付超时,取消订单通知!"); return "success"; } }
发送消息,访问
http://127.0.0.1:8080/sendMessage
查看控制台打印的信息