业务场景
现有一个数据拼装入库的接口,总数据量大约几万条,之前使用单线程同步处理,需要处理几分钟,这接口速度在生产上是不允许的,针对这一问题,需要对此接口进行速度优化。
优化方案
使用多线程异步处理的方式,技能保证接口很快的响应,也能提高数据的拼装入库操作。
多线程的实现--线程池
为什么要使用线程池
1.在Java中,如果每个请求到达就创建一个新线程,开销是相当大的。在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多。
2.除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个jvm里创建太多的线程,可能会是系统由于过度消耗内存或“切换过度”而导致系统资源不足。
3.为了防止资源不足,服务器应用程序需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁的线程的次数,特别是一个些资源耗费比较大但是线程的创建和销毁尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。
线程池的创建
ThreadPoolExecutor是线程池的核心实现类,在JDK1.5引入,位于java.util.concurrent包
Executor
线程池相关顶级接口,它将任务的提交与任务的执行分离开来ExecutorService
继承并扩展了Executor接口,提供了Runnable、FutureTask等主要线程实现接口扩展ThreadPoolExecutor
是线程池的核心实现类,用来执行被提交的任务
1、Spring配置类
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean("myExceutor")
public Executor myfoExceutor(ThreadPoolConfigProperties pool) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(pool.getCoreSize());
// 设置最大线程数
executor.setMaxPoolSize(pool.getMaxSize());
//配置队列大小
executor.setQueueCapacity(pool.getQueueSize());
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(pool.getKeepAliveTime());
// 设置默认线程名称
executor.setThreadNamePrefix("pageInfoExceutor-");
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
//执行初始化
executor.initialize();
return executor;
}
}
2、手动创建
/**
corePoolSize:核心线程池的线程数量
maximumPoolSize:最大的线程池线程数量
keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。
unit:线程活动保持时间的单位。
workQueue:指定任务队列所使用的阻塞队列
threadFactory:线程工厂提供线程的创建方式,默认使用Executors.defaultThreadFactory()
handler:当线程池所处理的任务数超过其承载容量或关闭后继续有任务提交时,所调用的拒绝策略
**/
ExecutorService exec = new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
Executors是java线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,如Executors.newFixedThreadPool
方法可以生成一个拥有固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
其本质是初始化一个ThreadPoolExecutor对象。
提交任务
1、execute()
通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。
/**
* 源码
**/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
分 3 个步骤进行:
1. 如果正在运行的线程少于 corePoolSize,请尝试使用给定命令作为其第一个任务来启动新线程。对 addWorker 的调用以原子方式检查 runState 和 workerCount,因此通过返回 false 来防止错误警报,这些警报会在不应该添加线程时添加线程。
2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有线程自上次检查以来就死了)或者池在进入此方法后关闭了。因此,我们重新检查状态,如有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。
3.如果我们不能排队任务,那么我们尝试添加一个新线程。如果失败了,我们知道我们已经关闭或饱和,因此拒绝这项任务。
*/
int c = ctl.get();
//判断工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//执行addworker,创建一个核心线程,创建失败重新获取ctl
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果工作线程数大于核心线程数,判断线程池的状态是否为running,并且可以添加进队列
//如果线程池不是running状态,则执行拒绝策略,(还是会调用一次addworker)
if (isRunning(c) && workQueue.offer(command)) {
//再次获取ctl,进行双重检索
int recheck = ctl.get();
//如果线程池是不是处于RUNNING的状态,那么就会将任务从队列中移除,
//如果移除失败,则会判断工作线程是否为0 ,如果过为0 就创建一个非核心线程
//如果移除成功,就执行拒绝策略,因为线程池已经不可用了;
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//线程池挂了或者大于最大线程数
else if (!addWorker(command, false))
reject(command);
}
exec.execute(() -> gooodsService.copyData(a,b));
2、submit()
通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。
/**
提交可运行任务以执行,并返回表示该任务的未来。未来的方法get将在成功完成后返回null。
参数:任务 – 要提交的任务
返回:a 未来代表任务完成前
抛出:RejectedExecutionException – 如果无法安排任务执行
NullPointerException – 如果任务为空
**/
Future<?> submit(Runnable task);
FutureTask<T> ft = new FutureTask<T>(Callable<V> callable);
exec.submit(ft);
案例伪代码
/**
*拼装数据并插表(无返回值)
@param dataList 一个大量数据的集合
**/
public void copyData(List<po> dataList) {
// 创建线程池
ExecutorService exec = new ThreadPoolExecutor(8,20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
//先对数据进行分割
List<List<po>> divideList = CollectionDataUtils.divide(dataList, 500);
for (List<po> list : divideList){
//提交任务
exec.execute(() -> dataMapper.saveBatchData(list));
}
}
/**
*拼装数据并插表(有返回值)
@param dataList 一个大量数据的集合
**/
public void copyData(List<po> dataList) {
//创建一个集合,来接收返回值
List<CompletableFuture<ResultDTO>> returnList = new ArrayList<>();
// 创建线程池
ExecutorService exec = new ThreadPoolExecutor(8,20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
//先对数据进行分割
List<List<po>> divideList = CollectionDataUtils.divide(dataList, 500);
for (List<po> list : divideList){
//提交任务
CompletableFuture<ResultDTO> cf = CompletableFuture.supplyAsync(() -> {
try {
return dataMapper.saveBatchData(list);
} catch (Exception e) {
return null;
}
},exec).exceptionally( ex -> null);
returnList.add(cf)
}
/*之后可以对拿到的返回值进行自己的业务操作....................*/
//阻塞主线程-->等待子线程都执行完后执行,(这里我想等所有线程都执行完操作数据库后,删除redis锁)
CompletableFuture.allOf(returnList.toArray(new CompletableFuture[returnList.size()])).join();
redisTemplate.delete(key);
}
后续优化
经过多线程处理后,接口响应速度提升了10倍,并且可以做异步处理,接口先给出响应,数据在后台异步处理。
但是这也是处理几万的数据量的。虽然使用了多线程,但是依旧只是一个节点处理,依靠单个Jvm,如果数据量再大,且并发量提高,会造成OOM内存溢出。所以后续在不改变需求的情况下,使用分布式计算,依靠MQ任务分发,进行多节点处理的方式,性能和稳定性会大大提高。