分片
分片的概念
当只有一台机器的情况下,给定时任务分片四个,在机器A启动四个线程,分别处理四个分片的内容
当有两台机器的情况下,分片由两个机器进行分配,机器A负责索引为0,1分片内容,机器B负责2,3分片内容
当有三台机器的时候,情况如图所示
当有四台机器的时候
当有五台机器的时候
当分片消耗资源少的时候,第一种情况和第二种情况没有太大区别,反之,如果消耗资源很大的时候,CPU的利用率效率会降低
分片数建议服务器个数倍数
分片案例环境搭建
案例需求
数据库中有一些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已经备份了
第一步:添加依赖
<dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.10</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.2.0</version> </dependency> <!--mysql驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency>
第二步:添加配置
spring: datasource: url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8 driverClassName: com.mysql.cj.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource username: root password: 2022
第三步:添加实体类
@Data public class FileCustom { //唯⼀标识 private Long id; //⽂件名 private String name; //⽂件类型 private String type; //⽂件内容 private String content; //是否已备份 private Boolean backedUp = false; public FileCustom(){} public FileCustom(Long id, String name, String type, String content){ this.id = id; this.name = name; this.type = type; this.content = content; } }
第四步:添加任务类
@Autowired private FileCustomMapper fileCustomMapper; @Override public void execute(ShardingContext shardingContext) { doWork(); } private void doWork() { //查询出所有的备份任务 List<FileCustom> fileCustoms = fileCustomMapper.selectAll(); for (FileCustom custom:fileCustoms){ backUp(custom); } } private void backUp(FileCustom custom){ System.out.println("备份的方法名:"+custom.getName()+"备份的类型:"+custom.getType()); System.out.println("======================="); //模拟进行备份操作 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } fileCustomMapper.changeState(custom.getId(),1); } }
第五步: 添加任务调度配置
@Bean(initMethod = "init") public SpringJobScheduler fileScheduler(FileCustomElasticjob job, CoordinatorRegistryCenter registryCenter){ LiteJobConfiguration jobConfiguration = createJobConfiguration(job.getClass(),"0/5 * * * * ?",1); return new SpringJobScheduler(job,registryCenter,jobConfiguration); }
案例改造成任务分片
第一步:修改任务配置类
@Configuration public class JobConfig { @Bean public static CoordinatorRegistryCenter registryCenter(@Value("${zookeeper.url}") String url, @Value("${zookeeper.groupName}") String groupName) { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(url, groupName); //设置节点超时时间 zookeeperConfiguration.setSessionTimeoutMilliseconds(100); //zookeeperConfiguration("zookeeper地址","项目名") CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); regCenter.init(); return regCenter; } //功能的方法 private static LiteJobConfiguration createJobConfiguration(Class clazz, String corn, int shardingCount,String shardingParam) { JobCoreConfiguration.Builder jobBuilder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), corn, shardingCount); if(!StringUtils.isEmpty(shardingParam)){ jobBuilder.shardingItemParameters(shardingParam); } //定义作业核心配置newBuilder("任务名称","corn表达式","分片数量") JobCoreConfiguration simpleCoreConfig = jobBuilder.build(); // 定义SIMPLE类型配置 cn.wolfcode.MyElasticJob System.out.println("MyElasticJob.class.getCanonicalName---->"+ MyElasticJob.class.getCanonicalName()); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,clazz.getCanonicalName()); //定义Lite作业根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); return simpleJobRootConfig; } @Bean(initMethod = "init") public SpringJobScheduler fileScheduler(FileCustomElasticjob job, CoordinatorRegistryCenter registryCenter){ LiteJobConfiguration jobConfiguration = createJobConfiguration(job.getClass(),"0/10 * * * * ?",4,"0=text,1=image,2=radio,3=vedio"); return new SpringJobScheduler(job,registryCenter,jobConfiguration); } }
第二步:修改任务类
@Component @Slf4j public class FileCustomElasticjob implements SimpleJob { @Autowired private FileCustomMapper fileCustomMapper; @Override public void execute(ShardingContext shardingContext) { doWork(shardingContext.getShardingParameter()); log.info("线程ID:{},任务的名称:{},任务的参数:{},分片个数:{},分片索引号:{},分片参数:{}", Thread.currentThread().getId(), shardingContext.getJobName(), shardingContext.getJobParameter(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter()); } private void doWork(String shardingParameter) { //查询出所有的备份任务 List<FileCustom> fileCustoms = fileCustomMapper.selectByType(shardingParameter); for (FileCustom custom:fileCustoms){ backUp(custom); } } private void backUp(FileCustom custom){ System.out.println("备份的方法名:"+custom.getName()+"备份的类型:"+custom.getType()); System.out.println("======================="); //模拟进行备份操作 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } fileCustomMapper.changeState(custom.getId(),1); } }
第三步:修改Mapper映射文件
@Mapper public interface FileCustomMapper { @Select("select * from t_file_custom where backedUp = 0") List<FileCustom> selectAll(); @Update("update t_file_custom set backedUp = #{state} where id = #{id}") int changeState(@Param("id") Long id, @Param("state")int state); @Select("select * from t_file_custom where backedUp = 0 and type = #{type}") List<FileCustom> selectByType(String shardingParameter); }