前言
在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。 限流的目的是通过对并发访问请求进行限速或者一个时间窗口内的的请求数量进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待
我们上次讲解了如何使用Sentinel来实现服务限流,今天我们来讲解下如何使用Redisson+AOP+自定义注解+反射优雅的实现服务限流,本文讲解的限流实现支持针对用户IP限流,整个接口的访问限流,以及对某个参数字段的限流,并且支持请求限流后处理回调
1.导入Redisson
引入依赖
我们首先导入Redisson所需要的依赖,我们这里的springboot版本为2.7.12
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.23.4</version> </dependency>
编写配置
# Redisson客户端 redis: sdk: config: host: redis服务IP port: 6379 password: redis密码,没有可删掉这行 pool-size: 10 min-idle-size: 5 idle-timeout: 30000 connect-timeout: 5000 retry-attempts: 3 retry-interval: 1000 ping-interval: 60000 keep-alive: true
声明Redisson客户端Bean
配置映射类RedisCientConfigProperties
@Data @ConfigurationProperties(prefix = "redis.sdk.config", ignoreInvalidFields = true) public class RedisCientConfigProperties { /** host:ip */ private String host; /** 端口 */ private int port; /** 账密 */ private String password; /** 设置连接池的大小,默认为64 */ private int poolSize = 64; /** 设置连接池的最小空闲连接数,默认为10 */ private int minIdleSize = 10; /** 设置连接的最大空闲时间(单位:毫秒),超过该时间的空闲连接将被关闭,默认为10000 */ private int idleTimeout = 10000; /** 设置连接超时时间(单位:毫秒),默认为10000 */ private int connectTimeout = 10000; /** 设置连接重试次数,默认为3 */ private int retryAttempts = 3; /** 设置连接重试的间隔时间(单位:毫秒),默认为1000 */ private int retryInterval = 1000; /** 设置定期检查连接是否可用的时间间隔(单位:毫秒),默认为0,表示不进行定期检查 */ private int pingInterval = 0; /** 设置是否保持长连接,默认为true */ private boolean keepAlive = true; }
Configuration @EnableConfigurationProperties(RedisCientConfigProperties.class) public class RedisClientConfig { @Bean("redissonClient") public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext, RedisCientConfigProperties properties) { Config config = new Config(); // 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96 // config.setCodec(new RedisCodec()); config.useSingleServer() .setAddress("redis://" + properties.getHost() + ":" + properties.getPort()) .setPassword(properties.getPassword()) .setConnectionPoolSize(properties.getPoolSize()) .setConnectionMinimumIdleSize(properties.getMinIdleSize()) .setIdleConnectionTimeout(properties.getIdleTimeout()) .setConnectTimeout(properties.getConnectTimeout()) .setRetryAttempts(properties.getRetryAttempts()) .setRetryInterval(properties.getRetryInterval()) .setPingConnectionInterval(properties.getPingInterval()) .setKeepAlive(properties.isKeepAlive()) ; RedissonClient redissonClient = Redisson.create(config); // 注册消息发布订阅主题Topic // 找到所有实现了Redisson中MessageListener接口的bean名字 String[] beanNamesForType = applicationContext.getBeanNamesForType(MessageListener.class); for (String beanName : beanNamesForType) { // 通过bean名字获取到监听bean MessageListener bean = applicationContext.getBean(beanName, MessageListener.class); Class<? extends MessageListener> beanClass = bean.getClass(); // 如果bean的注解里包含我们的自定义注解RedisTopic.class,则以RedisTopic注解的值作为name将该bean注册到bean工厂,方便在别处注入 if (beanClass.isAnnotationPresent(RedisTopic.class)) { RedisTopic redisTopic = beanClass.getAnnotation(RedisTopic.class); RTopic topic = redissonClient.getTopic(redisTopic.topic()); topic.addListener(String.class, bean); ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory(); beanFactory.registerSingleton(redisTopic.topic(), topic); } } return redissonClient; } static class RedisCodec extends BaseCodec { private final Encoder encoder = in -> { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); try { ByteBufOutputStream os = new ByteBufOutputStream(out); JSON.writeJSONString(os, in, SerializerFeature.WriteClassName); return os.buffer(); } catch (IOException e) { out.release(); throw e; } catch (Exception e) { out.release(); throw new IOException(e); } }; private final Decoder<Object> decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class); @Override public Decoder<Object> getValueDecoder() { return decoder; } @Override public Encoder getValueEncoder() { return encoder; } } }
2.自定义注解
我们这里自定义一个注解来作为后续AOP切面编程的切点
根据注解Key属性的值,我们会有如下情况
all:针对整个接口限流
request_ip:针对各个用户的访问IP限流
其他str:根据参数作为标识符限流,比如我这里key=userid,那么我会根据参数中的userid来限流
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface AccessInterceptor { /** 用哪个字段作为拦截标识符,配置all则是对整个接口限流,配置request_ip, * 则是对访问ip限流,配置其他str,则会到参数中寻找对应名称的属性值(包括对象内部属性) */ String key() default "all"; /** 限制频次(每秒请求次数) */ long permitsPerSecond(); /** 黑名单拦截(多少次限制后加入黑名单)0 不限制 */ double blacklistCount() default 0; /** 拦截后的执行方法 */ String fallbackMethod(); }
3.AOP切面编程
导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>
编写AOP限流代码
我们doRouter切面函数以AccessInterceptor注解为切点,根据注解的各类配置来执行整个限流过程。
我们通过使用Redisson的RRateLimiter限流器,基于令牌桶实现访问限流,并对已经限流的访问记录黑名单次数,超过设置的黑名单阈值就会被加入黑名单中,较长时间无法访问
代码较长,为了缩短篇幅就一次性放上来了,各处已经打上了详细注解,若有疑问可评论区留言。
@Slf4j @Aspect public class RateLimiterAOP { // 注入我们声明的redisson客户端 @Resource private RedissonClient redissonClient; // 限流RateLimiter缓存前缀 private static final String rateLimiterName = "test:RateLimiter:"; // 黑名单原子计数器缓存前缀 private static final String blacklistPrefix = "test:RateBlockList:"; @Around("@annotation(accessInterceptor)") public Object doRouter(ProceedingJoinPoint jp, AccessInterceptor accessInterceptor) throws Throwable { // 获取注解配置的字段key String key = accessInterceptor.key(); if (StringUtils.isBlank(key)) { log.error("限流RateLimiter注解中的 Key 属性为空!"); throw new RuntimeException("RateLimiter注解中的 Key 属性为空!"); } log.info("限流拦截关键字为 {}", key); // 根据key获取拦截标识符字段 String keyAttr = getAttrValue(key, jp.getArgs()); // 黑名单拦截,非法访问次数超过黑名单阈值 if (!"all".equals(keyAttr) && accessInterceptor.blacklistCount() != 0 && redissonClient.getAtomicLong(blacklistPrefix + keyAttr).get() > accessInterceptor.blacklistCount()) { log.info("限流-黑名单拦截:{}", keyAttr); return fallbackMethodResult(jp, accessInterceptor.fallbackMethod()); } // 获取限流器 -> Redisson RRateLimiter RRateLimiter rateLimiter = redissonClient.getRateLimiter(rateLimiterName + keyAttr); if (!rateLimiter.isExists()) { // 创建令牌桶数据模型,单位时间内产生多少令牌 rateLimiter.trySetRate(RateType.PER_CLIENT,1, accessInterceptor.permitsPerSecond(), RateIntervalUnit.MINUTES); } // 限流判断,没有获取到令牌,超出频率 if (!rateLimiter.tryAcquire()) { // 如果开启了黑名单限制,那么就记录当前的非法访问次数 if (accessInterceptor.blacklistCount() != 0) { RAtomicLong atomicLong = redissonClient.getAtomicLong(blacklistPrefix + keyAttr); atomicLong.incrementAndGet(); // 原子自增 atomicLong.expire(24, TimeUnit.HOURS); // 刷新黑名单原子计数器器过期时间为24小时 } log.info("限流-频率过高拦截:{}", keyAttr); return fallbackMethodResult(jp, accessInterceptor.fallbackMethod()); } // 返回结果 return jp.proceed(); } /** * 调用用户配置的回调方法,使用反射机制实现。 */ private Object fallbackMethodResult(JoinPoint jp, String fallbackMethod) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { // 通过JoinPoint对象获取方法的签名(Signature) Signature sig = jp.getSignature(); // 将方法签名转换为MethodSignature对象,以便获取方法的详细信息 MethodSignature methodSignature = (MethodSignature) sig; // 获取到具体的方法对象,通过方法名和参数(所以回调函数参数一定要和原方法一致) Method method = jp.getTarget().getClass().getMethod(fallbackMethod, methodSignature.getParameterTypes()); // 调用目标对象的方法,并传入当前对象(jp.getThis())和方法的参数(jp.getArgs())。 return method.invoke(jp.getThis(), jp.getArgs()); } /** * 根据JoinPoint对象获取其所代表的方法对象 */ private Method getMethod(JoinPoint jp) throws NoSuchMethodException { Signature sig = jp.getSignature(); MethodSignature methodSignature = (MethodSignature) sig; return jp.getTarget().getClass().getMethod(methodSignature.getName(), methodSignature.getParameterTypes()); } /** * 实际根据自身业务调整,主要是为了获取通过某个值做拦截 */ public String getAttrValue(String attr, Object[] args) { String filedValue = null; for (Object arg : args) { try { // 找到HttpServletRequest对象来获取请求IP地址(如果是根据IP拦截的话) if ("request_ip".equals(attr) && arg instanceof HttpServletRequest) { HttpServletRequest request = (HttpServletRequest) arg; filedValue = IPUtils.getIpAddr(request); } // 找到了值,返回 if (StringUtils.isNotBlank(filedValue)) { break; } // fix: 使用lombok时,uId这种字段的get方法与idea生成的get方法不同,会导致获取不到属性值,改成反射获取解决 filedValue = String.valueOf(this.getValueByName(arg, attr)); } catch (Exception e) { log.error("获取路由属性值失败 attr:{}", attr, e); } } return filedValue; } /** * 获取对象的特定属性值(反射) * * @param item 对象 * @param name 属性名 * @return 属性值 * @author tang */ private Object getValueByName(Object item, String name) { try { // 获取指定对象中对应属性名的Field对象 Field field = getFieldByName(item, name); // 获取到的Field对象为null,表示属性不存在,直接返回null。 if (field == null) { return null; } // 将Field对象设置为可访问,以便获取私有属性的值。 field.setAccessible(true); // 获取属性值,并将其赋值给变量o。 Object o = field.get(item); // 将Field对象设置为不可访问,以保持对象的封装性。 field.setAccessible(false); return o; } catch (IllegalAccessException e) { return null; } } /** * 根据名称获取方法,该方法同时兼顾继承类获取父类的属性 * * @param item 对象 * @param name 属性名 * @return 该属性对应方法 * @author tang */ private Field getFieldByName(Object item, String name) { try { Field field; try { // 获取指定对象中对应属性名的Field对象。 field = item.getClass().getDeclaredField(name); } catch (NoSuchFieldException e) { // 没有找到,抛出NoSuchFieldException异常,尝试获取父类中对应属性名的Field对象 field = item.getClass().getSuperclass().getDeclaredField(name); } return field; } catch (NoSuchFieldException e) { // 父类也没找到对应属性名的Field对象,寄,返回null return null; } } }
以上代码用到了自己写的一个工具类IPUtils来获取请求的IP地址,内容如下
public class IPUtils { private static Logger logger = LoggerFactory.getLogger(IPUtils.class); private static final String IP_UTILS_FLAG = ","; private static final String UNKNOWN = "unknown"; private static final String LOCALHOST_IP = "0:0:0:0:0:0:0:1"; private static final String LOCALHOST_IP1 = "127.0.0.1"; /** * 获取IP地址 * <p> * 使用Nginx等反向代理软件, 则不能通过request.getRemoteAddr()获取IP地址 * 如果使用了多级反向代理的话,X-Forwarded-For的值并不止一个,而是一串IP地址,X-Forwarded-For中第一个非unknown的有效IP字符串,则为真实IP地址 */ public static String getIpAddr(HttpServletRequest request) { String ip = null; try { //以下两个获取在k8s中,将真实的客户端IP,放到了x-Original-Forwarded-For。而将WAF的回源地址放到了 x-Forwarded-For了。 ip = request.getHeader("X-Original-Forwarded-For"); if (StringUtils.isEmpty(ip) || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("X-Forwarded-For"); } //获取nginx等代理的ip if (StringUtils.isEmpty(ip) || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("x-forwarded-for"); } if (StringUtils.isEmpty(ip) || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("Proxy-Client-IP"); } if (StringUtils.isEmpty(ip) || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("WL-Proxy-Client-IP"); } if (StringUtils.isEmpty(ip) || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("HTTP_CLIENT_IP"); } if (StringUtils.isEmpty(ip) || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("HTTP_X_FORWARDED_FOR"); } //兼容k8s集群获取ip if (StringUtils.isEmpty(ip) || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getRemoteAddr(); if (LOCALHOST_IP1.equalsIgnoreCase(ip) || LOCALHOST_IP.equalsIgnoreCase(ip)) { //根据网卡取本机配置的IP InetAddress iNet = null; try { iNet = InetAddress.getLocalHost(); } catch (UnknownHostException e) { logger.error("getClientIp error: {}", e); } ip = iNet.getHostAddress(); } } } catch (Exception e) { logger.error("IPUtils ERROR ", e); } //使用代理,则获取第一个IP地址 if (!StringUtils.isEmpty(ip) && ip.indexOf(IP_UTILS_FLAG) > 0) { ip = ip.substring(0, ip.indexOf(IP_UTILS_FLAG)); } return ip; } }
4.接口使用自定义注解实现限流
使用自定义限流注解
比如我在用户controller层的登录接口上使用注解,key为request_ip,表示根据用户IP限流,回调函数为fallbackMethod,每分钟访问限制10次
@PostMapping(value = "/login") @AccessInterceptor(key = "request_ip", fallbackMethod = "loginErr", permitsPerSecond = 1L, blacklistCount = 10) public Response<String> doLogin(@RequestParam String code, HttpServletRequest request){
绑定限流回调函数
这里需要注意的是,回调函数的参数必须和你使用限流注解的方法参数一致,否则报对应方法找不到的错误(因为这里是通过反射机制找到回调函数执行的)
public Response<String> loginErr(String code, HttpServletRequest request) { System.out.println("限流触发回调,参数信息:" + code); return Response.<String>builder() .code(Constants.ResponseCode.FREQUENCY_LIMITED.getCode()) .info(Constants.ResponseCode.FREQUENCY_LIMITED.getInfo()) .data(code) .build(); }
总结
以上通过Redission+自定义注解+AOP+反射实现了对不同标识符的限流和黑名单拦截,并且可以绑定限流回调函数来处理限流后的逻辑,代码篇幅较长,各位小伙伴也可以尝试继续优化一下这里的设计,减少request_ip这种魔法值(实在懒得改了),感谢您的收看,万字长文(虽然大部分是代码),有帮助就多多支持吧