源码分析ElasticJob任务运行事件监听器

简介: 在任务执行的前后,ElasticJob可以提供扩展,其主要类图如下:ElastciJobListener:elasticJob任务执行事件监听器,提供如下两个方法:void beforeJobExecuted(final ShardingContexts shardingContexts); 在任务执行之前调用,shardingContexts为分片上下文信息。

在任务执行的前后,ElasticJob可以提供扩展,其主要类图如下:
这里写图片描述
ElastciJobListener:elasticJob任务执行事件监听器,提供如下两个方法:

  • void beforeJobExecuted(final ShardingContexts shardingContexts);

     在任务执行之前调用,shardingContexts为分片上下文信息。
  • void afterJobExecuted(final ShardingContexts shardingContexts)

    在任务执行之后调用,shardingContexts为分片上下文信息。
    

上述回调函数是分片级的,也就是说默认情况下,同一个任务的多个分片都会执行beforeJobExecuted、afterJobExecuted方法,如果某些情况同一个任务只需在最后一个分片执行之前执行,最后一个分片执行完成后才执行,又该如何实现呢。AbstractDistributeOnceElasticJobListener粉墨登场。

AbstractDistributeOnceElasticJobListener:在分布式作业中只执行一次的监听器。

  • private final long startedTimeoutMilliseconds:分片等待beforeJobExecuted方法执行的超时时间,单位为毫秒。
  • private final Object startedWait = new Object():分片等待beforeJobExecuted的监视器。
  • private final long completedTimeoutMilliseconds:分片等待afterJobExecuted方法执行的超时时间,单位为毫秒。
  • private final Object completedWait = new Object():分片等待afterJobExecuted的监视器。
  • private GuaranteeService guaranteeService:保证分布式任务全部开始和结束状态的服务。
  • private TimeService timeService = new TimeService():时间服务器,主要用来获取当前服务器的系统时间。
  • public final void beforeJobExecuted(final ShardingContexts shardingContexts):分片任务执行之前调用,该方法是一个模板方法,最后一个分片成功启动后调用doBeforeJobExecutedAtLastStarted方法,该方法为抽象方法,由具体子类实现,如果有其他分片未执行完成,该方法会阻塞等待,或最后启动的分片执行完doBeforeJobExecutedAtLastStarted方法。
  • public final void afterJobExecuted(final ShardingContexts shardingContexts):分片任务执行之后调用,该方法是一个模板方法,实现当最后一个分片成功执行完成后调用doAfterJobExecutedAtLastCompleted方法,该方法为抽象方法,由具体子类实现,如果有其他分片未执行完成,该方法会阻塞等待,或最后启动的分片执行完doAfterJobExecutedAtLastCompleted方法。
  • public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts):分布式环境中最后一个作业分片执行前的执行的方法。
  • public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts):分布式环境中最后一个作业分片执行完成后的执行方法。
  • public void notifyWaitingTaskStart():通知分片节点上的任务开始之前(唤醒由于还有其他分片未启动造成自身等待阻塞)。
  • public void notifyWaitingTaskComplete():通知分片节点任务执行完成(唤醒由于存在其他分片任务未执行完成时阻塞)。

接下来重点分析AbstractDistributeOnceElasticJobListener实现原理(分布式环境中,监听器只在一个节点上执行的实现逻辑)

重点分析beforeJobExecuted方法实现原理,afterJobExecuted方法类似。

AbstractDistributeOnceElasticJobListener#beforeJobExecuted

public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
        guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());    // @1
        if (guaranteeService.isAllStarted()) {                                                                                         // @2
            doBeforeJobExecutedAtLastStarted(shardingContexts);
            guaranteeService.clearAllStartedInfo();
            return;
        }
        long before = timeService.getCurrentMillis();                                                                           // @3
        try {
            synchronized (startedWait) {
                startedWait.wait(startedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            Thread.interrupted();
        }
        if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {                         // @4
            guaranteeService.clearAllStartedInfo();
            handleTimeout(startedTimeoutMilliseconds);
        }
    }

代码@1:使用GuaranteeService注册分片开始。

代码@2:判断该任务所有的分片是否都已经注册启动,如果都注册启动,则调用doBeforeJobExecutedAtLastStarted()方法。

代码@3:获取服务器当前时间。
代码@4:利用startWait.wait(startedTimeoutMilliseconds)带超时时间的等待,这里如何唤醒呢?
代码@5:判断唤醒是超时唤醒还是正常唤醒,如果是超时唤醒,清除所有的分片注册启动信息,处理超时异常。

上述流程简单明了,上面有两个问题需要进一步探究,如何注册分片启动信息与如何被唤醒。

1、任务节点注册分配给当前节点的任务分片

/**
     * 根据分片项注册任务开始运行.
     * 
     * @param shardingItems 待注册的分片项
     */
    public void registerStart(final Collection<Integer> shardingItems) {
        for (int each : shardingItems) {
            jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each));
        }
    }

