3 检查是否开启缓存
如果这个command开启了请求缓存(request cache),而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果
否则,继续往后
return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { /* 这是一个有状态的对象,因此只能使用一次 */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); // TODO 为此创建新的错误类型 throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { // 记录此命令执行,无论发生什么情况 if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); /* 首先尝试从缓存 */ if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // 放入缓存 if (requestCacheEnabled && cacheKey != null) { // 包装以缓存 HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // 另一个线程击败了我们,因此使用缓存值 toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // 我们刚刚创建了一个ObservableCommand,所以我们进行了强制转换并返回了它 afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) // 进行一次清理(在正常终端状态(此行)或退订(下一行)) .doOnUnsubscribe(unsubscribeCommandCleanup) // 进行一次清理 .doOnCompleted(fireOnCompletedHook); } });
上述代码中有个判断final boolean requestCacheEnabled = isRequestCachingEnabled();可以看到如果我们的Command实现了getCacheKey方法,并且requestCacheEnabled(这个属性默认是true,可以通过调用HystrixCommand的构造方法传入一个setter对象修改默认属性)这样就不会执行后续的run方法,就会直接返回一个缓存的Observable。(必须是同一个request context里面的两个command才能用到缓存)
4 检查是否开启短路器
检查这个command对应的依赖服务是否开启短路器
如果断路器被打开了,那么hystrix就不会执行这个command,而是直接执行fallback降级
5 检查线程池/队列/semaphore是否已满
如果command对应的线程池/队列/semaphore已满,那么也不会执行command,而是直接去调用fallback降级机制,同时发送 reject 信息给断路器统计
6 执行command
调用HystrixObservableCommand.construct()或HystrixCommand.run()来实际执行这个command
HystrixCommand.run()
返回一个单条结果,或者抛出一个异常
HystrixObservableCommand.construct()
返回一个Observable对象,可以获取多条结果
如果HystrixCommand.run()或HystrixObservableCommand.construct()的执行,超过了timeout时长的话,那么command所在的线程就会抛出一个TimeoutException
如果timeout了,也会去执行fallback降级机制,而且就不会管run()或construct()返回的值
我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个TimeoutException,但是还是可能会因为严重延迟的调用线程占满整个线程池的
即使这个时候新来的流量都被限流了。。。
如果没有timeout的话,那么就会拿到一些调用依赖服务获取到的结果,然后hystrix会做一些logging记录和metric统计
7 短路健康检查
Hystrix会将每一个依赖服务的调用成功,失败,拒绝,超时,等事件,都会发送给circuit breaker断路器
短路器就会对调用成功/失败/拒绝/超时等事件的次数进行统计
短路器会根据这些统计次数来决定,是否要进行短路,如果打开了短路器,那么在一段时间内就会直接短路,然后如果在之后第一次检查发现调用成功了,就关闭断路器