二十二.SpringCloud源码剖析-Hystrix降级

简介: 这篇文章是接上一篇《[SpringCloud源码剖析-Hystrix初始化](https://blog.csdn.net/u014494148/article/details/117221635?spm=1001.2014.3001.5501)》,继续完成Hystrix未完成的执行流程

前言

这篇文章是接上一篇《SpringCloud源码剖析-Hystrix初始化》,继续完成Hystrix未完成的执行流程

HystrixCommand

上回说到,Hystrix通过HystrixCommandApsect 环绕通知@Around切到@HystrixCommand注解的方法。然后会创建 HystrixCommand 去执行Hystrix,在创建HystrixCommand的时候会初始化熔断器和线程池。HystrixCommand创建好之后,如果没有配置observable模式,默认会交给CommandExecutor去执行,我们接着上一张继续看Hystrix的执行流程

HystrixCommandAspect#methodsAnnotatedWithHystrixCommand:

...省略...
 try {
   
         //目标方法是否是采用异步回调,默认false
        if (!metaHolder.isObservable()) {
   
              result = CommandExecutor.execute(invokable, executionType, metaHolder);
          } else {
   
              result = executeObservable(invokable, executionType, metaHolder);
          }
        }

跟下去.HystrixCommandAspect#methodsAnnotatedWithHystrixCommand

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
   
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
        //ExecutionType有三种类型,同步执行,异步执行,OBSERVABLE异步回调,默认走SYNCHRONOUS
        switch (executionType) {
   
            //同步
            case SYNCHRONOUS: {
   
                //把 HystrixInvokable  转成 HystrixExecutable 后执行execute
                return castToExecutable(invokable, executionType).execute();
            }
            //异步执行
            case ASYNCHRONOUS: {
   
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
   
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            //OBSERVABLE 模式
            case OBSERVABLE: {
   
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
    //转发了一下类型HystrixInvokable  转成 HystrixExecutable
    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
   
        if (invokable instanceof HystrixExecutable) {
   
            return (HystrixExecutable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
    }

该方法会根据传入的ExecutionType判断执行的方式,ExecutionType中有三种类型

  • ASYNCHRONOUS:异步执行,
  • SYNCHRONOUS:同步执行,
  • OBSERVABLE:异步回调,你可去百度一下Java的Observer,它其实是观察者的实现

默认走同步执行SYNCHRONOUS,先把 HystrixInvokable 转成 HystrixExecutable 后,执行execute 继续跟下去,代码来到:com.netflix.hystrix.HystrixCommand#execute

public R execute() {
   
        try {
   
            return queue().get();
        } catch (Exception e) {
   
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

 public Future<R> queue() {
   
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();

       ...省略...
        return f;
    }

execute方法中调用了queue方法,queue方法中用到了观察者模式,在toObservable方法中可以看到hystrix的降级触发流程,在toFuture方法中最终会通过GennericCommand调用CommandAction去执行我们的方法,这里我关注toObservable源码如下;

 public Observable<R> toObservable() {
   
        final AbstractCommand<R> _cmd = this;
        ...省略...
        //【第二步】回调
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
   
            @Override
            public Observable<R> call() {
   
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
   
                    return Observable.never();
                }
                //【第三步】
                return applyHystrixSemantics(_cmd);
            }
        };

        return Observable.defer(new Func0<Observable<R>>() {
   
            @Override
            public Observable<R> call() {
   
                 ...省略...
                //这里走缓存
                /* try from cache first */
                if (requestCacheEnabled) {
   
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
   
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                //【第一步】
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                                ...省略...

Hystrix大量使用到了类似于JS的回调函数式编程,applyHystrixSemantics 就是一个回调,通过 Observable.defer(applyHystrixSemantics) 去执行。applyHystrixSemantics 中又调用了 applyHystrixSemantics(_cmd);跟进com.netflix.hystrix.AbstractCommand#applyHystrixSemantics方法

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
   
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);
        //是否允许执行熔断器,方法中判断电路是否打开,方法中会
        //判断circuitBreakerForceOpen强制打开熔断器 和 circuitBreakerForceClosed强制关闭熔断器

        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
   
            //信号量
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            //信号量释放的回调
            final Action0 singleSemaphoreRelease = new Action0() {
   
                @Override
                public void call() {
   
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
   
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
   
                @Override
                public void call(Throwable t) {
   
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            //尝试获取信号量
            if (executionSemaphore.tryAcquire()) {
   
                try {
   
                    /* used to track userThreadExecutionTime */
                    //创建一个执行结果对象
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    //执行command和监听
                    return executeCommandAndObserve(_cmd)
                            //当出现错误
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
   
                    return Observable.error(e);
                }
            } else {
   
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
   
            return handleShortCircuitViaFallback();
        }
    }

这里确定电路未打开的情况下,获取信号量,然后调用com.netflix.hystrix.AbstractCommand#executeCommandAndObserve方法,继续跟下去:


private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
   
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        ... 省略部分代码...
        //【重要】这个是处理降级的回调,处理降级的核心流程
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
   
            @Override
            public Observable<R> call(Throwable t) {
   
            //【第二步】降级
                //标记未成功
                circuitBreaker.markNonSuccess();
                //拿到异常对象
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                //判断错误类型拒绝执行,超时等,做不同的处理
                if (e instanceof RejectedExecutionException) {
   
                        //线程池拒绝
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
   
                    //超时
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
   
                    //请求错误
                    return handleBadRequestByEmittingError(e);
                } else {
   
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
   
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    //处理失败
                    return handleFailureViaFallback(e);
                }
            }
        };

  return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                //【第一步】这里执行错误,触发降级回调
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);

该方法中 Observable 去执行,如果出现错误会触发handleFallback降级回调,handleFallback#call方法中拿到异常后,进行处理,我们继续跟踪一下:com.netflix.hystrix.AbstractCommand#handleFailureViaFallback方法

private Observable<R> handleFailureViaFallback(Exception underlying) {
   
        /**
         * All other error handling
         */
        logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);

        // report failure
        eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);

        // record the exception
        executionResult = executionResult.setException(underlying);
        //获取fallback或者抛出异常
        return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
    }

继续跟进 com.netflix.hystrix.AbstractCommand#getFallbackOrThrowException

...省略部分代码...
 // acquire a permit
                 //拿到信号量
                if (fallbackSemaphore.tryAcquire()) {
   
                    try {
   
                        if (isFallbackUserDefined()) {
   
                            executionHook.onFallbackStart(this);
                            //【重要】这里在获取降级的执行链
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
   
                            //same logic as above without the hook invocation
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
   
                        //If hook or user-fallback throws, then use that as the result of the fallback lookup
                        fallbackExecutionChain = Observable.error(ex);
                    }
                    //执行降级
                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
   
                   return handleFallbackRejectionByEmittingError();
                }
            } else {
   
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }

这里通过 getFallbackObservable()方法拿到降级执行链,getFallbackObservable方法最终调用com.netflix.hystrix.contrib.javanica.command.GenericCommand#getFallback获取降级

 @Override
    protected Object getFallback() {
   
    //降级方法的CommandAction
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
   
            try {
   
                return process(new Action() {
   
                    @Override
                    Object execute() {
   
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        //执行降级方法
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
   
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
   
            return super.getFallback();
        }
    }

最终通过 commandAction.executeWithArgs 执行降级方法,底层使用反射调用降级方法,然后把结果返回。

文章结束,最后想吐槽一下,Hystrix真的是我看过最难看的源码,到处用到回调编程,代码可读性是真的差,看来停更是有原因的呀。。。。。。。最后。。。喜欢的话给个好评吧。

相关文章
|
5月前
|
Java UED 开发者
Spring Boot 降级功能的神秘面纱:Hystrix 与 Resilience4j 究竟藏着怎样的秘密?
【8月更文挑战第29天】在分布式系统中,服务稳定性至关重要。为应对故障,Spring Boot 提供了 Hystrix 和 Resilience4j 两种降级工具。Hystrix 作为 Netflix 的容错框架,通过隔离依赖、控制并发及降级机制增强系统稳定性;Resilience4j 则是一个轻量级库,提供丰富的降级策略。两者均可有效提升系统可靠性,具体选择取决于需求与场景。在面对服务故障时,合理运用这些工具能确保系统基本功能正常运作,优化用户体验。以上简介包括了两个工具的简单示例代码,帮助开发者更好地理解和应用。
115 0
|
6月前
|
缓存 监控 Java
Hystrix 源码解读
Hystrix 源码解读
42 2
|
4月前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
503 37
|
4月前
|
存储 NoSQL 调度
|
4月前
|
XML 监控 Java
Spring Cloud全解析:熔断之Hystrix简介
Hystrix 是由 Netflix 开源的延迟和容错库,用于提高分布式系统的弹性。它通过断路器模式、资源隔离、服务降级及限流等机制防止服务雪崩。Hystrix 基于命令模式,通过 `HystrixCommand` 封装对外部依赖的调用逻辑。断路器能在依赖服务故障时快速返回备选响应,避免长时间等待。此外,Hystrix 还提供了监控功能,能够实时监控运行指标和配置变化。依赖管理方面,可通过 `@EnableHystrix` 启用 Hystrix 支持,并配置全局或局部的降级策略。结合 Feign 可实现客户端的服务降级。
216 23
|
4月前
|
Java 对象存储 开发者
故障隔离与容错处理:Hystrix在Spring Cloud和Netflix OSS中的应用
故障隔离与容错处理:Hystrix在Spring Cloud和Netflix OSS中的应用
67 3
|
5月前
|
人工智能 前端开发 Java
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
本文介绍了如何使用 **Spring Cloud Alibaba AI** 构建基于 Spring Boot 和 uni-app 的聊天机器人应用。主要内容包括:Spring Cloud Alibaba AI 的概念与功能,使用前的准备工作(如 JDK 17+、Spring Boot 3.0+ 及通义 API-KEY),详细实操步骤(涵盖前后端开发工具、组件选择、功能分析及关键代码示例)。最终展示了如何成功实现具备基本聊天功能的 AI 应用,帮助读者快速搭建智能聊天系统并探索更多高级功能。
1640 2
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
|
6月前
|
监控 Java 开发者
Spring Cloud中的服务熔断与降级
Spring Cloud中的服务熔断与降级
|
6月前
|
SQL Java 索引
SQL 能力问题之Hystrix的降级触发条件问题如何解决
SQL 能力问题之Hystrix的降级触发条件问题如何解决
|
7月前
|
监控 Java 微服务
Spring Cloud 之 Hystrix
Spring Cloud Hystrix 是一个用于处理分布式系统延迟和容错的库,防止雪崩效应。它作为断路器,当服务故障时通过监控短路,返回备用响应,保持系统弹性。主要功能包括服务降级和熔断: