3.7、新建 DataFlowJob 类型作业
DataFlowJob 类型的任务配置和SimpleJob
类似,操作也很简单!
创建一个DataflowJob
类型的实现类MyDataFlowJob
。
@Slf4j public class MyDataFlowJob implements DataflowJob<String> { private boolean flag = false; @Override public List<String> fetchData(ShardingContext shardingContext) { log.info("开始获取数据"); if (flag) { return null; } return Arrays.asList("qingshan", "jack", "seven"); } @Override public void processData(ShardingContext shardingContext, List<String> data) { for (String val : data) { // 处理完数据要移除掉,不然就会一直跑,处理可以在上面的方法里执行。这里采用 flag log.info("开始处理数据:" + val); } flag = true; } }
接着创建MyDataFlowJob
的配置类,将其注入到zookeeper
注册中心。
Configuration public class MyDataFlowJobConfig { /** * 任务名称 */ @Value("${dataflowJob.myDataflowJob.name}") private String jobName; /** * cron表达式 */ @Value("${dataflowJob.myDataflowJob.cron}") private String jobCron; /** * 作业分片总数 */ @Value("${dataflowJob.myDataflowJob.shardingTotalCount}") private int jobShardingTotalCount; /** * 作业分片参数 */ @Value("${dataflowJob.myDataflowJob.shardingItemParameters}") private String jobShardingItemParameters; /** * 自定义参数 */ @Value("${dataflowJob.myDataflowJob.jobParameters}") private String jobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Bean public MyDataFlowJob myDataFlowJob() { return new MyDataFlowJob(); } @Bean(initMethod = "init") public JobScheduler dataFlowJobScheduler(final MyDataFlowJob myDataFlowJob) { MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(myDataFlowJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定义作业核心配置 JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); // 定义DATAFLOW类型配置 DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), false); // 定义Lite作业根配置 LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).overwrite(true).build(); return dataflowJobRootConfig; } }
最后,在配置文件application.properties
中配置好对应的myDataflowJob
参数!
#dataflow类型的job dataflowJob.myDataflowJob.name=myDataflowJob dataflowJob.myDataflowJob.cron=0/15 * * * * ? dataflowJob.myDataflowJob.shardingTotalCount=1 dataflowJob.myDataflowJob.shardingItemParameters=0=a,1=b,2=c dataflowJob.myDataflowJob.jobParameters=myDataflowJobParamter
运行程序,看看效果如何?
需要注意的地方是,如果配置的是流式处理类型,它会不停的拉取数据、处理数据,在拉取的时候,如果返回为空,就不会处理数据!
如果配置的是非流式处理类型,和上面介绍的simpleJob
类型,处理一样!