一、功能设计
拦截远程调用,当远程服务异常,则拦截,直接返回;
若远程服务正常,则放行调用。
二、窗口滑动
在一个时间窗口
11:01->11:02->11:03->11:04->11:05
在一个窗口时间内,若调用失败次数达到一个值,做什么事情
三、熔断器的状态 HystrixStatus
/**
* 断路器的状态
* @author WHX
*/
public enum HystrixStatus {
OPEN(0),
CLOSE(1),
HALF_OPEN(2); // 半开是关的一种策略 半开: 使用少许请求测试提供者是否活了
// 半开是: 开(多一点) + 关
HystrixStatus(int code) {
}
}
四、断路器接口设计 HystixCmd
/**
* 断路器功能设计
* @author WHX
*/
public interface HystixCmd {
/**
* 拦截请求
*
* @param request
* @return
*/
Object interceptor(ProceedingJoinPoint request);
/**
* 通过请求
*
* @return
*/
Object pass(ProceedingJoinPoint request);
/**
* 测试请求
*
* @param request
* @return
*/
Object test(ProceedingJoinPoint request);
/**
* 修改拦截器的状态
*
* @param status
*/
void changeStatus(HystrixStatus status);
/**
* 获取自己的状态
*
* @return
*/
HystrixStatus getStatus();
}
五、切面拦截远程请求的方法 HystixAspect
@Aspect
@Component
public class HystixAspect {
/**
* key: 服务的id
* value:该服务对应的拦截器
*/
private Map<String, HystixCmd> hystixCmds = new HashMap<String, HystixCmd>();
{
//注入一个此服务的拦截器
hystixCmds.put("server-id", new HystixCmdImpl());
}
@Around("@annotation(com.sxt.feign.core.anno.HystixCmdRpc)")
public Object interceptor(ProceedingJoinPoint point) {
// 从容器里面取自己的拦截器
HystixCmd hystixCmd = hystixCmds.get("server-id");
switch (hystixCmd.getStatus()) {
case OPEN:
return hystixCmd.interceptor(point);
case CLOSE:
return hystixCmd.pass(point);
case HALF_OPEN:
// 有结果,没有抛异常
System.out.println("半开,我用3%的几率访问一下,看提供者活了没有");
Object result = hystixCmd.test(point);
return result;
default:
break;
}
return null; // 没有符合的条件
}
}
六、拦截器的实现HystixCmdImpl 实现上面的断路器接口
public class HystixCmdImpl implements HystixCmd {
/**
* 一个窗口的时间是5
*/
private static Long WINDOW_SLIDE_TIME = 5000L;
/**
* 在一个窗口内失败10 次就代表远程服务异常
*/
private static Integer MAX_FAIL_COUNT = 3;
/**
* 锁对象
*/
private Object lock = new Object();
/**
* 默认是关的
*/
private HystrixStatus status = HystrixStatus.CLOSE;
/**
* 随机数
*/
private static Random RDM = new Random();
/**
* 在一个窗口怎么统计次数
*/
private AtomicInteger currentFallCount = new AtomicInteger(0);
{
// 定时任务做啥事情, 若一个窗口里面它没有达到失败的阈值,但是我们不能让该值去影响下个窗口,就需要清空该窗口的值
new Thread(() -> {
while (true) {
try {
Thread.sleep(WINDOW_SLIDE_TIME); // 5s 执行一次检查
} catch (InterruptedException e1) {
e1.printStackTrace();
}
if (this.getStatus() == HystrixStatus.CLOSE) {
// 断路器关闭
currentFallCount.set(0);
} else {
// 断路器打开了 或半开
System.out.println("断路器打开了,我统计失败次数,没有任何意义,我先死一会");
synchronized (lock) {
try {
//少许流量测试通过后再唤醒我
lock.wait();
System.out.println("远程服务正常了,我需要在启动清空窗口数据了");
currentFallCount.set(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}).start();
}
/**
* 拦截请求
*/
@Override
public Object interceptor(ProceedingJoinPoint request) {
// 不调用正常的值,使用备胎值
MethodSignature methodSignature = (MethodSignature) request.getSignature();
Method method = methodSignature.getMethod(); // 这是正常的实现类方法,我们需要调用备胎里面的方法
// 现在没法得到接口和实现类
Object object = getFallCallback(method);
Method callBack = null;
try {
callBack = object.getClass().getMethod(method.getName(), method.getParameterTypes());
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
Object fallBackResult = null;
try {
fallBackResult = callBack.invoke(object, request.getArgs()); // 直接调用备胎里面的方法
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
e.printStackTrace();
}
return fallBackResult;
}
/**
* 正常调用
*/
@Override
public Object pass(ProceedingJoinPoint request) {
Object result = null;
try {
result = request.proceed(request.getArgs());
} catch (Throwable e) {
// 有异常了,若异常为超时异常,代表失败了 一段时间失败n 次
// e.printStackTrace();
// 若失败了,我把失败次数++
currentFallCount.getAndIncrement();
if (currentFallCount.get() >= MAX_FAIL_COUNT) {
// 失败超过阈值
System.out.println("当前的窗口里面已经达到失败的阈值了,我把断路器打开");
this.changeStatus(HystrixStatus.OPEN); // 打开断路器,一段时间后,把断路器改为半开
new Thread(() -> {
try {
Thread.sleep(5000L);
System.out.println("过了一段时间,我把断路器该为半开");
this.changeStatus(HystrixStatus.HALF_OPEN);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}).start();
}
// 失败了,我们也不直接报错,而是返回备胎的结果
result = interceptor(request);
}
return result;
}
@Override
public Object test(ProceedingJoinPoint request) {
// 断路器半开,使用少许的流量测试
// 3% 100 3
int num = RDM.nextInt(100) + 1; //1 100
Object result = null;
if (num <= 3) {
// 做测试
System.out.println("test中奖,去测试提供者是否活了");
try {
result = request.proceed(request.getArgs());
// 测试通过了,关闭断路器
this.changeStatus(HystrixStatus.CLOSE);
// 开启窗口滑动来清空计数
synchronized (lock) {
System.out.println("远程调用的测试已经通过,远程服务正常");
//唤醒窗口线程
lock.notifyAll();
}
} catch (Exception e) {
// 进来,就没有通过
System.out.println("测试没有通过1");
// e.printStackTrace();
return interceptor(request);
} catch (Throwable throwable) {
// 进来,就没有通过
System.out.println("测试没有通过2");
}
} else {
System.out.println("未进测试,直接返回备胎结果");
return interceptor(request);
}
return result;
}
@Override
public void changeStatus(HystrixStatus status) {
this.status = status;
}
@Override
public HystrixStatus getStatus() {
return this.status;
}
private Object getFallCallback(Method method) {
HystixCmdRpc annotation = method.getAnnotation(HystixCmdRpc.class);
Class<?> callback = annotation.callback();
try {
return callback.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
return null;
}
}
七、使用注解来实现接口的获取 @HystixCmdRpc
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface HystixCmdRpc {
/**
* 该方法调用失败了,我做啥
*
* @return 是备胎的类型
*/
Class<?> callback() default Object.class;
}
八、测试:远程调用的接口 RpcService
远程调用的接口 RpcService
public interface RpcService {
/**
* 执行远程调用
* @return
*/
public String rpc();
}
正常调用的实现类
@Service
public class RpcServiceImpl implements RpcService {
@Autowired
private RestTemplate rest;
/**
* 正常的调用getForObject
* 不正常走RpcFallCallback 里面的rpc 方法
*/
@Override
@HystixCmdRpc(callback = RpcFallCallback.class)
public String rpc() {
String result = rest.getForObject("http://localhost:8081/info", String.class);
return result + "-------";
}
}
失败的(备胎)实现类
public class RpcFallCallback implements RpcService{
@Override
public String rpc() {
return "我是备胎";
}
}
yml
server:
port: 8087
spring:
application:
name: feign-client
eureka:
client:
service-url:
defaultZone: http://nnhx.top:8761/eureka/,http://nnhx.top:8762/eureka/,http://nnhx.top:8763/eureka/
启动类:
@SpringBootApplication
@RestController
public class FeignClientApplication {
@Autowired
private RpcService rpcService;
public static void main(String[] args) {
SpringApplication.run(FeignClientApplication.class, args);
}
@Bean
public RestTemplate rest() {
return new RestTemplate();
}
@GetMapping("/rpc/testwu")
public String sdfds() {
int i=0;
while(i<1000000000){
String rpc = rpcService.rpc();
System.out.println(rpc);
i++;
}
return "rpcOK";
}
}
启动提供者,在启动此消费者,访问http://localhost:8087/rpc/testwu查看消费者控制台--》关掉提供者,观察消费者控制台--》再重启提供者,观察消费者控制台;