大家好,我是飘渺。
在之前一篇文章中我们详细介绍了为什么需要对接口进行限流,也介绍了常见的限流算法,最后还基于Guava工具类实现了接口限流。但是这种方式有个问题,无法实现分布式限流。那今天我们来利用Redis + Lua 来实现分布式限流。
Lua 脚本和 MySQL 数据库的存储过程比较相似,他们执行一组命令,所有命令的执行要么全部成功或者失败,以此达到原子性。也可以把 Lua 脚本理解为,一段具有业务逻辑的代码块。
实现过程
第一步:引入Redis依赖包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
第二步:配置Redis
/** * @author JAVA日知录 * @date 2022/5/2 22:35 */ @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); // 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); // 使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; } }
第二步:自定义限流注解
@Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) @Documented public @interface RedisLimit { /** * 资源的key,唯一 * 作用:不同的接口,不同的流量控制 */ String key() default ""; /** * 最多的访问限制次数 */ long permitsPerSecond() default 2; /** * 过期时间也可以理解为单位时间,单位秒,默认60 */ long expire() default 60; /** * 得不到令牌的提示语 */ String msg() default "系统繁忙,请稍后再试."; }
第三步:创建限流异常
/** * @author JAVA日知录 * Redis限流自定义异常 * @date 2022/5/2 21:43 */ public class RedisLimitException extends RuntimeException{ public RedisLimitException(String msg) { super( msg ); } }
第四步:使用AOP切面拦截限流注解
/** * Limit AOP * @author JAVA日知录 * @date 2021/9/24 3:07 下午 */ @Slf4j @Aspect @Component public class RedisLimitAop { @Autowired private StringRedisTemplate stringRedisTemplate; @Pointcut("@annotation(com.jianzh5.blog.limit.redis.RedisLimit)") private void check() { } @Before("check()") public void before(JoinPoint joinPoint) { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); //拿到RedisLimit注解,如果存在则说明需要限流 RedisLimit redisLimit = method.getAnnotation(RedisLimit.class); if(redisLimit != null){ //获取redis的key String key = redisLimit.key(); String className = method.getDeclaringClass().getName(); String name = method.getName(); String limitKey = key + className + method.getName(); log.info(limitKey); if(StringUtils.isEmpty(key)){ throw new RedisLimitException( "key cannot be null" ); } long limit = redisLimit.permitsPerSecond(); long expire = redisLimit.expire(); List<String> keys = new ArrayList<>(); keys.add( key ); String luaScript = buildLuaScript(); RedisScript<Long> redisScript = new DefaultRedisScript<>( luaScript, Long.class ); Long count = stringRedisTemplate.execute( redisScript, keys, String.valueOf(limit), String.valueOf(expire) ); log.info( "Access try count is {} for key={}", count, key ); if (count != null && count == 0) { log.debug("令牌桶={},获取令牌失败",key); throw new RedisLimitException(redisLimit.msg()); } } } /** * 构建redis lua脚本 * @return */ private String buildLuaScript() { StringBuilder luaString = new StringBuilder(); luaString.append( "local key = KEYS[1]" ); //获取ARGV内参数Limit luaString.append( "\nlocal limit = tonumber(ARGV[1])" ); //获取key的次数 luaString.append( "\nlocal curentLimit = tonumber(redis.call('get', key) or \"0\")" ); luaString.append( "\nif curentLimit + 1 > limit then" ); luaString.append( "\nreturn 0" ); luaString.append( "\nelse" ); //自增长 1 luaString.append( "\n redis.call(\"INCRBY\", key, 1)" ); //设置过期时间 luaString.append( "\nredis.call(\"EXPIRE\", key, ARGV[2])" ); luaString.append( "\nreturn curentLimit + 1" ); luaString.append( "\nend" ); return luaString.toString(); } }
第五步:给需要限流的接口加上注解
/** * 公众号:JAVA日知录 * 限流测试类基于Redis限流 */ @Slf4j @RestController @RequestMapping("/limit/redis") public class LimitRedisController { /** * 基于Redis AOP限流 */ @GetMapping("/test") @RedisLimit(key = "redis-limit:test", permitsPerSecond = 2, expire = 1, msg = "当前排队人数较多,请稍后再试!") public String test() { log.info("限流成功。。。"); return "ok"; } }
第六步:体验效果
通过访问测试地址:http://127.0.0.1:8080/limit/redis/test,反复刷新并观察输出结果:
正常响应时:
{"status":100,"message":"操作成功","data":"ok","timestamp":1652343229643}
触发限流时:
{"status":500,"message":"当前排队人数较多,请稍后再试!","data":null,"timestamp":1652343239035}
通过观察得之,基于自定义注解同样实现了接口限流的效果。
优化
程序每次执行每次都需要通过buildLuaScript()
方法构建lua执行脚本,比较 low,我们可以生成一个lua文件放在resources目录下,利用@PostConstruct
注解提前加载。
- 在resouces文件夹下创建lua文件 rateLimiter.lua
--获取KEY local key = KEYS[1] local limit = tonumber(ARGV[1]) local curentLimit = tonumber(redis.call('get', key) or "0") if curentLimit + 1 > limit then return 0 else -- 自增长 1 redis.call('INCRBY', key, 1) -- 设置过期时间 redis.call('EXPIRE', key, ARGV[2]) return curentLimit + 1 end
- 修改RedisLimitAop,通过@PostConstruct注入DefaultRedisScript
@Slf4j @Aspect @Component public class RedisLimitAop { @Autowired private StringRedisTemplate stringRedisTemplate; private DefaultRedisScript<Long> redisScript; @PostConstruct public void init(){ redisScript = new DefaultRedisScript<>(); redisScript.setResultType(Long.class); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimiter.lua"))); } ... }
小结
基于Redis + Lua 可以很方便地实现分布式限流,算是SpringBoot老鸟系列限流文章的补充扩展。
那么现在问题来了,我们现在有基于Guava实现的单机限流,又有基于Redis+Lua实现的分布式限流,那能不能将两种限流功能做成一个独立的公共组件,让使用方根据实际情况选择对应的限流功能呢?
好了,今天的文章就到这里了,我是飘渺,咱们下期见~