elastic-job之监听器

简介: 每个作业都可以配置一个任务监听器,确切的说是只能配置一个本地监听器和一个分布式监听器。Elastic-job有三种作业类型,但是它们的通用配置都是一样的,所以本文在介绍作业的监听器配置时将仅以简单作业的配置为例。

每个作业都可以配置一个任务监听器,确切的说是只能配置一个本地监听器和一个分布式监听器。Elastic-job有三种作业类型,但是它们的通用配置都是一样的,所以本文在介绍作业的监听器配置时将仅以简单作业的配置为例。

本地监听器

本地监听器只在节点执行自己分片的时候调度,每个分片任务调度的时候本地监听器都会执行。本地监听器由ElasticJobListener接口定义,其定义如下:

/**
 * 弹性化分布式作业监听器接口.
 * 
 * @author zhangliang
 */
public interface ElasticJobListener {
    
    /**
     * 作业执行前的执行的方法.
     * 
     * @param shardingContexts 分片上下文
     */
    void beforeJobExecuted(final ShardingContexts shardingContexts);
    
    /**
     * 作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    void afterJobExecuted(final ShardingContexts shardingContexts);
}

该接口的接口方法的注释上已经说明了对应的接口方法的调用时机,详情也可以参考com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute()方法。简单示例如下:

public class MyElasticJobListener implements ElasticJobListener {

	private static final Logger LOGGER = Logger.getLogger(MyElasticJobListener.class);
	
	@Override
	public void beforeJobExecuted(ShardingContexts shardingContexts) {
		LOGGER.info(String.format("开始调度任务[%s]", shardingContexts.getJobName()));
	}

	@Override
	public void afterJobExecuted(ShardingContexts shardingContexts) {
		LOGGER.info(String.format("任务[%s]调度完成", shardingContexts.getJobName()));
	}

}

本地监听器的配置由<job:listener/>节点配置,如下示例中就通过<job:listener/>给简单作业myElasticJob定义了一个本地监听器。

<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/>
<job:simple id="myElasticJob" job-ref="simpleJob"
	registry-center-ref="regCenter" cron="0/30 * * * * ?"
	sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
	failover="true" overwrite="true" >
	<job:listener class="com.elim.learn.elastic.job.listener.MyElasticJobListener" />
</job:simple>

分布式监听器

本地监听器在作业执行本地的分片任务时会执行,如上面的示例,我们的作业被分成了6片,则监听器任务会执行6次。而分布式监听器会在总的任务开始执行时执行一次,在总的任务结束执行时执行一次。分布式监听器也是在普通监听器的基础上实现的,由AbstractDistributeOnceElasticJobListener抽象类封装的,其实现了ElasticJobListener接口。要实现自己的监听器只需要继承AbstractDistributeOnceElasticJobListener抽象类,实现其中的抽象方法即可。AbstractDistributeOnceElasticJobListener抽象类的定义如下:

/**
 * 在分布式作业中只执行一次的监听器.
 * 
 * @author zhangliang
 */
public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {
    
    private final long startedTimeoutMilliseconds;
    
    private final Object startedWait = new Object();
    
    private final long completedTimeoutMilliseconds;
    
    private final Object completedWait = new Object();
    
    @Setter
    private GuaranteeService guaranteeService;
    
    private TimeService timeService = new TimeService();
    
    public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
        if (startedTimeoutMilliseconds <= 0L) {
            this.startedTimeoutMilliseconds = Long.MAX_VALUE;
        } else {
            this.startedTimeoutMilliseconds = startedTimeoutMilliseconds;
        }
        if (completedTimeoutMilliseconds <= 0L) {
            this.completedTimeoutMilliseconds = Long.MAX_VALUE; 
        } else {
            this.completedTimeoutMilliseconds = completedTimeoutMilliseconds;
        }
    }
    