创建持久节点:${namespace}/jobname/guarantee/started/{item}。

2、当最后一个节点注册启动执行doBeforeJobExecutedAtLastStarted方法后,如果唤醒其他节点以便进入到任务执行阶段

if (guaranteeService.isAllStarted()) {
        doBeforeJobExecutedAtLastStarted(shardingContexts);
        guaranteeService.clearAllStartedInfo();
        return;
 }

也就是回调函数执行完毕后,会删除任务所有的分片。温馨提示:业务实现子类实现doBeforeJobExecutedAtLastStarted方法时最好不要抛出异常,不然各节点的唤醒操作只能是等待超时后被唤醒。

GuaranteeService#clearAllStartedInfo

/**
     * 清理所有任务启动信息.
     */
    public void clearAllStartedInfo() {
        jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT);
    }

直接删除${namespace}/jobname/guarantee/started根节点。基于ZK的开发模式,触发一次删除操作,肯定会有事件监听器来监听该节点的删除事件,从而触发其他节点的唤醒操作,果不奇然,ElastciJob提供GuaranteeListenerManager事件监听来监听${namespace}/jobname/guarantee/started节点的删除事件。

GuaranteeListenerManager$StartedNodeRemovedJobListener

class StartedNodeRemovedJobListener extends AbstractJobListener {
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (Type.NODE_REMOVED == eventType && guaranteeNode.isStartedRootNode(path)) {
                for (ElasticJobListener each : elasticJobListeners) {
                    if (each instanceof AbstractDistributeOnceElasticJobListener) {
                        ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
                    }
                }
            }
        }
    }

每个Job实例在监听到${namespace}/jobname/guarantee/started节点被删除后,会执行AbstractDistributeOnceElasticJobListener的notifyWaitingTaskStart方法唤醒被阻塞的线程,是线程进入到任务执行阶段。

同理,任务执行后监听方法afterJobExecuted的执行流程实现原理一样,在这里就不在重复讲解了。


原文发布时间为:2018-12-15
本文作者:丁威,《RocketMQ技术内幕》作者。
本文来自中间件兴趣圈,了解相关信息可以关注中间件兴趣圈

目录
相关文章
深入探究Camunda监听器
执行监听器与任务监听器
1405 1
深入探究Camunda监听器
flowable 启动流程的三种方式
flowable 启动流程的三种方式
322 0
|
2月前
|
设计模式 JavaScript Java
Spring 事件监听机制源码
Spring 提供了事件发布订阅机制,广泛应用于项目中。本文介绍了如何通过自定义事件类、订阅类和发布类实现这一机制,并展示了如何监听 SpringBoot 启动过程中的多个事件(如 `ApplicationStartingEvent`、`ApplicationEnvironmentPreparedEvent` 等)。通过掌握这些事件,可以更好地理解 SpringBoot 的启动流程。示例代码展示了从事件发布到接收的完整过程。
|
Java Spring
SpringBoot核心特性——异步任务和定时任务那些事
前言 通常情况下,SpringMVC接收到请求后会将请求具体分发给单个线程进行处理。如果请求处理中涉及到比较耗时的操作,为了能更快地将响应返回给用户,那么就需要将耗时的业务操作交由别的线程进行异步处理,而SpringBoot已经为我们提供了这样的实现。
574 2
SpringBoot核心特性——异步任务和定时任务那些事
|
Java Spring 容器
Spring中事件监听(通知)机制详解与实践
Spring中事件监听(通知)机制详解与实践
247 0
|
消息中间件 Java Spring
Spring事件监听机制使用和原理解析
今天来分享一下Spring的事件监听机制,之前分享过一篇Spring监听机制的使用,今天从原理上进行解析,Spring的监听机制基于观察者模式,就是就是我们所说的发布订阅模式,这种模式可以在一定程度上实现代码的解耦,如果想要实现系统层面的解耦,那么消息队列就是我们的不二选择,消息队列本身也是发布订阅模式,只是不同的消息队列的实现方式不一样。
109 0
|
Java Spring 容器
Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析
Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析
126 0
|
前端开发 Java Spring
Spring - 事件监听机制 源码解析
众所周知,Spring Framework在 BeanFactory的基础容器之上扩展为了ApplicationContext上下文。 ApplicationContext处理包含了BeanFactory的全部基础功能之外,还额外提供了大量的扩展功能。
126 0
Spring - 事件监听机制 源码解析
|
Java Spring 容器
springboot实战原理(10)--配置事件监听的4种方式和原理
springboot实战原理(10)--配置事件监听的4种方式和原理
545 0
springboot实战原理(10)--配置事件监听的4种方式和原理