源码分析ElasticJob事件监听器

简介: 源码分析ElasticJob事件监听器

在任务执行的前后,ElasticJob可以提供扩展,其主要类图如下:

a1e675631160b899af4c98f97dc00933.jpg

ElastciJobListener任务执行事件监听器提供如下两个方法:


  • void beforeJobExecuted(final Sha-rdingContexts shardingContexts)
    在任务执行之前调用,shardingC-ontexts为分片上下文信息。
  • void afterJobExecuted(final Shard-ingContexts shardingContexts)
    在任务执行之后调用,shardingC-ontexts为分片上下文信息。


上述回调函数是分片级的,也就是说默认情况下,同一个任务的多个分片都会执行beforeJobExecuted、afterJobExe-cuted方法,如果某些情况同一个任务只需在最后一个分片执行之前执行,最后一个分片执行完成后才执行,又该如何实现呢。AbstractDistributeOnceElastic-JobListener粉墨登场。


AbstractDistributeOnceElasticJobListener分布式作业中只执行一次的监听器。


  • long startedTimeoutMilliseconds
    分片等待beforeJobExecuted方法执行的超时时间,单位为毫秒。
  • Object startedWait
    分片等待beforeJobExecuted的监视器。
  • completedTimeoutMilliseconds
    分片等待afterJobExecuted方法执行的超时时间,单位为毫秒。
  • Object completedWait
    分片等待afterJobExecuted的监视器。
  • GuaranteeService guaranteeSrv
    保证分布式任务全部开始和结束状态的服务。
  • TimeService timeService
    时间服务器,主要用来获取当前服务器的系统时间。
  • void beforeJobExecuted(Sharding-Contexts sc)
    分片任务执行之前调用,该方法是一个模板方法,最后一个分片成功启动后调用。
  • doBeforeJobExecutedAtLastStarted(ShardingContexts sc)
    该方法为抽象方法,由具体子类实现,如果有其他分片未执行完成,该方法会阻塞等待,或最后启动的分片执行完doBeforeJobExecuted-AtLastStarted方法。
  • void afterJobExecuted(final Shardi-ngContexts shardingContexts)
    分片任务执行之后调用,该方法是一个模板方法,实现当最后一个分片成功执行完成后调用doAfterJob-ExecutedAtLastCompleted方法,该方法为抽象方法,由具体子类实现,如果有其他分片未执行完成,该方法会阻塞等待,或最后启动的分片执行完doAfterJobExecutedAt-LastCompleted方法。
  • public abstract void doBeforeJobE-xecutedAtLastStarted(ShardingContexts shardingContexts)
    分布式环境中最后一个作业分片执行前的执行的方法。
  • public abstract void doAfterJobExe-cutedAtLastCompleted(ShardingContexts sc)
    分布式环境中最后一个作业分片执行完成后的执行方法。
  • void notifyWaitingTaskStart()
    通知分片节点上的任务开始之前并唤醒由于还有其他分片未启动造成自身等待阻塞的任务。
  • void notifyWaitingTaskComplete()
    通知分片节点任务执行完成并唤醒由于存在其他分片任务未执行完成时阻塞的任务。


接下来重点分析AbstractDistributeOnce-ElasticJobListener实现原理(分布式环境中,监听器只在一个节点上执行的实现逻辑)。


重点分析beforeJobExecuted方法实现原理,afterJobExecuted方法类似。

1AbstractDistributeOnceElasticJobListener#beforeJobExecuted
 2public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
 3        guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());    // @1
 4        if (guaranteeService.isAllStarted()) {                                                                                         // @2
 5            doBeforeJobExecutedAtLastStarted(shardingContexts);
 6            guaranteeService.clearAllStartedInfo();
 7            return;
 8        }
 9        long before = timeService.getCurrentMillis();                                                                           // @3
10        try {
11            synchronized (startedWait) {
12                startedWait.wait(startedTimeoutMilliseconds);
13            }
14        } catch (final InterruptedException ex) {
15            Thread.interrupted();
16        }
17        if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {                         // @4
18            guaranteeService.clearAllStartedInfo();
19            handleTimeout(startedTimeoutMilliseconds);
20        }
21    }

代码@1:使用GuaranteeService注册分片开始。

代码@2:判断该任务所有的分片是否都已经注册启动,如果都注册启动,则调用doBeforeJobExecutedAtLastStarted()方法。

代码@3:获取服务器当前时间。

代码@4:利用startWait.wait(startedTi-meoutMilliseconds)带超时时间的等待,这里如何唤醒呢?

代码@5:判断唤醒是超时唤醒还是正常唤醒,如果是超时唤醒,清除所有的分片注册启动信息,处理超时异常。


上述流程简单明了,下面有两个问题需要进一步探究,如何注册分片启动信息与如何被唤醒。


1、任务节点注册分配给当前节点的任务分片

 1/**
 2     * 根据分片项注册任务开始运行.
 3     * 
 4     * @param shardingItems 待注册的分片项
 5     */
 6    public void registerStart(final Collection<Integer> shardingItems) {
 7        for (int each : shardingItems) {
 8            jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each));
 9        }
10    }

创建持久节点:{namespace}/jobname/guarantee/started/{item}。


2、当最后一个节点注册启动执行doBe-foreJobExecutedAtLastStarted方法后,如何唤醒其他节点以便进入到任务执行阶段?


