分布式任务是各大公司的必不可少的组件,他的特性和组件要求如下
功能列表弹性调度支持任务在分布式场景下的分片和高可用能够水平扩展任务的吞吐量和执行效率任务处理能力随资源配备弹性伸缩资源分配在适合的时间将适合的资源分配给任务并使其生效相同任务聚合至相同的执行器统一处理动态调配追加资源至新分配的任务作业治理失效转移错过作业重新执行自诊断修复作业依赖(TODO)基于有向无环图(DAG)的作业间依赖基于有向无环图(DAG)的作业分片间依赖作业开放生态可扩展的作业类型统一接口丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等易于对接业务作业,能够与 Spring 依赖注入无缝整合可视化管控端作业管控端作业执行历史数据追踪注册中心管理环境要求Java请使用 Java 8 及其以上版本。Maven请使用 Maven 3.5.0 及其以上版本。ZooKeeper请使用 ZooKeeper 3.6.0 及其以上版本。Mesos(仅 ElasticJob-Cloud 使用)请使用 Mesos 1.1.0 及其兼容版本。1首先添加pom
<dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-spring-boot-starter</artifactId> <version>${revision}</version> </dependency> <dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-core</artifactId> <version>${revision}</version> </dependency> <dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-simple-executor</artifactId> <version>${revision}</version> </dependency> <dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-spring-namespace</artifactId> <version>${revision}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <version>${curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.1.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.1.0</version> </dependency>
yaml配置
elasticjob:
tracing:
type: RDB
regCenter:
serverLists: localhost:2181
namespace: elasticjob-lite-springboot
jobs:
simpleJob:
elasticJobClass: org.microservices.pay.job.SpringBootSimpleJob
cron: 0/5 * * * * ?
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
dataflowJob:
elasticJobClass: org.microservices.pay.job.SpringBootDataflowJob
cron: 0/5 * * * * ?
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
simplejob
import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.microservices.pay.entity.Foo; import org.microservices.pay.repository.FooRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; @Component public class SpringBootSimpleJob implements SimpleJob { private final Logger logger = LoggerFactory.getLogger(SpringBootSimpleJob.class); @Autowired private FooRepository fooRepository; @Override public void execute(final ShardingContext shardingContext) { logger.info("Item: {} | Time: {} | Thread: {} | {}", shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE"); List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10); for (Foo each : data) { fooRepository.setCompleted(each.getId()); } } }
DataflowJob
@Component public class SpringBootDataflowJob implements DataflowJob<Foo> { private final Logger logger = LoggerFactory.getLogger(SpringBootDataflowJob.class); @Resource private FooRepository fooRepository; @Override public List<Foo> fetchData(final ShardingContext shardingContext) { logger.info("Item: {} | Time: {} | Thread: {} | {}", shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH"); return fooRepository.findTodoData(shardingContext.getShardingParameter(), 10); } @Override public void processData(final ShardingContext shardingContext, final List<Foo> data) { logger.info("Item: {} | Time: {} | Thread: {} | {}", shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS"); for (Foo each : data) { fooRepository.setCompleted(each.getId()); } } }
启动主类 就可以看到任务的执行信息了