elastic-job之流式作业

简介: 流式作业对应于DataflowJob接口,其定义如下: /** * 数据流分布式作业接口. * * @author zhangliang * * @param 数据类型 */ public interface DataflowJob extends ElasticJob { /** * 获取待处理数据.

流式作业对应于DataflowJob接口,其定义如下:

/**
 * 数据流分布式作业接口.
 * 
 * @author zhangliang
 * 
 * @param <T> 数据类型
 */
public interface DataflowJob<T> extends ElasticJob {
    
    /**
     * 获取待处理数据.
     *
     * @param shardingContext 分片上下文
     * @return 待处理的数据集合
     */
    List<T> fetchData(ShardingContext shardingContext);
    
    /**
     * 处理数据.
     *
     * @param shardingContext 分片上下文
     * @param data 待处理数据集合
     */
    void processData(ShardingContext shardingContext, List<T> data);
}

流式作业,每次调度触发的时候都会先调fetchData获取数据,如果获取到了数据再调度processData方法处理数据。DataflowJob在运行时有两种方式,流式的和非流式的,通过属性streamingProcess控制,如果是基于Spring XML的配置方式则是streaming-process属性,boolean类型。当作业配置为流式的时候,每次触发作业后会调度一次fetchData获取数据,如果获取到了数据会调度processData方法处理数据,处理完后又继续调fetchData获取数据,再调processData处理,如此循环,就像流水一样。直到fetchData没有获取到数据或者发生了重新分片才会停止。代码实现部分可参考数据流执行器 com.dangdang.ddframe.job.executor.type.DataflowJobExecutor。以下是DataflowJob的一个简单实现,该实现中每次调度触发时都会连续调度processData十次。

public class MyDataflowJob implements DataflowJob<String> {
	
	private static final ThreadLocal<Integer> LOOP_COUNTER
 = new ThreadLocal<>();
	private static final int LOOP_TIMES = 10;//每次获取流处理循环次数
	private static final AtomicInteger COUNTER = new AtomicInteger(1);//计数器

	@Override
	public List<String> fetchData(ShardingContext shardingContext) {
		Integer current = LOOP_COUNTER.get();
		if (current == null) {
			current = 1;
		} else {
			current += 1;
		}
		LOOP_COUNTER.set(current);
		System.out.println(Thread.currentThread() 
+ "------------current--------" + current);
		if (current > LOOP_TIMES) {
			System.out.println("\n\n\n\n");
			return null;
		} else {
			int shardingItem = shardingContext.getShardingItem();
			List<String> datas = Arrays.asList(getData(shardingItem),
 getData(shardingItem), getData(shardingItem));
			return datas;
		}
	}
	
	private String getData(int shardingItem) {
		return shardingItem + "-" + COUNTER.getAndIncrement();
	}

	@Override
	public void processData(ShardingContext shardingContext, 
List<String> data) {
		System.out.println(Thread.currentThread() + "--------" +data);
	}

}

流式作业的配置使用<job:dataflow/>配置,上面的流式作业对应的配置如下:

<job:dataflow id="myDataflowJob"
	class="com.elim.learn.elastic.job.MyDataflowJob" 
registry-center-ref="regCenter"
	cron="0 0/2 * * * ?" sharding-total-count="2"
	sharding-item-parameters="0=广州,1=深圳" failover="true" overwrite="true"
	streaming-process="true">

上述配置参数的含义跟上一篇介绍的简单作业的配置是一样的,新增的streaming-process表示是否启用流式作业。

(本文由Elim写于2017年10月1日)

目录
相关文章
|
3月前
|
Java 调度 Maven
Elastic-job分布式调度系统
Elastic-job分布式调度系统
|
3月前
|
Shell Docker 容器
docker部署xxl_job
docker部署xxl_job
21 1
|
10月前
|
SQL Java 调度
大师级教程: 零基础掌握xxl-job分布式任务调度 Job Scheduling
大师级教程: 零基础掌握xxl-job分布式任务调度 Job Scheduling
254 0
大师级教程: 零基础掌握xxl-job分布式任务调度 Job Scheduling
|
12月前
|
JavaScript Java 关系型数据库
xxl-job搭建
xxl-job搭建
251 0
|
运维
Elastic Job进阶--作业是如何被立即触发的
Elastic Job进阶--作业是如何被立即触发的
|
存储 算法 安全
定时任务之elastic-job概述
定时任务之elastic-job概述
373 0
|
SQL Java 关系型数据库
elastic-job 定时任务集成
elastic-job 定时任务集成
418 0
elastic-job 定时任务集成
|
运维 算法 Java
Elastic-Job源码解读
文章以任务初始化、任务触发、分片策略、分布式为切入点讲述Elastic Job的源码,一方面自己总结记录、另一方面希望可以帮助到其他的开发者快读理解Elastic Job工作原理。
1207 0
|
运维 安全 Java
Elastic-Job使用及原理
Elastic-Job使用及原理
|
Java 调度 Spring
elastic-job之简单job
简介 elastic-job是当当网开源的基于zookeeper和quartz实现的分布式作业调度框架。github地址是https://github.com/dangdangdotcom/elastic-job,官方网站是http://elasticjob.io/。
2428 0