四、手写断路器
4.1、断路器设计
本质就是在当前远程调用发起前对其进行代理:
时间窗口滑动模型图:
断路器状态介绍以及不同的状态转变方案:三个状态closed、half open、open
关:服务正常调用 A---》B 开:在一段时间内,调用失败次数达到阀值(5s 内失败 3 次)(5s 失败 30 次的)则断路器打开,直接 return 半开:断路器打开后,过一段时间,让少许流量尝试调用 B 服务,如果成功则断路器关闭, 使服务正常调用,如果失败,则继续半开
注意点:
1、一个服务一个断路器实例。
2、其他手写时的相关问题。
断路器实例中的属性:①断路器当前的状态。②当前的错误次数。
三种状态如何切换?
默认刚开始是closed(也就是正常去进行远程调用状态),一旦访问失败了一次,此时就会变为open状态,那么在open状态过程中会直接返回对应的断路器结果,在一定的时间窗口(指定秒数)到达之后【多线程添加一个定时器】,此时状态会进入到半开状态,那么就会放一些流量出来去尝试访问服务提供方,若是发现此时访问成功!那么状态依旧会修改为closed。
为什么要使用一个定时器来进行定期清除呢?一些大量并发场景下,需要使用一个定时器来进行对失败次数清零。
4.2、实现断路器功能
首先准备好在3.1中的调用服务新案例,然后我们基于此来实现一个断路器:
实现完成之后如下:
①状态枚举:
package com.changlu.myhystrix.hystrix.model; /** * @Description: * @Author: changlu * @Date: 9:54 AM */ public enum HystrixStatus { //定义三种状态:关闭、开启、半开 CLOSE, OPEN, HALF_OPEN }
②断路器注解:
package com.changlu.myhystrix.hystrix.anno; import java.lang.annotation.*; /** * @Description: * @Author: changlu * @Date: 9:59 AM */ @Target(ElementType.METHOD) //面向方法 @Retention(RetentionPolicy.RUNTIME) //运行时 @Documented @Inherited public @interface MyHystrix { }
③断路器切面:
package com.changlu.myhystrix.hystrix.aspect; import com.changlu.myhystrix.hystrix.HystrixPlus; import com.changlu.myhystrix.hystrix.model.HystrixStatus; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Random; /** * @Description: 熔断器切面 * @Author: changlu * @Date: 10:00 AM */ @Component @Aspect public class HystrixAspect { //切面表达式 // public static final String POINT_COT = "execution (* com.changlu.myhystrix.controller.CustomerController.customerRent(..))"; //定义一个断路器Map private static Map<String, HystrixPlus> hystrixMap = new HashMap<>(); static { hystrixMap.put("rent-car-service", new HystrixPlus()); } //随机器工具 public static ThreadLocal<Random> randomThreadLocal = ThreadLocal.withInitial(()->new Random()); //根据注解来进行切面处理 @Around(value = "@annotation(com.changlu.myhystrix.hystrix.anno.MyHystrix)") public Object hystrixAround(ProceedingJoinPoint joinPoint) { //结果集 Object res = null; //根据当前的服务名来获取到对应的断路器 HystrixPlus hystrix = hystrixMap.get("rent-car-service"); HystrixStatus status = hystrix.getStatus(); switch (status) { case CLOSE: try { return joinPoint.proceed(); } catch (Throwable throwable) { throwable.printStackTrace(); //进行计数,并且响应结果 hystrix.addFailCount(); return "熔断器返回结果"; } case OPEN://打开状态,表示不能调用 return "熔断器返回结果"; case HALF_OPEN: Random random = randomThreadLocal.get(); int num = random.nextInt(5);//[0-4] //方便回收 randomThreadLocal.remove(); //放行部分流量 if (num == 1) { try { res = joinPoint.proceed(); //调用成功,断路器关闭 hystrix.setStatus(HystrixStatus.CLOSE); //进行唤醒清理程序 synchronized (hystrix.getLock()) { hystrix.getLock().notifyAll(); } return res; } catch (Throwable throwable) { throwable.printStackTrace(); return "熔断器返回结果"; } } default: return "熔断器返回结果"; } } }
④断路器实现:
package com.changlu.myhystrix.hystrix; import com.changlu.myhystrix.hystrix.model.HystrixStatus; import lombok.Data; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @Description: * @Author: changlu * @Date: 10:04 AM */ @Data public class HystrixPlus { //时间窗口 private static final Integer WINDOW_TIME = 20; //失败次数 private static final Integer MAX_FAIL_COUNT = 3; //定义一个状态 private HystrixStatus status = HystrixStatus.CLOSE; //错误次数计数器 private AtomicInteger currentFailCount = new AtomicInteger(0); //定义一个线程池 private ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 4, 8, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); //锁 private Object lock = new Object(); { //提交定期清零报错次数 poolExecutor.execute(()->{ while (true) { try { TimeUnit.SECONDS.sleep(WINDOW_TIME); } catch (InterruptedException e) { e.printStackTrace(); } //根据当前的状态来判断是否要进行清理 if (this.status.equals(HystrixStatus.CLOSE)) { this.currentFailCount.set(0); }else { // 半开或者开 不需要去记录次数 这个线程可以不工作 // 学过生产者 消费者模型 wait notifyAll condition singleAll await 它们只能随机唤醒某一个线程 // lock锁 源码 CLH 队列 放线程 A B C D E park unpark 可以 唤醒指定的某一个线程 // LockSupport.park(); // LockSupport.unpark(); try { //进行阻塞,防止大量占据cpu this.lock.wait(); System.out.println("开始进行失败次数清零操作"); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } //增加错误次数,若是错误此时达到瓶颈,那么就需要将当前状态转为open状态并提交定时任务来进行修改为half open状态,并且清零 public void addFailCount() { int i = currentFailCount.incrementAndGet(); if (i >= MAX_FAIL_COUNT) { //将当前熔断器状态设置开启状态 this.status = HystrixStatus.OPEN; poolExecutor.execute(()->{ try { TimeUnit.SECONDS.sleep(WINDOW_TIME); } catch (InterruptedException e) { e.printStackTrace(); } if (this.status != HystrixStatus.CLOSE) { //设置半开状态并且计数清零 this.status = HystrixStatus.HALF_OPEN; this.currentFailCount.set(0); } }); } } }
4.3、断路器测试
初始情况:启动三个服务,分别是注册中心,服务提供者以及服务消费方(也就是我们自定义实现断路器)
访问下网址路径:http://localhost:8082/customerRent
关闭服务提供方:
再此尝试访问:可以看到我们实现的熔断器起了效果
最终我们重启服务提供方:
可以看到也能够进行访问!
五、Hystrix配置
详细配置:hystrix 配置
对于配置中的隔离方式策略介绍如下:隔离策略包含thread线程以及semphore信号量隔离
线程隔离(场景:访问量比较大):
说明:按照 group(10 个线程)划分服务提供者,用户请求的线程 和做远程的线程不一样。 好处:当 B 服务调用失败了 或者请求 B 服务的量太大了 不会对 C 服务造成影响 用户访问比较大的情 况下使用比较好 异步的方式。 缺点:具有线程切换的开销,对机器性能影响。 应用场景 调用第三方服务 并发量大的情况下
SEMAPHORE 信号量隔离(场景:访问量比较小):
说明:每次请进来 有一个原子计数器 做请求次数的++ 当请求完成以后 --。 好处:对 cpu 开销小。 缺点:并发请求不易太多 当请求过多 就会拒绝请求 做一个保护机制。 场景:使用内部调用 ,并发小的情况下。 源码入门 HystrixCommand AbstractCommand HystrixThreadPool