Dataflow类型调度
Dataflow类型的定时任务需要实现Dataflowjob接口,该接口提供2个方法供覆盖,分别用于抓取(fetchData)和处理( processData)数据,我们继续对例子进行改造。
Dataflow类型用于处理数据流,他和SimpleJob不同,它以数据流的方式执行,调用fetchData抓取数据,知道抓取不到数据才停止作业。
定时任务开始的时候,先抓取数据,判断数据是否为空,若不为空则进行处理数据
代码示例
第一步:创建任务类
@Component public class FileDataflowJob implements DataflowJob<FileCustom> { @Autowired private FileCustomMapper fileCustomMapper; //抓取数据 @Override public List<FileCustom> fetchData(ShardingContext shardingContext) { System.out.println("开始抓取数据......"); List<FileCustom> fileCustoms = fileCustomMapper.selectLimit(2); return fileCustoms; } //处理数据 @Override public void processData(ShardingContext shardingContext, List<FileCustom> data) { for(FileCustom custom:data){ backUp(custom); } } private void backUp(FileCustom custom){ System.out.println("备份的方法名:"+custom.getName()+"备份的类型:"+custom.getType()); System.out.println("======================="); //模拟进行备份操作 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } fileCustomMapper.changeState(custom.getId(),1); } }
第二步:创建任务配置类
@Configuration public class JobConfig { @Bean public static CoordinatorRegistryCenter registryCenter(@Value("${zookeeper.url}") String url, @Value("${zookeeper.groupName}") String groupName) { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(url, groupName); //设置节点超时时间 zookeeperConfiguration.setSessionTimeoutMilliseconds(100); //zookeeperConfiguration("zookeeper地址","项目名") CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); regCenter.init(); return regCenter; } //功能的方法 private static LiteJobConfiguration createJobConfiguration(Class clazz, String corn, int shardingCount,String shardingParam,boolean isDateFlowJob) { JobCoreConfiguration.Builder jobBuilder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), corn, shardingCount); if(!StringUtils.isEmpty(shardingParam)){ jobBuilder.shardingItemParameters(shardingParam); } //定义作业核心配置newBuilder("任务名称","corn表达式","分片数量") JobCoreConfiguration simpleCoreConfig = jobBuilder.build(); // 定义SIMPLE类型配置 cn.wolfcode.MyElasticJob JobTypeConfiguration jobConfiguration; if(isDateFlowJob){ jobConfiguration = new DataflowJobConfiguration(simpleCoreConfig,clazz.getCanonicalName(),true); }else{ jobConfiguration = new SimpleJobConfiguration(simpleCoreConfig,clazz.getCanonicalName()); } //定义Lite作业根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfiguration).overwrite(true).build(); return simpleJobRootConfig; } @Bean(initMethod = "init") public SpringJobScheduler fileDatFlowaScheduler(FileDataflowJob job, CoordinatorRegistryCenter registryCenter){ LiteJobConfiguration jobConfiguration = createJobConfiguration(job.getClass(),"0/10 * * * * ?",1,null,true); return new SpringJobScheduler(job,registryCenter,jobConfiguration); } }
第三步:创建Mapper映射文件
@Mapper public interface FileCustomMapper { @Update("update t_file_custom set backedUp = #{state} where id = #{id}") int changeState(@Param("id") Long id, @Param("state")int state); @Select("select * from t_file_custom where backedUp = 0 limit #{count}") List<FileCustom> selectLimit(int count); }
运维管理
事件追踪
Elastic-Job-Lite在配置中提供了JobEventConfiguration,支持数据库方式配置,会在数据库中自动创建JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若干索引来近路作业的相关信息。
修改Elastic-job配置类
第一步:在ElasticJobConfig配置类中注入DataSource
第二步:在任务配置中增加事件追踪配置
运行结果
该表记录每次作业的执行历史,分为两个步骤:
1.作业开始执行时间想数据库插入数据
2.作业完成执行时向数据库更新数据,更新is_success,complete_time和failure_cause(如果任务执行失败)
该表记录作业状态变更痕迹表,可通过每次作业运行的task_id查询作业状态变化的生命轨迹和运行轨迹
运维平台
搭建步骤
1.解压缩
2.进入bin目录,并执行
bin\start.bat
3.打开浏览器访问http://localhost:8899
用户名:root 密码:root
使用步骤
第一步:注册中心配置
第二步:事件追踪数据源配置
之后就可以使用了