Eureka的TimedSupervisorTask类(自动调节间隔的周期性任务)

简介: 了解Eureka server在定时更新服务列表时依赖的细节

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

起因

  • 一个基于Spring Cloud框架的应用,如果注册到了Eureka server,那么它就会定时更新服务列表,这个定时任务启动的代码在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,如下(来自工程eureka-client,版本1.7.0):
private void initScheduledTasks() {
        //更新服务列表
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        ...
        //略去其他代码
  • 上述代码中,scheduler是ScheduledExecutorService接口的实现,其schedule方法的官方文档如下所示:

image.png

  • 上图红框显示:该方法创建的是一次性任务,但是在实际测试中,如果在CacheRefreshThread类的run方法中打个断点,就会发现该方法会被周期性调用;
  • 因此问题就来了:方法schedule(Callable callable,long delay,TimeUnit unit)创建的明明是个一次性任务,但CacheRefreshThread被周期性执行了

寻找答案

  • 打开的run方法源码,请注意下面的中文注释:
public void run() {
        Future future = null;
        try {
        //使用Future,可以设定子线程的超时时间,这样当前线程就不用无限等待了
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            //指定等待子线程的最长时间
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            //delay是个很有用的变量,后面会用到,这里记得每次执行任务成功都会将delay重置
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
        } catch (TimeoutException e) {
            logger.error("task supervisor timed out", e);
            timeoutCounter.increment();

            long currentDelay = delay.get();
            //任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            //设置为最新的值,考虑到多线程,所以用了CAS
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
            //一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略,就会将调度器停掉
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.error("task supervisor rejected the task", e);
            }

            rejectedCounter.increment();
        } catch (Throwable e) {
            //一旦出现未知的异常,就停掉调度器
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.error("task supervisor threw an exception", e);
            }

            throwableCounter.increment();
        } finally {
            //这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务;
            if (future != null) {
                future.cancel(true);
            }
            
        //只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务
            if (!scheduler.isShutdown()) {
                //这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间时dealy的值,
                //假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound)
                //如果最近一次任务没有超时,那么就在30秒后开始新任务,
                //如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作,乘以二后的60秒超过了最大间隔50秒)
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
  • 真相就在上面的最后一行代码中:scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS):执行完任务后,会再次调用schedule方法,在指定的时间之后执行一次相同的任务,这个间隔时间和最近一次任务是否超时有关,如果超时了就间隔时间就会变大;
  • 小结:从整体上看,TimedSupervisorTask是固定间隔的周期性任务,一旦遇到超时就会将下一个周期的间隔时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务不再超时,间隔时间又会自动恢复为初始值,另外还有CAS来控制多线程同步,简洁的代码,巧妙的设计,值得我们学习;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关文章
|
4月前
|
小程序 调度 数据库
jeecg-boot集成xxl-job调度平台,每秒/每分钟/手动都能执行成功,但是设置固定时间不触发?
jeecg-boot集成xxl-job调度平台,每秒/每分钟/手动都能执行成功,但是设置固定时间不触发?
|
11月前
|
程序员 API 数据安全/隐私保护
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
|
27天前
|
存储 容器
Job类日志采集问题之DaemonSet采集方式的参数以减小采集延时如何调整
Job类日志采集问题之DaemonSet采集方式的参数以减小采集延时如何调整
|
4月前
|
网络协议 Linux 网络安全
socket的心跳间隔和可用连接数的矛盾和平衡
socket的心跳间隔和可用连接数的矛盾和平衡
41 0
|
11月前
|
运维
查看调整cpu频率及模式
查看调整cpu频率及模式
232 2
|
数据采集 监控 Linux
一日一技:不用轮询,基于事件监控文件变动
一日一技:不用轮询,基于事件监控文件变动
112 0
如何检查视频画面延迟的时间
做视频通讯的朋友,画面延迟,就是要经常测试的事情。那么怎样测试延迟呢?
424 0
如何设置agent上报频率监控间隔时间 - WGCLOUD
在agent/config/application.properties中设置即可
如何设置agent上报频率监控间隔时间 - WGCLOUD