【Hystrix技术指南】(3)超时机制的原理和实现

简介: 【Hystrix技术指南】(3)超时机制的原理和实现

[温馨提示]


承接上一篇文章🏹「Hystrix」(2)参数配置的详细介绍



[背景介绍]


  • 分布式系统的规模和复杂度不断增加,随着而来的是对分布式系统可用性的要求越来越高。在各种高可用设计模式中,【熔断、隔离、降级、限流】是经常被使用的。而相关的技术,Hystrix本身早已算不上什么新技术,但它却是最经典的技术体系!。
  • Hystrix以实现熔断降级的设计,从而提高了系统的可用性。
  • Hystrix是一个在调用端上,实现断路器模式,以及隔舱模式,通过避免级联故障,提高系统容错能力,从而实现高可用设计的一个Java服务组件库。
  • Hystrix实现了资源隔离机制



前提介绍


Hystrix的超时检测本质上通过启动单独线程去检测的,线程的执行的时间刚好就是任务超时的时间,本质上就是这么个简单的逻辑。


Hystrix超时后会抛出一个HystrixTimeoutException的异常。




超时检测逻辑


Hystrix的超时包括注册过程和执行过程两个,注册过程如下:


  • 执行lift(new HystrixObservableTimeoutOperator(_cmd))关联超时检测任务


  • 在HystrixObservableTimeoutOperator类中,new TimerListener()负责创建检测任务,HystrixTimer.getInstance().addTimerListener(listener)负责关联定时任务


  • 在HystrixObservableTimeoutOperator类中,addTimerListener通过java的定时任务服务scheduleAtFixedRate在延迟超时时间后执行



Hystrix的超时执行过程如下:


  1. 在超时后执行listener.tick()方法后执行类TimerListener的tick方法
  2. 在TimerListener类的tick方法中执行timeoutRunnable.run()后执行HystrixContextRunnable的run方法
  3. 在HystrixContextRunnable类run方法中执行child.onError(new HystrixTimeoutException())实现超时
  4. executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));



private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
        final AbstractCommand<R> originalCommand;
        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }
        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);
            //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
            final HystrixRequestContext hystrixRequestContext = 
              HystrixRequestContext.getContextForCurrentThread();
            TimerListener listener = new TimerListener() {
                @Override
                public void tick() {
                  if(originalCommand.isCommandTimedOut
            .compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, 
              originalCommand.commandKey);
                        // shut down the original request
                        s.unsubscribe();
                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(
                                originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });
                        timeoutRunnable.run();
                    }
                }
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };
            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);
            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() {
                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }
                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }
                @Override
                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }
                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
        originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, 
                                                                             TimedOutStatus.COMPLETED);
                }
            };
            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);
            return parent;
        }
    }
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
        //这里直接简单粗暴的scheduleAtFixedRate以超时时间作为周期去判断是否执行完成
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, 
                listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), 
        TimeUnit.MILLISECONDS);
          return new TimerReference(listener, f);
    }
  public class HystrixContextRunnable implements Runnable {
      private final Callable<Void> actual;
      private final HystrixRequestContext parentThreadState;
      public HystrixContextRunnable(Runnable actual) {
          this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
      }
      public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable 
      actual) {
          this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
      }
      public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, 
                                  final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
          this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                actual.run();
                return null;
            }
        });
        this.parentThreadState = hystrixRequestContext;
    }
    @Override
    public void run() {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }
}




相关文章
|
10月前
|
监控 微服务
Hystrix断路器执行原理
Hystrix断路器执行原理
63 0
|
11月前
Ribbon、Feign、Hystrix超时&重试&熔断问题
在使用Ribbon、Feign、Hystrix组合时,因为配置的问题出现以下现象,让我的大脑CPU烧的不行不行(拿我老家话说就是“脑子ran滴奥”)
123 0
|
10月前
|
安全 Java
Hystrix超时机制为服务接口调用超时提供安全保护
Hystrix超时机制为服务接口调用超时提供安全保护
93 1
|
10月前
|
缓存 Java 应用服务中间件
Hystrix中RequestCache请求缓存技术
Hystrix中RequestCache请求缓存技术
86 0
|
10月前
|
缓存 Java 数据中心
Hystrix执行时内部原理
Hystrix执行时内部原理
40 0
|
10月前
|
缓存 Java 应用服务中间件
Hystrix线程池技术实现资源隔离
Hystrix线程池技术实现资源隔离
26 0
|
应用服务中间件
高并发下hystrix熔断超时及concurrent.RejectedExecutionException: Rejected command because thread-pool queueSiz...
高并发下hystrix熔断超时及concurrent.RejectedExecutionException: Rejected command because thread-pool queueSiz...
251 0
|
设计模式 缓存 监控
【Hystrix技术指南】(7)故障切换的运作流程原理分析(含源码)
【Hystrix技术指南】(7)故障切换的运作流程原理分析(含源码)
236 0
【Hystrix技术指南】(7)故障切换的运作流程原理分析(含源码)
|
设计模式 缓存 安全
【Hystrix技术指南】(6)请求合并机制原理分析
【Hystrix技术指南】(6)请求合并机制原理分析
166 0
【Hystrix技术指南】(6)请求合并机制原理分析
|
数据采集 缓存 监控
【Hystrix技术指南】(5)Command创建和执行实现
【Hystrix技术指南】(5)Command创建和执行实现
170 0
【Hystrix技术指南】(5)Command创建和执行实现