1.首先确保已经引入了Spring AMQP和RabbitMQ的相关依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 创建一个普通队列并设置TTL(消息过期时间),同时声明一个死信交换机和死信队列,当普通队列中的消息过期后会自动转发到死信队列:
@Bean Queue normalQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 60000); // 消息有效期为60秒 args.put("x-dead-letter-exchange", "delayExchange"); // 设置死信交换机 args.put("x-dead-letter-routing-key", "delayQueue"); // 设置死信路由键 return new Queue("normalQueue", true, false, false, args); } @Bean Queue delayQueue() { return new Queue("delayQueue", true, false, false); } @Bean DirectExchange delayExchange() { return new DirectExchange("delayExchange"); } @Bean Binding delayBinding(DirectExchange delayExchange, Queue delayQueue) { return BindingBuilder.bind(delayQueue).to(delayExchange).with("delayQueue"); }
3.在delayQueue上监听消息,这样当消息从normalQueue过期转移到delayQueue后,消费者就会接收到这条消息:
@RabbitListener(queues = "delayQueue") public void processDelayMessage(String message) { System.out.println("Processing delayed message: " + message); // 在这里处理延时后的消息 }
4.发送消息到normalQueue:
@Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.convertAndSend("normalQueue", message); }
以上代码示例中,正常的消息会被发送到normalQueue,如果在指定的TTL时间内未被消费,则该消息会作为死信转发到delayExchange,然后根据路由键路由到delayQueue,最终由监听delayQueue的消费者进行处理,从而实现了消息的延时处理。