这周工作的时候,碰见了这样一个问题,在我们的业务系统中,当用户访问自己的订单列表时,如果订单已经被添加了物流单号,但是后台还没有刷新到它的物流状态时,会去调用第三方物流的接口来刷新自己的物流状态。在这个过程中发现,一旦访问频率太过频繁的话就会被第三方限制,在一段时间内所有再发过去的请求都会被ban掉。
当一旦出现限流的情况,那么所有用户的物流状态都将无法被查询及刷新,将会给用户带来很不好的用户体验。所以我们的业务系统就需要实现这样的功能:
用户第一次访问自己的订单列表时,直接调用第三方物流接口获取一次状态
在接下来的一段时间,在访问订单列表时,不调用第三方接口刷新状态。做出此判断的依据是,对于用户来说,订单列表的访问功能是必须的,但是物流状态可能并非是刚需,因此此段时间绕过调用第三方接口
当用户在一段时间内,访问频率到达一定量时,例如在60秒内访问了5次,那么判断用户获取物流状态的需求非常急迫,放行一次调用第三方接口,之后再次恢复之前的规则
对以上需求进行了一下评估后,发现无论是Hystrix和Sentinel的限流规则,还是网关的漏桶和令牌桶,对我们来说都不是很适用,因此决定自己写一个组件,来实现这个限流规则。我们知道,Hystrix是基于时间窗口内的失败统计,以及线程池或信号量隔离实现的快速失败机制,那么我们就仿照这个模式来实现自己的限流功能。
先从最基础的功能部分开始实现,实现一个滑动时间窗口,来统计一段时间内接口的调用次数:
@Slf4j public class MethodAccessWindow { @AllArgsConstructor @Data class Node{ long time; } Queue<Node> queue; ScheduledExecutorService scheduledExecutorService; private int windowTime; private int size; public MethodAccessWindow(int windowTime, int size){ queue=new ArrayBlockingQueue<>(size); this.windowTime = windowTime; this.size=size; init(); } private void init(){ System.out.println("初始化定时任务"); scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.scheduleWithFixedDelay(()->{ clean(); },windowTime*1000,1000, TimeUnit.MILLISECONDS); } public boolean canReceive(){ if(queue.size()>=size){ return false; }else { queue.add(new Node(System.currentTimeMillis())); return true; } } public void clean(){ for (Node node:queue){ if (System.currentTimeMillis()-node.getTime()> (windowTime *1000)){ queue.poll(); } } } public void reset(){ queue.clear(); } public void destroy(){ log.info("destroy"); try { scheduledExecutorService.shutdown(); if (!scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS)) { scheduledExecutorService.shutdownNow(); } } catch (Exception e) { e.printStackTrace(); System.out.println("awaitTermination interrupted: " + e); scheduledExecutorService.shutdownNow(); } log.info("end destroy"); } }
在上面的类中:
windowTime表示滑动窗口的时间长度,size表示能够接受的任务数,使用队列来接收任务,队列长度为size
在构造函数中启动线程池执行一个定时任务,会遍历队列中的节点,当节点的存活时间大于窗口时间时,删除过期节点。如果对实时要求比较高,可以修改定时任务的执行间隔
canReceive方法用于接收任务,当队列长度已满时,返回false
reset方法用于清空队列,即重置窗口值
进行测试,设置为20秒的时间窗口可以接受5次请求:
public class Test { public static void main(String[] args) throws InterruptedException { MethodAccessWindow methodAccessWindow =new MethodAccessWindow(20,5); for (int i = 0; i < 30; i++) { System.out.println(i+" "+ methodAccessWindow.canReceive()); TimeUnit.SECONDS.sleep(1); } methodAccessWindow.destroy(); } }
在队列满后拒绝请求:
当到达第20秒时,前19个时间窗口内只有4个任务存在,因此可以接受任务,之后4秒同理:
完成了时间窗口,我们要把它应用在需要被限流的方法上,因此仿照Hystrix的格式,定义一个注解:
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface NeedPassLimit { int timeWindow() default 100; int frequency() default 10; String fallBackMethod() default ""; }
按照之前的规则,在timeWindow的时间窗口,当访问次数超过frequency时,进行一次放行,否则执行fallBackMethod指定的降级方法。将注解写在Service的方法上:
@Service @Slf4j public class MyService { @NeedPassLimit(timeWindow = 20, frequency = 5, fallBackMethod = "fallback") public String getInfo(Long userId){ log.info("放行:"+userId.toString()); return "success"; } public String fallback(Long userId){ log.info("拦截:"+userId.toString()); return "restriction"; } }
这里在方法中,传入了一个userId字段,用以表明是哪个用户调用的方法。这是因为,按照仓壁模式,需要对用户以及访问的方法创建滑动窗口的隔离,这里简单使用类名加方法名加userId的方式,来区分不用的滑动窗口实例。给MethodAccessWindow添加一个字段windowKey,并修改构造方法:
private String windowKey; public MethodAccessWindow(int windowTime, int size, String windowKey){ queue=new ArrayBlockingQueue<>(size); this.windowTime = windowTime; this.size=size; this.windowKey = windowKey; init(); }
接下来,实现重要的切面方法:
@Component @Aspect public class MethodPassAspect { //缓存各个用户的 Window @Getter private ConcurrentHashMap<String, MethodAccessWindow> passerMap =new ConcurrentHashMap<>(); @Pointcut("@annotation(com.cn.hydra.aspectdemo.rule.annotation.NeedPassLimit)") public void freshPointCut() { } @Around("freshPointCut()") public Object doAround(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); if (method.isAnnotationPresent(NeedPassLimit.class)) { Object[] args = point.getArgs(); String[] parameterNames = signature.getParameterNames(); List<String> paramNameList = Arrays.asList(parameterNames); String passerKey=null; if (paramNameList.contains("userId")) { passerKey=point.getTarget().getClass().getName() +"#"+method.getName()+"#"+args[paramNameList.indexOf("userId")]; } NeedPassLimit annotation = method.getAnnotation(NeedPassLimit.class); int timeWindow = annotation.timeWindow(); int frequency = annotation.frequency(); String fallBackMethodName = annotation.fallBackMethod(); MethodAccessWindow methodAccessWindow; if (passerMap.keySet().contains(passerKey)) { methodAccessWindow = passerMap.get(passerKey); }else { //第一次,放过请求 methodAccessWindow= new MethodAccessWindow(timeWindow,frequency,passerKey); passerMap.put(passerKey,methodAccessWindow); Object object = point.proceed(); return object; } if (methodAccessWindow.canReceive()){ Object fallbackObject = invokeFallbackMethod(method, point.getTarget(), fallBackMethodName, args); return fallbackObject; }else{ Object object = point.proceed(); methodAccessWindow.reset(); return object; } } return null; } private Object invokeFallbackMethod(Method method, Object bean, String fallbackMethodName, Object[] arguments) throws Exception { Class beanClass = bean.getClass(); Method fallbackMethod = beanClass.getMethod(fallbackMethodName, method.getParameterTypes()); Object fallbackObject = fallbackMethod.invoke(bean, arguments); return fallbackObject; } }
在上面方法中:
passerMap 缓存了各个用户访问的接口的滑动窗口,用以实现仓壁模式
当用户第一次访问时,执行原请求方法,执行后创建滑动窗口,放进passerMap 中缓存
当用户之后访问时,调用canReceive方法,如果返回为true,执行降级方法
当canReceive返回为false时,执行原方法,并重置滑动窗口
在实现了上面的主要功能后,需要注意滑动窗口是一直存在的,为了保护系统资源,我们有必要销毁不需要的滑动窗口。主要需要实现将滑动窗口对象实例从切面的passerMap 中移除,之后交给jvm垃圾回收器进行回收即可。
实现方式也很简单,当我们判断该滑动窗口已经很久没有使用时,发送一个自定义事件给我们自定义的spring事件监听器,由监听器负责移除该滑动窗口实例。先定义窗口关闭事件:
public class WindowCloseEvent extends ApplicationEvent { @Getter private String windowKey; public WindowCloseEvent(Object source, String windowKey) { super(source); this.windowKey = windowKey; } }
然后定义事件监听器,监听上面的WindowCloseEvent事件:
@Component @Slf4j public class WindowCloseEventListener implements ApplicationListener<WindowCloseEvent> { @Autowired MethodPassAspect methodPassAspect; @Override public void onApplicationEvent(WindowCloseEvent windowCloseEvent) { log.info("close:"+windowCloseEvent.getWindowKey()); ConcurrentHashMap<String, MethodAccessWindow> passerMap = methodPassAspect.getPasserMap(); System.out.println(passerMap.toString()); passerMap.remove(windowCloseEvent.getWindowKey()); System.out.println(passerMap.toString()); } }
再定义一个事件发布方法的EventPublisher,用来发送事件:
@Component public class EventPublisher { @Autowired private ApplicationEventPublisher applicationEventPublisher; public void publish(ApplicationEvent applicationEvent){ applicationEventPublisher.publishEvent(applicationEvent); } }
那么,究竟该在什么时机去发送这个事件呢?我们可以在滑动窗口中记录一下最后使用时间,当超过约定的最大未使用时间时,将其从切面的passerMap中移除。在滑动窗口类中添加两个变量,修改构造方法,初始化这两个变量:
private long lastCallTime;//最后调用时间 private long shutDownTime;//最长未调用时间 public MethodAccessWindow(int windowTime, int size, String windowKey){ queue=new ArrayBlockingQueue<>(size); this.windowTime = windowTime; this.size=size; this.windowKey = windowKey; lastCallTime=System.currentTimeMillis(); shutDownTime =windowTime*1000*3;//可自由进行长短额定义 init(); }
在每次调用方法时,先刷新lastCallTime:
public boolean canReceive(){ this.lastCallTime=System.currentTimeMillis(); ...... }
回头看一下,滑动窗口实例对象在后台存在一个定时任务,用于清除超过时间窗口的任务,那么可以在这后面可以再添加一个任务,用于判断当前时间减去最后调用时间,是否超过定义个最长不使用时间。但是有一个问题,MethodAccessWindow并不是一个注册到spring环境的Bean,不能使用自动注入来注入EventPublisher对象,这里可以通过静态方法来获取spring容器,之后再使用容器的getBean方法拿到EventPublisher的对象。添加事件发布对象及其set方法:
private static EventPublisher eventPublisher; public static void setEventPubisher(ApplicationContext applicationContext ){ eventPublisher=applicationContext.getBean(EventPublisher.class); }
在spring容器完成初始化后,从启动类直接给滑动窗口注入:
@SpringBootApplication public class AspectdemoApplication { public static void main(String[] args) { ApplicationContext applicationContext = SpringApplication.run(AspectdemoApplication.class, args); MethodAccessWindow.setEventPubisher(applicationContext); } }
修改定时任务,在定时任务中发送关闭滑动窗口事件,并发送关闭线程池请求:
public void clean(){ for (Node node:queue){ if (System.currentTimeMillis()-node.getTime()> (windowTime *1000)){ queue.poll(); } } //超过时间不用则自动销毁 if (System.currentTimeMillis()-lastCallTime >= shutDownTime){ log.info("发送event"); WindowCloseEvent windowCloseEvent=new WindowCloseEvent(this, windowKey); eventPublisher.publish(windowCloseEvent); log.info("发送event end"); destroy(); } }
调用Service接口进行测试:
在滑动窗口时间20秒,最大空闲时间设置为窗口事件3倍的情况下,在最后一次请求调用的后1分钟,发送了窗口关闭事件并被监听,从passerMap中移除,并且在之后销毁了线程池。这样,一个能够自定义规则的限流组件就完成了。
代码github地址: