SpringBoot应用篇基于Redis实现延时队列

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 延时队列,相信各位小伙伴并不会陌生,jdk原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢举一个简单的例子,如下单15分钟内,若没有支付,则自动取消订单本文将介绍一种非常非常简单的实现方式

image.png


延时队列,相信各位小伙伴并不会陌生,jdk原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢

举一个简单的例子,如下单15分钟内,若没有支付,则自动取消订单


本文将介绍一种非常非常简单的实现方式


I. 方案设计



要实现15分钟后自动取消订单,这个也太简单了,来给出一段神级代码


new Thread(() -> {
  // 休眠十五分钟,执行取消订单
  Thread.sleep(15 * 60 * 1000);
  cancelOrder(); 
}).start();
复制代码


好的,本文就此结束(开玩笑....)


忽略上面的段子,接下来想一想,如果让我们来实现一个延时队列,可以怎么整?

  • 单机:
  • DelayQueue
  • 定时任务
  • 分布式:
  • Quartz定时任务
  • rabbitmq延时队列
  • redis zset
  • redis 过期回调
  • 时间轮


首先我们这里排除掉单机版,至于原因,现在单体单实例应用实在不多见了,直接来看多实例的情况吧


在上面的几种方案中,重心放在redis上,两种case,下面分别介绍一下


1. redis过期时间


我们知道,在使用redis做缓存时,可以设置失效时间,借助redis的失效事件,我们可以来实现延时队列的场景


比如,现在一个订单,我们在redis中新加入一个订单id,失效时间设置为15分钟;当支付成功之后,主动删除这个缓存;若一直没有付钱,则15分钟后,触发一个过期事件,然后订阅这个事件,来执行取消订单


上面这种实现,有两个问题


  • key失效监听,可能存在大量的无效信息
  • 广播方式消费事件,多实例接收到这个事件,怎么防并发?或者没有一个实例接收到这个事件,那么这个取消订单就会漏掉


显然上面的第二点,漏消息是不能接受的


2. redis zset


zset属于redis提供的几个基本数据结构中的一种,它的特点是有 value + score

如果我们想使用zset拉实现演示队列,那么一个可选的方案就是将score设置为触发的时间戳,value为业务值


然后写一个定时任务,不断的从zset中,取出score小于当前时间戳的数据,任务它们都是已经到期可以执行的


借助这个方案,可以相对简单的实现一个演示队列了


II. redis演示队列实现



1. 环境配置


接下来我们将以redis的zset来实现延时队列,本文借助SpringBoot来搭建一个演示工程,使用的基本配置如下


本项目借助SpringBoot 2.2.1.RELEASE + maven 3.5.3 + IDEA进行开发

核心依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 下面这里两个非必须,主要是后面的实现演示使用到了 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
</dependency>
复制代码


redis使用默认的配置,本机 localhost + 6379


2. 核心实现


借助redis zset来实现延时队列,具体的实现代码就很简单了,无非是从zset中取出score小于当前时间戳的数据


private static final Long DELETE_SUCCESS = 1L;
@Autowired
private StringRedisTemplate redisTemplate;
public String fetchOne(String key) {
    Set<String> sets = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);
    if (CollectionUtils.isEmpty(sets)) {
        return null;
    }
    for (String val : sets) {
        if (DELETE_SUCCESS.equals(redisTemplate.opsForZSet().remove(key, val))) {
            // 删除成功,表示抢占到
            return val;
        }
    }
    return null;
}
复制代码


注意上面的实现,有一个点需要说一下


zset:每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息


为什么这样设计?


这里有两个点,先解释第一个,为啥先查后删


如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有我一个人霸占了她呢(忽然进入言情的世界😓)


借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;


因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登


接下来再看第二个,为啥一次拿三个


从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了


3. 写入队列


上面是从队列中拿数据,有拿当然得有写,所以我们简单的封装一下写入队列的case


@Component
public class RedisDelayListWrapper implements ApplicationContextAware {
    private static final Long DELETE_SUCCESS = 1L;
    private Set<String> topic = new CopyOnWriteArraySet<>();
    public void publish(String key, Object val, long delayTime) {
        topic.add(key);
        String strVal = val instanceof String ? (String) val : JSONObject.toJSONString(val);
        redisTemplate.opsForZSet().add(key, strVal, System.currentTimeMillis() + delayTime);
    }
}
复制代码


4. 定时取演示队列消息


接下来就是一个定时任务,不断的调用上面的实现,从zset中获取到期的数据


@Scheduled(fixedRate = 10_000)
public void schedule() {
    for (String specialTopic : topic) {
        String cell = fetchOne(specialTopic);
        if (cell != null) {
            applicationContext.publishEvent(new DelayMsg(this, cell, specialTopic));
        }
    }
}
@ToString
public static class DelayMsg extends ApplicationEvent {
    @Getter
    private String msg;
    @Getter
    private String topic;
    public DelayMsg(Object source, String msg, String topic) {
        super(source);
        this.msg = msg;
        this.topic = topic;
    }
}
复制代码


