在实际业务场景中,我们可能会碰到需要短时间批量执行的定时任务,此时如果我们不做任何处理,只是像前面那样去执行,可能出现上一批的任务还没执行完,定时的时间就到了,那么势必会对业务产生影响,所以就需要我们对批量任务进行分片处理,本节就让我们看看Xxl-job如何做分片处理。
一、环境准备
1.mock数据
首先在数据库里建一个测试的表,并mock一些数据进去,模拟业务中的“大表”。因为本身就是在模拟,也不讲究性能什么的,大家自己操作下就行。
2.添加依赖
依赖也没什么好讲究的,大家根据自己的喜好添加,只要能确保可以正常连接上数据库就行,如下:
<!--MyBatis驱动-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!--连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
AI 代码解读
3.添加配置
别忘了配置数据库的连接信息,如下:
spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=root
spring.datasource.password=123456
AI 代码解读
4.声明实体类和查询类
根据自己实际模拟的数据声明对应的实体类和查询类,确保能查到数据就行,这步也不细说,每个人的实体和选择的ORM框架都不一样。
二、分片与不分片
1.不使用分片
首先,我们来看看这段代码:
@XxlJob("sendMsgHandler")
public void sendMsgHandler() throws Exception{
List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll();
System.out.println("任务开始时间:"+new Date()+",处理任务数量:"+userMobilePlans.size());
Long startTime = System.currentTimeMillis();
userMobilePlans.forEach(item->{
try {
//模拟发送短信动作
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("任务结束时间:"+new Date());
System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒");
}
AI 代码解读
具体的mapper如下:
@Mapper
public interface UserMobilePlanMapper {
@Select("select * from t_user_mobile_plan")
List<UserMobilePlan> selectAll();
}
AI 代码解读
通过查询用户表模拟发短信的场景,我们启动项目执行一下这个任务看看需要多久完成,如下:
可以看到,我本地模拟了两千条数据测试的结果:用时30秒左右,那么假如在实际业务场景中,这个任务需要我们每隔20秒就执行一次,这样是不是就出现问题了,所以就需要我们对批量任务进行切片处理,使得它们可以被分为好几组并行执行,缩短执行耗时。
2.使用分片
区别于上面的案例,如果采用分片广播的形式,那么这一次任务的执行将会调度集群里的所有机器(也就是执行器)进行执行,从而大大加快执行速度。因此,我们需要在原来的业务逻辑基础上加上分片参数,这个分片参数也不需要我们自己去写,通过XXL-JOB提供的工具类就可以拿到,如下:
int shardTotal = XxlJobHelper.getShardTotal();
int shardIndex = XxlJobHelper.getShardIndex();
AI 代码解读
这里的shardTotal和shardIndex分别对应分片总数和分片任务的索引,这么说可能有的同学还是不太理解。我们换种描述,shardTotal就是执行器的个数(Ps:也就是我们机器的个数),而shardIndex则可以对应到我们具体查询里的数据的id。因此,我们这里改造原查询的思路则是通过shardTotal和shardIndex对id进行求模运算,如下:
@Mapper
public interface UserMobilePlanMapper {
@Select("select * from t_user_mobile_plan where mod(id, #{shardingTotal})=#{shardingIndex}")
List<UserMobilePlan> selectByMod(@Param("shardingIndex") Integer shardingIndex,@Param("shardingTotal")Integer shardingTotal);
@Select("select * from t_user_mobile_plan")
List<UserMobilePlan> selectAll();
}
AI 代码解读
然后再对接口进行小小的改造,根据执行器的个数来决定是否采用分片的方式查询,如下:
@XxlJob("sendMsgHandler1")
public void sendMsgHandler1() throws Exception{
System.out.println("任务开始时间" + new Date());
int shardTotal = XxlJobHelper.getShardTotal();
int shardIndex = XxlJobHelper.getShardIndex();
List<UserMobilePlan> userMobilePlans = null;
if (shardTotal == 1) {
userMobilePlans= userMobilePlanMapper.selectAll();
} else {
userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex, shardTotal);
}
System.out.println("处理任务数量" + userMobilePlans.size());
long startTime = System.currentTimeMillis();
userMobilePlans.forEach(item -> {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("任务结束时间" + new Date());
System.out.println("任务耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");
}
AI 代码解读
最后,在调度中心里修改这个任务的路由策略,如下:
3.分片测试
执行一次,观察控制台的输出,如下:
可以看到,原本两千条数据的查询被均匀地分配在两台机器上,并且同时开始执行。原本需要30秒执行的任务,现在只需要15秒就完成了,性能提升了一倍。
三、小结
SpringBoot整合XXL-JOB系列暂时完结了,各位读者如果在使用过程中遇到问题可以去查阅官方文档或者在评论区留言!