SpringCloud - Hystrix的执行流程(上)

简介: SpringCloud - Hystrix的执行流程(上)

0 Hystrix执行原理图

11.png

1 创建HystrixCommand/HystrixObservableCommand

一个HystrixCommand或HystrixObservableCommand对象,代表对某个依赖服务发起的一次请求或者调用

构造的时候,可在构造器中传入任何需要的参数。


HystrixCommand仅返回一个结果的调用。

HystrixObservableCommand可能会返回多条结果的调用。

12.png

直接继承HystrixCommand并实现run方法即可。

public class GetUserAccountCommand extends HystrixCommand<UserAccount> {
  ...
    @Override
    protected UserAccount run() {
        /* 模拟执行网络调用以检索用户信息 */
        try {
            Thread.sleep((int) (Math.random() * 10) + 2);
        } catch (InterruptedException e) {
        }
        /* 5%的时间失败来说明fallback的工作原理 */
        if (Math.random() > 0.95) {
            throw new RuntimeException("random failure processing UserAccount network response");
        }
        /* 延迟会增加5%的时间,因此有时会触发超时 */
        if (Math.random() > 0.95) {
            // 随机等待时间尖峰
            try {
                Thread.sleep((int) (Math.random() * 300) + 25);
            } catch (InterruptedException e) {
            }
        }
        /* 成功...使用远程服务响应的数据创建UserAccount */
        return new UserAccount(86975, "John James", 2, true, false, true);
    }
  ...
}

2 调用command的执行方法

执行Command就可以发起一次对依赖服务的调用

要执行Command,需要在4个方法中选择其中的一个

  • 前两种是HystrixCommand独有的哦

13.png

2.1 execute()

    /**
     * 用于同步执行 command.
     * 
     * @return R
     *         如果command由于任何原因失败,则执行 #run 或从 #getFallback() fallback的结果.
     *
     * @throws HystrixRuntimeException
     *             如果发生故障并且无法检索fallback
     * @throws HystrixBadRequestException
     *             如果使用了无效的参数或状态来表示用户故障,而不是系统故障
     *
     * @throws IllegalStateException
     *             如果多次调用
     */
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

调用后直接阻塞,直到依赖服务返回单条结果,或抛异常

2.2 queue()

    /**
     * 用于异步执行命令 command.
     *
     * 这将使该command在线程池上排队,并在完成后返回一个 Future 以获取结果.
     * 注意:如果配置为不在单独的线程中运行,则其效果与 #execute() 相同,并会阻塞.
     * 不会抛出异常,而只是切换为同步执行,因此无需更改代码即可 将command从运行在单独的线程切换到调用线程.
     * (switch a command from running on a separate thread to the calling thread.)
     * 
     * @return {@code Future <R>}执行 #run() 的结果,或者如果command由于任何原因失败,则返回 #getFallback() 的结果.
     * @throws HystrixRuntimeException
     *             如果不存在fallback
     *             如果通过 ExecutionException#getCause() 中的{@code Future.get(), 如果不存在失败发生的话
     *             或者如果无法将命令排队(如,短路,线程池/信号被拒绝),则立即返回
     * @throws HystrixBadRequestException
     *         通过 ExecutionException#getCause() 中的 Future.get() 如果使用了无效的参数或状态来表示用户故障而不是系统故障
     * @throws IllegalStateException
     *             如果多次调用
     */
    public Future<R> queue() {
        /*
         * 当Future.cancel(boolean)的“ mayInterrupt”标志设为true时
         * 由Observable.toBlocking().toFuture() 返回的Future不实现执行线程的中断
         * 因此,为了遵守Future的约定,我们必须围绕它.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        final Future<R> f = new Future<R>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }
                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                     * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                     * than that interruption request cannot be taken back.
                     * 这里唯一有效的转换是false -> true.
                     * 如果存在由该命令创建(这很奇怪,但从未禁止过)的两个futures,例如f1和f2,
                     * 并且对f1.cancel(true)和f2.cancel(false)的调用是由不同的线程发起,
                     * 尚不清楚在检查mayInterruptOnCancel时将使用什么值.
                     * 处理这种情况的最一致的方法是说,如果在中断的情况下调用了任何cancellation,则无法撤回该中断请求.
                     */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
            }
                final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }
                return res;
      }
            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
      }
            @Override
            public boolean isDone() {
                return delegate.isDone();
      }
            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        };
        /* 对立即抛出的错误状态的特殊处理 */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
          case COMMAND_EXCEPTION:
          case TIMEOUT:
                        // 不会仅从 queue().get() 中将这些类型从 queue() 中抛出, 因为它们是执行错误
            return f;
          default:
            // these are errors we throw from queue() as they as rejection type errors
                        // 这些是从 queue() 抛出的错误,因为它们是拒绝类型错误
            throw hre;
          }
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }
        return f;
    }

调用,返回一个Future,后面可以通过Future获取单条结果


2.3 observe()

订阅一个Observable对象,Observable代表的是依赖服务返回的结果,获取到一个那个代表结果的Observable对象的拷贝对象


toObservable()

返回一个Observable对象,如果我们订阅这个对象,就会执行command并且获取返回结果

其中execute()和queue()仅对HystrixCommand适用

K             value   = command.execute();
Future<K>     fValue  = command.queue();
Observable<K> ohValue = command.observe();         
Observable<K> ocValue = command.toObservable();    

execute()实际上会调用queue().get()

14.png

在 queue() 方法中,会调用toObservable().toBlocking().toFuture()

image.png

即,无论是哪种执行command的方式,最终都是依赖toObservable()

也就是说同步的HystrixCommand最终都会依赖Observable,尽管HystrixCommand是用来发射单个事件的

目录
相关文章
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
15873 126
|
10月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
161 6
|
10月前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
178 5
|
10月前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
194 5
|
XML 监控 Java
Spring Cloud全解析:熔断之Hystrix简介
Hystrix 是由 Netflix 开源的延迟和容错库,用于提高分布式系统的弹性。它通过断路器模式、资源隔离、服务降级及限流等机制防止服务雪崩。Hystrix 基于命令模式,通过 `HystrixCommand` 封装对外部依赖的调用逻辑。断路器能在依赖服务故障时快速返回备选响应,避免长时间等待。此外,Hystrix 还提供了监控功能,能够实时监控运行指标和配置变化。依赖管理方面,可通过 `@EnableHystrix` 启用 Hystrix 支持,并配置全局或局部的降级策略。结合 Feign 可实现客户端的服务降级。
722 23
|
Java 对象存储 开发者
故障隔离与容错处理:Hystrix在Spring Cloud和Netflix OSS中的应用
故障隔离与容错处理:Hystrix在Spring Cloud和Netflix OSS中的应用
207 3
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
1854 15
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
361 3
|
消息中间件 Java 开发者
Spring Cloud微服务框架:构建高可用、分布式系统的现代架构
Spring Cloud是一个开源的微服务框架,旨在帮助开发者快速构建在分布式系统环境中运行的服务。它提供了一系列工具,用于在分布式系统中配置、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态等领域的支持。
423 5

热门文章

最新文章