前言
- 日常写代码过程中,我经常会有一些处理很多数据的业务,如一些定时任务,需要用到线程池
1.定义一个线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),//这里我获取的物理机的核心线程数
2,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
1.1线程池七大参数
- corePoolSize 核心线程数目 - 池中会保留的最多线程数
- maximumPoolSize 最大线程数目 - 核心线程+救急线程的最大数目
- keepAliveTime 生存时间 - 救急线程的生存时间,生存时间内没有新任务,此线程资源会释放
- unit 时间单位 - 救急线程的生存时间单位,如秒、毫秒等
- workQueue - 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
- threadFactory 线程工厂 - 可以定制线程对象的创建,例如设置线程名字、是否是守护线程等
- handler 拒绝策略 - 当所有线程都在繁忙,workQueue 也放满时,会触发拒绝策略
1 抛异常 java.util.concurrent.ThreadPoolExecutor.AbortPolicy
2 由调用者执行任务 java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
3 丢弃任务 java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
4 丢弃最早排队任务 java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
1.2使用线程池(1.配合CompletableFuture.supplyAsync()使用)
List<Integer> list = new CopyOnWriteArrayList();
Data data = new Data();
for (int i = 0; i < 3000; i++) {
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getName());
return data.getData();
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
},poolExecutor);
try {
list.add(supplyAsync.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
poolExecutor.shutdown();
1.2.1 CopyOnWriteArrayList
- CopyOnWriteArrayList类最大的特点就是,在对其实例进行修改操作(add/remove等)会新建一个数据并修改,修改完毕之后,再将原来的引用指向新的数组。这样,修改过程没有修改原来的数组。也就没有了ConcurrentModificationException错误。
1.3使用线程池(2.配合CountDownLatch()使用)
List<String> list = new ArrayList<>();
list.add("test1");
list.add("test2");
list.add("test3");
list.add("test4");
list.add("test5");
CountDownLatch countDownLatch = new CountDownLatch(list.size());
ExecutorService threadPool = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());//四种拒绝策略
try {
for (String s : list) {
threadPool.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " --------------ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown(); //-1
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.await();
//线程池用完 程序结束, 关闭线程池
threadPool.shutdown();
}
- countDownLatch.await(); 等待计数器归零
总结
- CPU密集型任务(N+1):这种任务消耗的主要是CPU资源,可以将线程数设置为N(CPU核心数)+1,比CPU核心数多出来一个线程是为了防止线程偶发的缺页中断,或者其他原因导致的任务暂停而带来的影响。一旦任务停止,CPU就会处于空闲状态,而这种情况下多出来一个线程就可以充分利用CPU的空闲时间
- I/O密集型(2N):这种任务应用起来,系统大部分时间用来处理I/O交互,而线程在处理I/O的是时间段内不会占用CPU来处理,这时就可以将CPU交出给其他线程使用。因此在I/O密集型任务的应用中,可以配置多一些线程,具体计算方是2N。