elastic-job之流式作业

简介: 流式作业对应于DataflowJob接口,其定义如下: /** * 数据流分布式作业接口. * * @author zhangliang * * @param 数据类型 */ public interface DataflowJob extends ElasticJob { /** * 获取待处理数据.

流式作业对应于DataflowJob接口,其定义如下:

/**
 * 数据流分布式作业接口.
 * 
 * @author zhangliang
 * 
 * @param <T> 数据类型
 */
public interface DataflowJob<T> extends ElasticJob {
    
    /**
     * 获取待处理数据.
     *
     * @param shardingContext 分片上下文
     * @return 待处理的数据集合
     */
    List<T> fetchData(ShardingContext shardingContext);
    
    /**
     * 处理数据.
     *
     * @param shardingContext 分片上下文
     * @param data 待处理数据集合
     */
    void processData(ShardingContext shardingContext, List<T> data);
}

流式作业,每次调度触发的时候都会先调fetchData获取数据,如果获取到了数据再调度processData方法处理数据。DataflowJob在运行时有两种方式,流式的和非流式的,通过属性streamingProcess控制,如果是基于Spring XML的配置方式则是streaming-process属性,boolean类型。当作业配置为流式的时候,每次触发作业后会调度一次fetchData获取数据,如果获取到了数据会调度processData方法处理数据,处理完后又继续调fetchData获取数据,再调processData处理,如此循环,就像流水一样。直到fetchData没有获取到数据或者发生了重新分片才会停止。代码实现部分可参考数据流执行器 com.dangdang.ddframe.job.executor.type.DataflowJobExecutor。以下是DataflowJob的一个简单实现,该实现中每次调度触发时都会连续调度processData十次。

public class MyDataflowJob implements DataflowJob<String> {
	
	private static final ThreadLocal<Integer> LOOP_COUNTER
 = new ThreadLocal<>();
	private static final int LOOP_TIMES = 10;//每次获取流处理循环次数
	private static final AtomicInteger COUNTER = new AtomicInteger(1);//计数器

	@Override
	public List<String> fetchData(ShardingContext shardingContext) {
		Integer current = LOOP_COUNTER.get();
		if (current == null) {
			current = 1;
		} else {
			current += 1;
		}
		LOOP_COUNTER.set(current);
		System.out.println(Thread.currentThread() 
+ "------------current--------" + current);
		if (current > LOOP_TIMES) {
			System.out.println("\n\n\n\n");
			return null;
		} else {
			int shardingItem = shardingContext.getShardingItem();
			List<String> datas = Arrays.asList(getData(shardingItem),
 getData(shardingItem), getData(shardingItem));
			return datas;
		}
	}
	
	private String getData(int shardingItem) {
		return shardingItem + "-" + COUNTER.getAndIncrement();
	}

	@Override
	public void processData(ShardingContext shardingContext, 
List<String> data) {
		System.out.println(Thread.currentThread() + "--------" +data);
	}

}

流式作业的配置使用<job:dataflow/>配置,上面的流式作业对应的配置如下:

<job:dataflow id="myDataflowJob"
	class="com.elim.learn.elastic.job.MyDataflowJob" 
registry-center-ref="regCenter"
	cron="0 0/2 * * * ?" sharding-total-count="2"
	sharding-item-parameters="0=广州,1=深圳" failover="true" overwrite="true"
	streaming-process="true">

上述配置参数的含义跟上一篇介绍的简单作业的配置是一样的,新增的streaming-process表示是否启用流式作业。

(本文由Elim写于2017年10月1日)

目录
相关文章
|
分布式计算 并行计算 数据库
Schedulerx2.0分布式计算原理&最佳实践
1. 前言 Schedulerx2.0的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。
26949 2
|
存储 算法 NoSQL
还分不清 Cookie、Session、Token、JWT?看这一篇就够了
Cookie、Session、Token 和 JWT(JSON Web Token)都是用于在网络应用中进行身份验证和状态管理的机制。虽然它们有一些相似之处,但在实际应用中有着不同的作用和特点,接下来就让我们一起看看吧,本文转载至http://juejin.im/post/5e055d9ef265da33997a42cc
48630 13
|
算法 数据可视化 Linux
Linux内核编译:深入理解`make menuconfig`命令
Linux内核编译:深入理解`make menuconfig`命令
890 0
|
Java 测试技术 开发者
必学!Spring Boot 单元测试、Mock 与 TestContainer 的高效使用技巧
【10月更文挑战第18天】 在现代软件开发中,单元测试是保证代码质量的重要手段。Spring Boot提供了强大的测试支持,使得编写和运行测试变得更加简单和高效。本文将深入探讨Spring Boot的单元测试、Mock技术以及TestContainer的高效使用技巧,帮助开发者提升测试效率和代码质量。
1185 2
|
存储 网络协议 Linux
把Linux服务器做成一个下载器,实现远程下载
把Linux服务器做成一个下载器,实现远程下载
把Linux服务器做成一个下载器,实现远程下载
|
存储 监控 供应链
账单系统-架构设计思路(对外版)
阿里商旅背景阿里商旅作为飞猪旅行旗下面向企业客户的数字化差旅解决方案产品,依托飞猪旅行机票、酒店供应链,为企业客户提供一站式的机票、酒店、火车票、用车等预订管控及结算票据服务。阿里商旅不仅是集团欢行的供应商,而且近几年在商业化差旅市场上崭露头角,服务了2万+中大型客户,43万+小微企业。FY22财年商旅技术团队重点规划在酒店供应链、预订管控服务、B+C客户服务、渠道及商旅基础建设等核心方向进行建设
5329 2
账单系统-架构设计思路(对外版)
|
Java BI API
spring boot 整合 itextpdf 导出 PDF,写入大文本,写入HTML代码,分析当下导出PDF的几个工具
这篇文章介绍了如何在Spring Boot项目中整合iTextPDF库来导出PDF文件,包括写入大文本和HTML代码,并分析了几种常用的Java PDF导出工具。
3297 0
spring boot 整合 itextpdf 导出 PDF,写入大文本,写入HTML代码,分析当下导出PDF的几个工具
|
存储 Java Apache
Spring Boot整合OpenOffice实现Word、Excel、PPT在线预览
Spring Boot整合OpenOffice实现Word、Excel、PPT在线预览
1002 0
|
NoSQL Java 数据库连接
springboot整合Redis中连接池jedis与lettuce的对比和实现
springboot整合Redis中连接池jedis与lettuce的对比和实现
2630 0