开发者社区> 晴天哥> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

Hystrix超时机制

简介: 开篇  Hystrix的超时检测本质上通过启动单独线程去检测的,线程的启动的时间刚好就是任务超时的时间,本质上就是这么个简单的逻辑。  Hystrix超时后会抛出一个HystrixTimeoutException的异常。
+关注继续查看

开篇

 Hystrix的超时检测本质上通过启动单独线程去检测的,线程的启动的时间刚好就是任务超时的时间,本质上就是这么个简单的逻辑。
 Hystrix超时后会抛出一个HystrixTimeoutException的异常。


超时检测逻辑

 Hystrix的超时包括注册过程和执行过程两个,注册过程如下:

  • 执行lift(new HystrixObservableTimeoutOperator<R>(_cmd))关联超时检测任务。

  • 在HystrixObservableTimeoutOperator类中,new TimerListener()负责创建检测任务,HystrixTimer.getInstance().addTimerListener(listener)负责关联定时任务。

  • 在HystrixObservableTimeoutOperator类中,addTimerListener通过java的定时任务服务scheduleAtFixedRate在延迟超时时间后执行



 Hystrix的超时执行过程如下:

  • 在超时后执行 listener.tick()方法后执行类TimerListener的tick方法

  • 在TimerListener类的tick方法中执行timeoutRunnable.run()后执行HystrixContextRunnable的run方法

  • 在HystrixContextRunnable类run方法中执行child.onError(new HystrixTimeoutException())实现超时。

executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));



private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);

            //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
            final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

            TimerListener listener = new TimerListener() {

                @Override
                public void tick() {
                  if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        // shut down the original request
                        s.unsubscribe();

                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(
                                originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });

                        timeoutRunnable.run();
                    }
                }

                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);

            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() {

                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }

                @Override
                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }

                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, 
                                                                             TimedOutStatus.COMPLETED);
                }

            };

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);

            return parent;
        }
    }
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        //这里直接简单粗暴的scheduleAtFixedRate以超时时间作为周期去判断是否执行完成
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, 
                listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }
public class HystrixContextRunnable implements Runnable {

    private final Callable<Void> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextRunnable(Runnable actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }
    
    public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
        this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
    }

    public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, 
                                  final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
        this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                actual.run();
                return null;
            }

        });
        this.parentThreadState = hystrixRequestContext;
    }

    @Override
    public void run() {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }
}


参考文章

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Hystrix-开源容错系统(下)
Hystrix-开源容错系统(下)
25 0
Hystrix 实现资源隔离的 “两把利器”
这篇文章主要跟大家讲讲如何利用 Hystrix 实现资源隔离,一起来看看~
29 0
Hystrix-开源容错系统(上)
开年第一篇文章来自于京东的小伙伴,希望这篇文章能够帮助大家对熔断和降级有所理解。
27 0
高可用系统架构(2)-Hystrix分布式系统高可用
高可用系统架构(2)-Hystrix分布式系统高可用
49 0
Hystrix 配置参数全解析
Hystrix 配置参数全解析
85 0
Hystrix介绍
开篇  对Hystrix耳闻已久,最近刚好想在项目中使用这个神器就顺带研究了一把,很多细节来不及深入研究只能把宏观上的各个概念讲解一下,这个介绍的素材大都来自github上的Hystrix官网。
1229 0
+关注
晴天哥
专注java技术,热爱长跑和阅读开源代码 邮箱 lebron374@163.com
403
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载