请注意:AbstractDistributeOnceElasti-cJobListener#beforeJobExecuted在执行完doBeforeJobExecuted方法后,会执行guaranteeService.clearAllStarted-Info()方法。

1if (guaranteeService.isAllStarted()) {
2        doBeforeJobExecutedAtLastStarted(shardingContexts);
3        guaranteeService.clearAllStartedInfo();
4        return;
5 }

也就是回调函数执行完毕后,会删除任务所有的分片。温馨提示:业务实现子类实现doBeforeJobExecutedAtLastSt-arted方法时最好不要抛出异常,不然各节点的唤醒操作只能是等待超时后被唤醒。


1GuaranteeService#clearAllStartedInfo
2/**
3     * 清理所有任务启动信息.
4     */
5    public void clearAllStartedInfo() {
6        jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT);
7    }


直接删除{namespace}/jobname/guara-ntee/started根节点。基于ZK的开发模式触发一次删除操作,肯定会有事件监听器来监听该节点的删除事件,从而触发其他节点的唤醒操作,果不奇然Elastci-Job提供GuaranteeListenerManager事件监听来监听{namespace}/jobname/g-uarantee/started节点的删除事件。


1GuaranteeListenerManager$StartedNodeRemovedJobListener 
 2class StartedNodeRemovedJobListener extends AbstractJobListener {
 3        @Override
 4        protected void dataChanged(final String path, final Type eventType, final String data) {
 5            if (Type.NODE_REMOVED == eventType && guaranteeNode.isStartedRootNode(path)) {
 6                for (ElasticJobListener each : elasticJobListeners) {
 7                    if (each instanceof AbstractDistributeOnceElasticJobListener) {
 8                        ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
 9                    }
10                }
11            }
12        }
13    }


每个Job实例在监听到{namespace}/job-name/guarantee/started节点被删除后,会执行AbstractDistributeOnceElasticJo-bListener的notifyWaitingTaskStart方法唤醒被阻塞的线程,使线程进入到任务执行阶段。


同理,任务执行后监听方法afterJobEx-ecuted的执行流程实现原理一样,在这里就不在重复讲解了。

相关文章
|
9月前
|
Java Spring 容器
深入理解Spring源码之IOC 扩展原理BeanFactoryPostProcessor和事件监听ApplicationListener
深入理解Spring源码之IOC 扩展原理BeanFactoryPostProcessor和事件监听ApplicationListener
|
5月前
|
Java Spring 容器
Spring中事件监听(通知)机制详解与实践
Spring中事件监听(通知)机制详解与实践
128 0
|
8月前
|
消息中间件 Java Spring
Spring事件监听机制使用和原理解析
今天来分享一下Spring的事件监听机制,之前分享过一篇Spring监听机制的使用,今天从原理上进行解析,Spring的监听机制基于观察者模式,就是就是我们所说的发布订阅模式,这种模式可以在一定程度上实现代码的解耦,如果想要实现系统层面的解耦,那么消息队列就是我们的不二选择,消息队列本身也是发布订阅模式,只是不同的消息队列的实现方式不一样。
64 0
|
10月前
|
设计模式 Java 数据库连接
Spring高手之路7——事件机制与监听器的全面探索
本篇文章为你详细解析了Spring的事件机制,包括了Spring事件模型的四个核心概念:事件源、事件、广播器、监听器。我们通过深入浅出的实例解析了如何自定义事件和监听器,以及如何在实际项目中应用。最后,我们还详细探讨了监听器和Bean的生命周期的关系。无论你是Spring初学者,还是有一定经验的开发者,阅读本文都将帮助你更深入地理解Spring的事件机制和监听器,掌握Spring框架的核心技术。
374 0
Spring高手之路7——事件机制与监听器的全面探索
|
10月前
|
设计模式 消息中间件 Java
SpringBoot事件监听机制及观察者/发布订阅模式详解
介绍观察者模式和发布订阅模式的区别。 SpringBoot快速入门事件监听。 什么是观察者模式? 观察者模式是经典行为型设计模式之一。 在GoF的《设计模式》中,观察者模式的定义:在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。如果你觉得比较抽象,接下来这个例子应该会让你有所感觉:
|
11月前
|
缓存 监控 Java
spring事件监听器应用场景
spring事件监听器应用场景
|
12月前
|
Java Spring 容器
Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析
Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析
104 0
|
12月前
|
缓存 Java Go
Spring5源码 - 13 Spring事件监听机制_@EventListener源码解析
Spring5源码 - 13 Spring事件监听机制_@EventListener源码解析
99 0
|
开发框架 Java Spring
深入理解Spring的事件通知机制
Spring作为一个优秀的企业级应用开发框架,不仅提供了众多的功能模块和工具,还提供了一种灵活高效的事件通知机制,用于处理组件之间的松耦合通讯。本文将详细介绍Spring的事件通知机制的原理、使用方法以及示例,希望对大家深入理解Spring框架有所帮助。
202 0
|
前端开发 Java Spring
Spring - 事件监听机制 源码解析
众所周知,Spring Framework在 BeanFactory的基础容器之上扩展为了ApplicationContext上下文。 ApplicationContext处理包含了BeanFactory的全部基础功能之外,还额外提供了大量的扩展功能。
100 0
Spring - 事件监听机制 源码解析