上面的定时任务,直接借助Spring的@Schedule来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener事件机制来实现消息处理的解耦


5. 消息消费


最终就是我们的消息消费逻辑了,主要就是消费前面抛出的DelayMsg,我们这里借助AOP来实现消息过滤


定义一个注解Consumer,用来指定消费哪个topic


@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {
    String topic();
}
复制代码


注意这个注解上面还有 @EventListener,表明它可以监听的spring的事件


aop拦截逻辑,根据topic进行过滤

@Aspect
@Component
public class ConsumerAspect {
    @Around("@annotation(consumer)")
    public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {
        Object[] args = joinPoint.getArgs();
        boolean check = false;
        for (Object obj : args) {
            if (obj instanceof RedisDelayListWrapper.DelayMsg) {
                check = consumer.topic().equals(((RedisDelayListWrapper.DelayMsg) obj).getTopic());
            }
        }
        if (!check) {
            // 不满足条件,直接忽略
            return null;
        }
        // topic匹配成功,执行
        return joinPoint.proceed();
    }
}
复制代码


5. 测试demo


最后写一个测试demo,验证下上面的实现

@EnableScheduling
@RestController
@SpringBootApplication
public class Application {
    private static final String TEST_DELAY_QUEUE = "test";
    private static final String DEMO_DELAY_QUEUE = "demo";
    @Autowired
    private RedisDelayListWrapper redisDelayListWrapper;
    private Random random = new Random();
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
    @GetMapping(path = "publish")
    public String publish(String msg, Long delayTime) {
        if (delayTime == null) {
            delayTime = 10_000L;
        }
        String queue = random.nextBoolean() ? TEST_DELAY_QUEUE : DEMO_DELAY_QUEUE;
        msg = queue + "#" + msg + "#" + (System.currentTimeMillis() + delayTime);
        redisDelayListWrapper.publish(queue, msg, delayTime);
        System.out.println("延时: " + delayTime + "ms后消费: " + msg + " now:" + LocalDateTime.now());
        return "success!";
    }
    @Consumer(topic = TEST_DELAY_QUEUE)
    public void consumer(RedisDelayListWrapper.DelayMsg delayMsg) {
        System.out.println("TEST消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis());
    }
    @Consumer(topic = DEMO_DELAY_QUEUE)
    public void consumerDemo(RedisDelayListWrapper.DelayMsg delayMsg) {
        System.out.println("DEMO消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis());
    }
}
复制代码


image.png


6. 小结


本文属于一个实战小技巧,借助redis的zset来灵活的实现一个简单的延时队列,实现倒是没有太大的难度,其中的一些小细节还是挺有意思的,好的,今天分享到此over,欢迎各位老铁来撩,公众号 一灰灰blog 你值得拥有



相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
24天前
|
NoSQL 安全 测试技术
Redis游戏积分排行榜项目中通义灵码的应用实战
Redis游戏积分排行榜项目中通义灵码的应用实战
43 4
|
25天前
|
监控 NoSQL 网络协议
【Azure Redis】部署在AKS中的应用,连接Redis高频率出现timeout问题
查看Redis状态,没有任何异常,服务没有更新,Service Load, CPU, Memory, Connect等指标均正常。在排除Redis端问题后,转向了AKS中。 开始调查AKS的网络状态。最终发现每次Redis客户端出现超时问题时,几乎都对应了AKS NAT Gateway的更新事件,而Redis服务端没有任何异常。因此,超时问题很可能是由于NAT Gateway更新事件导致TCP连接被重置。
|
26天前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
37 5
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
45 6
|
2月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
64 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
缓存 NoSQL Java
springboot的缓存和redis缓存,入门级别教程
本文介绍了Spring Boot中的缓存机制,包括使用默认的JVM缓存和集成Redis缓存,以及如何配置和使用缓存来提高应用程序性能。
124 1
springboot的缓存和redis缓存,入门级别教程
|
2月前
|
JSON NoSQL Java
springBoot:jwt&redis&文件操作&常见请求错误代码&参数注解 (九)
该文档涵盖JWT(JSON Web Token)的组成、依赖、工具类创建及拦截器配置,并介绍了Redis的依赖配置与文件操作相关功能,包括文件上传、下载、删除及批量删除的方法。同时,文档还列举了常见的HTTP请求错误代码及其含义,并详细解释了@RequestParam与@PathVariable等参数注解的区别与用法。
|
2月前
|
NoSQL Java Redis
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
这篇文章介绍了如何使用Spring Boot整合Apache Shiro框架进行后端开发,包括认证和授权流程,并使用Redis存储Token以及MD5加密用户密码。
37 0
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
|
2月前
|
存储 消息中间件 NoSQL
【redis】redis的特性和主要应用场景
【redis】redis的特性和主要应用场景
138 1
|
2月前
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
59 1