1、延迟队列
1.1、概念
- 延迟队列,队列内部是有序的。延迟队列中的元素是希望在指定时间到了之后或之前被取出和处理。简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
1.2、使用场景
- 订单在十分钟之内未支付则自动取消。
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行消息提醒。
- 用户发起退款如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个参会人员参加会议。
- 这些场景都有一个特点,需要在某个事件发生之后,或者之前的指定时间点完成某一项任务。
1.3、SpringBoot整合
1. 引入的依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.57</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
</dependency>
</dependencies>
2. 相关的配置
spring.rabbitmq.host=192.168.123.129
spring.rabbitmq.port=5672
spring.datasource.username=admin
spring.datasource.password=123
3. 添加Swagger配置类
package com.xiao.springbootrabbitmq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig(){
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo(){
return new ApiInfoBuilder()
.title("rabbitmq接口文档")
.description("本文档描述了rabbitmq微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288","http://atguigu.com","6382472874@qq.com"))
.build();
}
}
1.4、队列TTL
1.4.1、代码架构图
- 创建两个队列
QA
和QB
,两者队列TTL分别设置为10s
和40s
,然后创建一个死信队列QD
。
1.4.2、TTL队列配置类
package com.xiao.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String Y_EXCHANGE = "Y";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String DEAD_LETTER_QUEUE_D = "QD";
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",Y_EXCHANGE);
map.put("x-dead-letter-routing-key","YD");
map.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
}
@Bean("queueB")
public Queue queueB(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",Y_EXCHANGE);
map.put("x-dead-letter-routing-key","YD");
map.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(map).build();
}
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
}
@Bean
public Binding QueueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding QueueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding QueueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
1.4.3、生产者代码
package com.xiao.springbootrabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMessage(@PathVariable String message){
log.info("当前时间为:{},当前发送的消息为:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("X","XA","当前的消息来自ttl为10s的队列:" + message);
rabbitTemplate.convertAndSend("X","XB","当前的消息来自ttl为40s的队列:" + message);
}
}
1.4.4、消费者代码
package com.xiao.springbootrabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void ReceiveMessage(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
log.info("当前时间为:{},收到死信队列的消息为:{}",new Date().toString(),msg);
}
}
1.4.5、测试结果
1.4.6、不足之处
- 如果这样使用的话,那么每增加一个新的时间需求的话,就要新增一个队列。那么面对许多个需求就要新增许多个队列。
1.5、延迟队列优化
1.5.1、代码架构图
QC
不设置延迟时间,其延迟时间由生产者进行设置,那么我们所形成的就是一个通用队列了。
1.5.2、配置类上添加的代码
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_EXCHANGE);
}
@Bean("queueC")
public Queue queueC(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",Y_EXCHANGE);
map.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(map).build();
}
@Bean
public Binding QueueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
1.5.3、生产者上添加的代码
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
rabbitTemplate.convertAndSend("X","XC",message,correlationDate->{
correlationDate.getMessageProperties().setExpiration(ttlTime);
return correlationDate;
});
log.info("当前时间为:{},发送一条时长{}毫秒TTL的消息给队列C:{}",new Date().toString(),ttlTime,message);
}
1.5.4、测试结果
1.5.5、存在的问题
消息可能不会按时死亡,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢弃到死信队列中,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
1.6、Rabbitmq插件实现延迟队列
1.6.1、安装插件
- 下载地址
- 将插件复制到
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
- 执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 安装成功之后
1.6.2、代码架构图
1.6.3、配置类代码
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
1.6.4、生产者代码
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData ->{
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(),delayTime, message);
1.6.5、消费者代码
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
1.6.6、测试结果
- 过期时间短的优先进入延时队列。
1.7、总结
- 延时队列在需要延时处理的场景下非常有用,使用
RabbitMQ
来实现延时队列可以很好的利用RabbitMQ
的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ
集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。 - 当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用Redis 的 zset ,利用 Quartz或者利用kafka 的时间轮 ,这些方式各有特点,看需要适用的场景