    @Override
    public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
        guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());
        if (guaranteeService.isAllStarted()) {
            doBeforeJobExecutedAtLastStarted(shardingContexts);
            guaranteeService.clearAllStartedInfo();
            return;
        }
        long before = timeService.getCurrentMillis();
        try {
            synchronized (startedWait) {
                startedWait.wait(startedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            Thread.interrupted();
        }
        if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
            guaranteeService.clearAllStartedInfo();
            handleTimeout(startedTimeoutMilliseconds);
        }
    }
    
    @Override
    public final void afterJobExecuted(final ShardingContexts shardingContexts) {
        guaranteeService.registerComplete(shardingContexts.getShardingItemParameters().keySet());
        if (guaranteeService.isAllCompleted()) {
            doAfterJobExecutedAtLastCompleted(shardingContexts);
            guaranteeService.clearAllCompletedInfo();
            return;
        }
        long before = timeService.getCurrentMillis();
        try {
            synchronized (completedWait) {
                completedWait.wait(completedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            Thread.interrupted();
        }
        if (timeService.getCurrentMillis() - before >= completedTimeoutMilliseconds) {
            guaranteeService.clearAllCompletedInfo();
            handleTimeout(completedTimeoutMilliseconds);
        }
    }
    
    private void handleTimeout(final long timeoutMilliseconds) {
        throw new JobSystemException("Job timeout. timeout mills is %s.", timeoutMilliseconds);
    }
    
    /**
     * 分布式环境中最后一个作业执行前的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts);
    
    /**
     * 分布式环境中最后一个作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts);
    
    /**
     * 通知任务开始.
     */
    public void notifyWaitingTaskStart() {
        synchronized (startedWait) {
            startedWait.notifyAll();
        }
    }
    
    /**
     * 通知任务结束.
     */
    public void notifyWaitingTaskComplete() {
        synchronized (completedWait) {
            completedWait.notifyAll();
        }
    }
}

以下是一个使用分布式监听器的示例:

public class MyDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {

	private static final Logger logger = Logger.getLogger(MyDistributeOnceElasticJobListener.class);
	
	/**
	 * @param startedTimeoutMilliseconds
	 * @param completedTimeoutMilliseconds
	 */
	public MyDistributeOnceElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
		super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
	}

	@Override
	public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
		logger.info("分布式监听器开始……");
	}

	@Override
	public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
		logger.info("分布式监听器结束……");
	}

}

分布式监听器用到了锁的等待和通知,startedTimeoutMilliseconds和completedTimeoutMilliseconds分别用来指定作业开始前和完成后的对应的锁等待最大超时时间。分布式监听器由<job:distributed-listener/>,以下是一个使用分布式监听器的示例:

<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/>
<job:simple id="myElasticJob" job-ref="simpleJob"
	registry-center-ref="regCenter" cron="0/30 * * * * ?"
	sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
	failover="true" overwrite="true" >
	<job:distributed-listener class="com.elim.learn.elastic.job.listener.MyDistributeOnceElasticJobListener" 
			started-timeout-milliseconds="100" completed-timeout-milliseconds="100"/>
</job:simple>

(本文由Elim写于2017年10月2日)

目录
相关文章
|
Arthas 监控 Java
|
JSON 数据格式
Redisson官方文档 - 4. 数据序列化
Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了多种的对象编码供大家选择。
10912 0
|
7月前
|
缓存 安全 网络协议
免费在线IP地址查询工具
在日常的网络使用中,我们经常需要了解IP地址的信息,例如想要确认某个网站的IP地址、追踪某个网络攻击的来源等。此时,我们可以使用一些在线IP地址查询工具。
822 33
|
NoSQL 安全 调度
【📕分布式锁通关指南 10】源码剖析redisson之MultiLock的实现
Redisson 的 MultiLock 是一种分布式锁实现,支持对多个独立的 RLock 同时加锁或解锁。它通过“整锁整放”机制确保所有锁要么全部加锁成功,要么完全回滚,避免状态不一致。适用于跨多个 Redis 实例或节点的场景,如分布式任务调度。其核心逻辑基于遍历加锁列表,失败时自动释放已获取的锁,保证原子性。解锁时亦逐一操作,降低死锁风险。MultiLock 不依赖 Lua 脚本,而是封装多锁协调,满足高一致性需求的业务场景。
265 0
【📕分布式锁通关指南 10】源码剖析redisson之MultiLock的实现
|
Java Maven Spring
Spring Boot中集成ZooKeeper的最佳实践
Spring Boot中集成ZooKeeper的最佳实践
|
前端开发 小程序 Java
【规范】SpringBoot接口返回结果及异常统一处理,这样封装才优雅
本文详细介绍了如何在SpringBoot项目中统一处理接口返回结果及全局异常。首先,通过封装`ResponseResult`类,实现了接口返回结果的规范化,包括状态码、状态信息、返回信息和数据等字段,提供了多种成功和失败的返回方法。其次,利用`@RestControllerAdvice`和`@ExceptionHandler`注解配置全局异常处理,捕获并友好地处理各种异常信息。
6824 1
【规范】SpringBoot接口返回结果及异常统一处理,这样封装才优雅
|
Java Spring 容器
Java获取接口的所有实现类方法
这篇文章介绍了在Java中获取接口所有实现类的方法,包括使用JDK的ServiceLoader(SPI机制)和Spring Boot中的@Autowired自动注入及ApplicationContextAware接口两种方式。
1345 1
|
存储 弹性计算 固态存储
阿里云服务器4核32G配置多少钱?我们应该如何选择?
阿里云服务器4核32G配置有多达十几种实例规格可选,不同实例规格的收费标准不一样,本文介绍了4核32G配置可选实例规格和最新收费标准及活动价格,可供大家了解阿里云服务器4核32G配置多少钱以及选择建议。
阿里云服务器4核32G配置多少钱?我们应该如何选择?
|
缓存 负载均衡 Java
Nacos 集群部署时性能优化配置
Nacos 集群部署时性能优化配置
619 2
|
安全 Java 数据库连接
Spring Boot 优雅关机时异步线程安全优化
Spring Boot 优雅关机时异步线程安全优化
483 1