3.3、创建工程
本文采用springboot
来搭建工程为例,创建工程并添加elastic-job-lite
依赖!
<!-- 引入elastic-job-lite核心模块 --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> <!-- 使用springframework自定义命名空间时引入 --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>
在配置文件application.properties
中提前配置好 zookeeper 注册中心相关信息!
#zookeeper config zookeeper.serverList=127.0.0.1:2181 zookeeper.namespace=example-elastic-job-test
3.4、新建 ZookeeperConfig 配置类
@Configuration @ConditionalOnExpression("'${zookeeper.serverList}'.length() > 0") public class ZookeeperConfig { /** * zookeeper 配置 * @return */ @Bean(initMethod = "init") public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${zookeeper.serverList}") String serverList, @Value("${zookeeper.namespace}") String namespace){ return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace)); } }
3.5、新建任务处理类
elastic-job
支持三种类型的作业任务处理!
- Simple 类型作业:Simple 类型用于一般任务的处理,只需实现
SimpleJob
接口。该接口仅提供单一方法用于覆盖,此方法将定时执行,与Quartz原生接口相似。 - Dataflow 类型作业:Dataflow 类型用于处理数据流,需实现
DataflowJob
接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData
)和处理(processData
)数据。 - Script类型作业:Script 类型作业意为脚本类型作业,支持 shell,python,perl等所有类型脚本。只需通过控制台或代码配置 scriptCommandLine 即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
3.6、新建 Simple 类型作业
编写一个SimpleJob
接口的实现类MySimpleJob
,当前工作主要是打印一条日志。
@Slf4j public class MySimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { log.info(String.format("Thread ID: %s, 作业分片总数: %s, " + "当前分片项: %s.当前参数: %s," + "作业名称: %s.作业自定义参数: %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
创建一个MyElasticJobListener
任务监听器,用于监听MySimpleJob
的任务执行情况。
@Slf4j public class MyElasticJobListener implements ElasticJobListener { private long beginTime = 0; @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { beginTime = System.currentTimeMillis(); log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { long endTime = System.currentTimeMillis(); log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime); } }
创建一个MySimpleJobConfig
类,将MySimpleJob
其注入到zookeeper
。
@Configuration public class MySimpleJobConfig { /** * 任务名称 */ @Value("${simpleJob.mySimpleJob.name}") private String mySimpleJobName; /** * cron表达式 */ @Value("${simpleJob.mySimpleJob.cron}") private String mySimpleJobCron; /** * 作业分片总数 */ @Value("${simpleJob.mySimpleJob.shardingTotalCount}") private int mySimpleJobShardingTotalCount; /** * 作业分片参数 */ @Value("${simpleJob.mySimpleJob.shardingItemParameters}") private String mySimpleJobShardingItemParameters; /** * 自定义参数 */ @Value("${simpleJob.mySimpleJob.jobParameters}") private String mySimpleJobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Bean public MySimpleJob mySimpleJob() { return new MySimpleJob(); } @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { //配置任务监听器 MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定义作业核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); // 定义SIMPLE类型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); // 定义Lite作业根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); return simpleJobRootConfig; } }
在配置文件application.properties
中配置好对应的mySimpleJob
参数!
#elastic job #simpleJob类型的job simpleJob.mySimpleJob.name=mySimpleJob simpleJob.mySimpleJob.cron=0/15 * * * * ? simpleJob.mySimpleJob.shardingTotalCount=3 simpleJob.mySimpleJob.shardingItemParameters=0=a,1=b,2=c simpleJob.mySimpleJob.jobParameters=helloWorld
运行程序,看看效果如何?
在上图demo
中,配置的分片数为3,这个时候会有3个线程进行同时执行任务,因为都是在一台机器上执行的,这个任务被执行来3次,下面修改一下端口配置,创建三个相同的服务实例,在看看效果如下:
很清晰的看到任务被执行一次!