在电商平台或者其他的平台上,经常会出现超时订单自动取消的场景,例如订单超过15分钟会自动取消订单,在用户注册成功 15 分钟后,发短信息通知用户等等,这些业务场景都是延时任务的场景,在电商,支付等系统中,一设都是先创建订单(支付单),再给用户一定的时间进行支付,如果没有按时支付的话,就需要把之前的订单(支付单)取消掉。这种类 以的场景有很多,还有比如到期自动收货,超时自动退款,下单后自动发送短信等 等都是类似的业务问题。
1.定时任务
通过定时任务可以实现一个低成本,易实现的延时方案。,写一个定时任务,定期扫描数据库中的订单,如果时间过期,就将其状态更新为已过期或者关闭即可。
@Scheduled(cron = "0 0 22 * * ?") public void Notify(){ this.Service.todoNotify(); }
@Scheduled注解部分
@Scheduled(cron = "0 0 22 * *?"):这是 Spring 框架中的一个定时任务注解。cron表达式用于定义任务执行的时间规则。在这个表达式0 0 22 * *?中:
第一个0表示秒,即每分钟的第 0 秒执行。
第二个0表示分,即每小时的第 0 分执行。
22表示小时,也就是每天的 22 点(晚上 10 点)执行。
后面的* *分别表示每月的任意天和每周的任意天。
?在cron表达式中用于表示不指定具体的星期几或者天,因为前面已经通过其他字段(如天或者星期几)指定了任务执行的时间规则,这里的?用于占位。
方法部分
public void Notify():这是一个定义的方法,方法名为Notify,没有返回值(void)。
this.Service.todoNotify();:在Notify方法内部,调用了当前类中的Service对象的todoNotify方法。推测Service可能是一个业务逻辑相关的服务类,而todoNotify方法可能是执行某种通知相关的业务操作。
整体来说,这段代码的作用是在每天的 22 点整执行Notify方法,在Notify方法中执行与通知相关的业务逻辑(通过调用Service对象的todoNotify方法实现)。
优点:简单易于实现,并且成本很低,不依赖于其他的组件
缺点:1.时间不太准确:由于定时任务扫描的时间间隔是固定的,所以可能造成一些订单已经过了过期时间,但是订单并没有关闭,因此订单的状态会有延迟。
2.增加了数据库的压力:随着订单的数量越来越多,扫描的范围也越来越大,执行时间也会变长,每次扫描数据库中订单,开销也是比较大,因此对数据库的压力也会增大,影响数据库的性能。
因此基于以上的优点和缺点,定时任务方案应对延迟场景的比较适合于对时间要求不是很严格,并且数据量不是很多的情况可以选择定时任务。
2.JDK 延迟队列 DelayQueue
DelayQueue是 JDK提供的一个无界队列,我们可以看到,DelayQueue队列中的元素需要实现 Delayed,它只提供了一个方法,就是获取过期时间。
用户的订单生成以后,设置过期时间放入定义好的Delay无界队列中,然后创建一个线程,在线程中通过循环(while(true))不断地从无界队列中获取过期的数据
优点:实现方便,无需依赖三方组件
缺点:DelayQueue是一个无界队列,如果放入的订单过多的话可能会出现OOM,并且DelayQueue是依赖于JVM的,若是该工程关闭,在DelayQueue中的数据就会丢失
因此该种方案适合于数据量少,并且数据量丢失也不会对系统的功能实现造成影响的业务场景。
3.RocketMQ 的延迟消息
RocketMQ 的延迟消息是指消息发送到 Broker 后,不会立即被消费者消费,而是在延迟一定时间后才对消费者可见并可被消费。
延迟消息的原理
1.RocketMQ 在内部将延迟消息暂存到内部特定的延迟队列中,这些延迟队列是基于不同的延迟级别来划分的。每个延迟级别对应一个特定的延迟时间,例如延迟级别 1 对应 1s,延迟级别 2 对应 5s 等。
2.当消息被设置为延迟消息时,Broker 会根据延迟时间将消息放入相应的延迟队列。在消息存储时,会根据延迟级别对消息的投递时间进行计算并保存相关信息。
3.随着时间的推移,Broker 内部有一个定时任务会不断地检查各个延迟队列中的消息,当消息的投递时间到达时,就会将消息从延迟队列中取出,重新存储到对应的目标主题的队列中,此时消息对消费者来说就变为可见,消费者可以正常消费该消息。
当一个订单创建好以后,设置过期时间,并将这条消息投递到rocketmq中,在延迟时间之后,消费者就会消费该条消息
优点:系统之间完全解耦,只需要关注消费者和生产者即可,并且rocketmq有万亿级的吞吐量,可以提高系统的响应性能
缺点:引入了消息队列后,随之而来的消息队列的数据持久性如何保证,如何保证消息的可靠型,幂等性处理等问题也会出现,增加了系统的复杂程度。
因此该种方案适合于追求高吞吐量的业务场景。
4.RabbitMQ 死信队列
RabbitMQ 中的死信队列(Dead Letter Queue)是一种特殊的消息队列机制,用于处理那些无法被正常消费的消息。
死信产生的原因
1.消息被拒绝(basic.reject 或 basic.nack)并且设置了不重新入队(requeue = false)。当消费者处理消息出现异常等情况时,可以选择拒绝消息,如果不希望消息重新回到原队列被再次消费,那么该消息就会成为死信。
2.消息过期。在 RabbitMQ 中可以给消息设置 TTL(Time To Live),即消息的存活时间,如果消息在队列中存活时间超过了设置的 TTL,就会变成死信。
3.队列达到最大长度。当队列的消息数量达到了其设置的最大长度(队列的 x-max-length 属性),新进来的消息会被丢弃,被丢弃的消息会成为死信。
死信队列的设置和使用
1.创建死信交换器(Dead Letter Exchange)和死信队列(Dead Letter Queue)。可以像创建普通的交换器和队列一样,在 RabbitMQ 中定义死信交换器和死信队列,并且将它们进行绑定。
2.将业务队列与死信交换器进行关联。在声明业务队列时,通过设置队列的相关参数(x-dead-letter-exchange 和 x-dead-letter-routing-key)来指定当该队列中的消息变为死信后,消息应该被路由到哪个死信交换器以及使用哪个路由键。
优点:同 RocketMQ 一样,RabbitMQ 同样可以使业务解耦,基于其集群的扩展性, 也可以实现高可用、高性能的目标。
缺点:死信队列本质还是一个队列,队列都是先进先出,如果队头的消息过期时间比较 长,就会导致后面过期的消息无法得到及时消费,造成消息阻塞。
5.redis 过期监听
redis 是一个高性能的 KV 数据库,除了用作缓存以外,其实还提供了过期监听的功能。 在 redis.conf 中,配置 notify-keyspace-events Ex 即可开启此功能。 然后在代码中继承 KeyspaceEventMessageListener,实现 onMessage 就可以监听 过期的数据量。部分源码如下:
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean { 2 3 private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*"); 4 5 //...省略部分代码 6 public void init() { 7 if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) { 8 RedisConnection connection = listenerContainer.getConnectionFactory().getConnection(); 9 try { 10 Properties config = connection.getConfig("notify-keyspace-events"); 11 if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) { 12 connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter); 13 } 14 } finally { 15 connection.close(); 16 } 17 } 18 doRegister(listenerContainer); 19 } 20 21 protected void doRegister(RedisMessageListenerContainer container) { 22 listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS); 23 } 24 //...省略部分代码 25 @Override 26 public void afterPropertiesSet() throws Exception { 27 init(); 28 } 29 } 30
跟踪源码可以发现,其本质也是注册一个 listener,利用 redis 的发布订 阅,当 key 过期时,发布过期消息(key)到 Channel :keyevent@*:expired 中。在实际的业务中,我们可以将订单的过期时间设置比如 30 分钟,然后放入到 redis。 30 分钟之后,就可以消费这个 key,然后做一些业务上的后置动作,比如检查用户是否支付。
优点: 由于 redis 的高性能,所以我们在设置 key,或者消费 key 时,速度上是可 以保证的。
缺点:由于 redis 的 key 过期策略原因,当一个 key 过期时,redis 无法保证立刻 将其删除,自然我们的监听事件也无法第一时间消费到这个 key,所以会存在一定的延迟。另外,在 redis5.0 之前,订阅发布中的消息并没有被持久化,自然也没有所谓的确认机制。所以一旦消费消息的过程中我们的客户端发生了宕机,这条消息就彻底丢失了。
6.Redisson 分布式延迟队列
Redisson 是一个基于 redis 实现的 Java 驻内存数据网格,它不仅提供了一系列的分 布式的 Java 常用对象,还提供了许多分布式服务。
基本原理
1.Redisson 的分布式延迟队列是基于 Redis 的有序集合(Sorted Set)和发布 / 订阅(Pub/Sub)机制来实现的。
2.当一个消息被添加到延迟队列时,Redisson 会将消息的到期时间戳和消息内容作为成员和分值添加到 Redis 的有序集合中。
3.Redisson 会在内部启动一个或多个任务调度线程,这些线程会定时检查有序集合中分值最小(即到期时间最早)的元素。如果当前时间大于等于该元素的分值(到期时间),则将该消息从有序集合中移除,并通过发布 / 订阅机制发布该消息。
4.消费者订阅相应的频道后,可以接收到发布的消息并进行处理。
Redisson 除了提供我们常用的分布式锁外,还提供了一个分布式延迟队列 RDelayedQueue,他是一种基于 zset 结构实现的延迟队列,其实现类是 RedissonDelayedQueue。
优点:使用简单,并且其实现类中大量使用 lua 脚本保证其原子性,不会有并发重复 问题。
缺点:需要依赖 redis(如果这算一种缺点的话)。
7.总结
关于延迟任务的业务场景在实际开发中应用的很广,因此在处理相关业务时要选择合适的处理方案,并针对可能出现的问题做针对性的方案预警才可保证的业务和数据的安全性。