前言
这篇文章是接上一篇《SpringCloud源码剖析-Hystrix初始化》,继续完成Hystrix未完成的执行流程
HystrixCommand
上回说到,Hystrix通过HystrixCommandApsect 环绕通知@Around切到@HystrixCommand注解的方法。然后会创建 HystrixCommand 去执行Hystrix,在创建HystrixCommand的时候会初始化熔断器和线程池。HystrixCommand创建好之后,如果没有配置observable模式,默认会交给CommandExecutor去执行,我们接着上一张继续看Hystrix的执行流程
HystrixCommandAspect#methodsAnnotatedWithHystrixCommand:
...省略...
try {
//目标方法是否是采用异步回调,默认false
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
}
跟下去.HystrixCommandAspect#methodsAnnotatedWithHystrixCommand
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
//ExecutionType有三种类型,同步执行,异步执行,OBSERVABLE异步回调,默认走SYNCHRONOUS
switch (executionType) {
//同步
case SYNCHRONOUS: {
//把 HystrixInvokable 转成 HystrixExecutable 后执行execute
return castToExecutable(invokable, executionType).execute();
}
//异步执行
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
//OBSERVABLE 模式
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
//转发了一下类型HystrixInvokable 转成 HystrixExecutable
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
if (invokable instanceof HystrixExecutable) {
return (HystrixExecutable) invokable;
}
throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
该方法会根据传入的ExecutionType判断执行的方式,ExecutionType中有三种类型
- ASYNCHRONOUS:异步执行,
- SYNCHRONOUS:同步执行,
- OBSERVABLE:异步回调,你可去百度一下Java的Observer,它其实是观察者的实现
默认走同步执行SYNCHRONOUS,先把 HystrixInvokable 转成 HystrixExecutable 后,执行execute 继续跟下去,代码来到:com.netflix.hystrix.HystrixCommand#execute
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Future<R> delegate = toObservable().toBlocking().toFuture();
...省略...
return f;
}
execute方法中调用了queue方法,queue方法中用到了观察者模式,在toObservable方法中可以看到hystrix的降级触发流程,在toFuture方法中最终会通过GennericCommand调用CommandAction去执行我们的方法,这里我关注toObservable源码如下;
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
...省略...
//【第二步】回调
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
//【第三步】
return applyHystrixSemantics(_cmd);
}
};
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
...省略...
//这里走缓存
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
//【第一步】
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
...省略...
Hystrix大量使用到了类似于JS的回调函数式编程,applyHystrixSemantics 就是一个回调,通过 Observable.defer(applyHystrixSemantics) 去执行。applyHystrixSemantics 中又调用了 applyHystrixSemantics(_cmd);跟进com.netflix.hystrix.AbstractCommand#applyHystrixSemantics方法
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
//是否允许执行熔断器,方法中判断电路是否打开,方法中会
//判断circuitBreakerForceOpen强制打开熔断器 和 circuitBreakerForceClosed强制关闭熔断器
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) {
//信号量
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
//信号量释放的回调
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
//尝试获取信号量
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
//创建一个执行结果对象
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//执行command和监听
return executeCommandAndObserve(_cmd)
//当出现错误
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
这里确定电路未打开的情况下,获取信号量,然后调用com.netflix.hystrix.AbstractCommand#executeCommandAndObserve方法,继续跟下去:
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
... 省略部分代码...
//【重要】这个是处理降级的回调,处理降级的核心流程
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
//【第二步】降级
//标记未成功
circuitBreaker.markNonSuccess();
//拿到异常对象
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
//判断错误类型拒绝执行,超时等,做不同的处理
if (e instanceof RejectedExecutionException) {
//线程池拒绝
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
//超时
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
//请求错误
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
//处理失败
return handleFailureViaFallback(e);
}
}
};
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
//【第一步】这里执行错误,触发降级回调
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
该方法中 Observable 去执行,如果出现错误会触发handleFallback降级回调,handleFallback#call方法中拿到异常后,进行处理,我们继续跟踪一下:com.netflix.hystrix.AbstractCommand#handleFailureViaFallback方法
private Observable<R> handleFailureViaFallback(Exception underlying) {
/**
* All other error handling
*/
logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);
// report failure
eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
// record the exception
executionResult = executionResult.setException(underlying);
//获取fallback或者抛出异常
return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
}
继续跟进 com.netflix.hystrix.AbstractCommand#getFallbackOrThrowException
...省略部分代码...
// acquire a permit
//拿到信号量
if (fallbackSemaphore.tryAcquire()) {
try {
if (isFallbackUserDefined()) {
executionHook.onFallbackStart(this);
//【重要】这里在获取降级的执行链
fallbackExecutionChain = getFallbackObservable();
} else {
//same logic as above without the hook invocation
fallbackExecutionChain = getFallbackObservable();
}
} catch (Throwable ex) {
//If hook or user-fallback throws, then use that as the result of the fallback lookup
fallbackExecutionChain = Observable.error(ex);
}
//执行降级
return fallbackExecutionChain
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else {
return handleFallbackRejectionByEmittingError();
}
} else {
return handleFallbackDisabledByEmittingError(originalException, failureType, message);
}
这里通过 getFallbackObservable()方法拿到降级执行链,getFallbackObservable方法最终调用com.netflix.hystrix.contrib.javanica.command.GenericCommand#getFallback获取降级
@Override
protected Object getFallback() {
//降级方法的CommandAction
final CommandAction commandAction = getFallbackAction();
if (commandAction != null) {
try {
return process(new Action() {
@Override
Object execute() {
MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = createArgsForFallback(metaHolder, getExecutionException());
//执行降级方法
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable e) {
LOGGER.error(FallbackErrorMessageBuilder.create()
.append(commandAction, e).build());
throw new FallbackInvocationException(unwrapCause(e));
}
} else {
return super.getFallback();
}
}
最终通过 commandAction.executeWithArgs 执行降级方法,底层使用反射调用降级方法,然后把结果返回。
文章结束,最后想吐槽一下,Hystrix真的是我看过最难看的源码,到处用到回调编程,代码可读性是真的差,看来停更是有原因的呀。。。。。。。最后。。。喜欢的话给个好评吧。