title: 线程池详解与异步任务编排使用案例
date: 2022-10-11 21:42:47.586
updated: 2022-10-12 21:58:22.144
url: /archives/xian-cheng-chi-xiang-jie-yu-yi-bu-ren-wu-bian-pai-shi-yong-an-li
categories:
tags: 实战Demo | JUC | 并发编程
线程池详解与异步任务编排使用案例
1.初始化线程的4种方式
1)、继承Thread 2)、实现 Runnable接口 3)、实现 Callable接口+FutureTask(可以拿到返回结果,可以处理异常) 4)、线程池 区别: 1、2不能得到返回值。3可以获取返回值 1、2、3都不能控制资源(无法控制线程数【高并发时线程数耗尽资源】) 4可以控制资源,性能稳定,不会一下子所有线程一起运行 结论: 实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】
2.创建线程池的方式
创建固定线程数的线程池ExecutorService
固定线程数的线程池 Executors.newFixedThreadPool(10);
execute和submit区别
作用:都是提交异步任务的 execute:只能提交Runnable任务,没有返回值 submit:可以提交Runnable、Callable,返回值是FutureTask
创建原生线程池ThreadPoolExecutor
new ThreadPoolExecutor(5, 200, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 7个参数: corePoolSize: 核心线程数,不会被回收,接收异步任务时才会创建 maximumPoolSize:最大线程数量,控制资源 keepAliveime: maximumPoolSize-corePoolSize 无任务存活超过空闲时间则线程被释放 TimeUnitunit: 时间单位 workQueue: 阻塞队列,任务被执行之前保存在任务队列中,只要有线程空闲,就会从队列取出任务执行 threadFactory: 线程的创建工厂【可以自定义】 RejectedExecutionHandler handler:队列满后执行的拒绝策略 线程池任务执行流程 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理(默认策略抛出异常) 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,释放空闲线程 当设置allowCoreThreadTimeOut(true)时,该参数默认false,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
拒绝策略
DiscardOldestPolicy:丢弃最老的任务 AbortPolicy:丢弃当前任务,抛出异常【默认策略】 CallerRunsPolicy:同步执行run方法 DiscardPolicy:丢弃当前任务,不抛出异常
阻塞队列
1.new LinkedBlockingDeque<>();// 默认大小是Integer.Max会导致内存不足,所以要做压力测试给出适当的队列大小
线程池
1.常见的4种默认线程池
注意: 回收线程 = maximumPoolSize - corePoolSize 可缓冲线程池【CachedThreadPool】:corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE 定长线程池【FixedThreadPool】:corePoolSize=maximumPoolSize 周期线程池【ScheduledThreadPool】:指定核心线程数,maximumPoolSize=Integer.MAX_VALUE,支持定时及周期性任务执行(一段时间之后再执行) 单任务线程池【SingleThreadPool】:corePoolSize=maximumPoolSize=1,从队列中获取任务(一个核心线程) Executors.newCachedThreadPool(); Executors.newFixedThreadPool(10); Executors.newScheduledThreadPool(10); Executors.newSingleThreadExecutor();
2.为什么使用线程池?
1.降低资源的消耗【减少创建销毁操作】 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗 高并发状态下过多创建线程可能将资源耗尽 2.提高响应速度【控制线程个数】 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数过多导致CPU调度慢) 3、提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
异步编排CompletableFuture
1.runXXX都是没有返回结果的,supplyXXX可以获取返回结果 2.可以传入自定义线程池,否则使用默认线程池
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
1.业务场景
4、5、6依赖1,得先知道sku是哪个spu下的
2.测试异步操作
supplyAsync
// 5.1.提交任务异步执行(supplyAsync) CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "测试使用", executor); System.out.println(future1.get());
thenRunAsync串行化
// 不能获取上一步结果 + 无返回值
thenAcceptAsync串行化
// 能获取上一步结果 + 无返回值
thenApplyAsync串行化
// 能获取上一步结果 + 有返回值 // 5.2.获取上一步结果并链式异步调用(thenApplyAsync) CompletableFuture<String> future2 = future1.thenApplyAsync(s -> s + " 链式调用", executor);// 参数s是上一步的返回值 System.out.println(future2.get());
whenCompleteAsync
// 5.3.获取上一步执行结果并获取异常信息(whenCompleteAsync)【无法处理异常返回默认值】 CompletableFuture<String> future3 = future2.whenCompleteAsync((result, exception) -> System.out.println("结果是:" + result + "----异常是:" + exception));
exceptionally
// 5.4.获取上一步异常,如果出现异常可返回默认值,不出现异常保持原值(exceptionally) CompletableFuture<Integer> future4 = future3.thenApplyAsync((s -> 1 / 0), executor); CompletableFuture<Integer> future5 = future4.exceptionally(exception -> { System.out.println("出现异常:" + exception); return 10; });// 出现异常,使用默认返回值 System.out.println("默认值:" + future5.get());
handle
// 5.5.方法执行完成后的处理 CompletableFuture<Integer> future6 = future3.thenApplyAsync((s -> 1 / 0), executor).handle((result, exception) -> { if (exception == null) { return result; } System.out.println("handle处理异常:" + exception); return 1; }); System.out.println("handle处理返回结果:" + future6.get());
两任务组合-都要完成
runAfterBothAsync
// 5.6.1.二者都要完成,组合【不获取前两个任务返回值,且自己无返回值】 CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1执行"); return 10 / 2; }, executor); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2执行"); return "hello"; }, executor); CompletableFuture<Void> future03 = future01.runAfterBothAsync(future02, () -> { System.out.println("任务3执行"); }, executor);
thenAcceptBothAsync
// 5.6.2.二者都要完成,组合【获取前两个任务返回值,自己无返回值】 CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1执行"); return 10 / 2; }, executor); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2执行"); return "hello"; }, executor); CompletableFuture<Void> future03 = future01.thenAcceptBothAsync(future02, (result1, result2) -> { System.out.println("任务3执行"); System.out.println("任务1返回值:" + result1); System.out.println("任务2返回值:" + result2); }, executor);
thenCombineAsync
// 5.6.3.二者都要完成,组合【获取前两个任务返回值,自己有返回值】 CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1执行"); return 10 / 2; }, executor); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2执行"); return "hello"; }, executor); CompletableFuture<String> future03 = future01.thenCombineAsync(future02, (result1, result2) -> { System.out.println("任务3执行"); System.out.println("任务1返回值:" + result1); System.out.println("任务2返回值:" + result2); return "任务3返回值"; }, executor); System.out.println(future03.get());
两任务组合-任一完成
runAfterEitherAsync
// 不获取前任务返回值,且当前任务无返回值
acceptEitherAsync
// 获取前任务返回值,但当前任务无返回值
applyToEitherAsync
// 获取前任务返回值,当前任务有返回值
多任务组合
allOf
// 等待所有任务完成 CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03); allOf.get();// 阻塞等待所有任务完成
anyOf
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03); anyOf.get();// 阻塞等待任一任务完成,返回值是执行成功的任务返回值
项目整合异步编排
1.注入线程池
gulimall: thread: core-size: 20 max-size: 200 keep-alive-time: 10
@ConfigurationProperties(prefix = "gulimall.thread") @Data public class ThreadPoolConfigProperties { private Integer coreSize; private Integer maxSize; private Integer keepAliveTime; }
@EnableConfigurationProperties(ThreadPoolConfigProperties.class) @Configuration public class MyThreadConfig { @Bean public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) { return new ThreadPoolExecutor( pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); } }
2.实际业务使用异步编排
@Service("skuInfoService") public class SkuInfoServiceImpl extends ServiceImpl<SkuInfoDao, SkuInfoEntity> implements SkuInfoService { @Autowired SkuImagesService skuImagesService; @Autowired SkuSaleAttrValueService skuSaleAttrValueService; @Autowired CouponAgentService couponAgentService; @Autowired SpuInfoDescService spuInfoDescService; @Autowired AttrGroupServiceImpl attrGroupService; @Autowired ThreadPoolExecutor executor; /** * 查询skuId商品信息,封装VO返回 */ @Override public SkuItemVO item(Long skuId) throws ExecutionException, InterruptedException { SkuItemVO result = new SkuItemVO(); CompletableFuture<SkuInfoEntity> skuInfoFuture = CompletableFuture.supplyAsync(() -> { // 1.获取sku基本信息(pms_sku_info)【默认图片、标题、副标题、价格】 SkuInfoEntity skuInfo = getById(skuId); result.setInfo(skuInfo); return skuInfo; }, executor); CompletableFuture<Void> imagesFuture = CompletableFuture.runAsync(() -> { // 2.获取sku图片信息(pms_sku_images) List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId); result.setImages(images); }, executor); CompletableFuture<Void> saleAttrFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> { // 3.获取当前sku所属spu下的所有销售属性组合(pms_sku_info、pms_sku_sale_attr_value) List<SkuItemSaleAttrVO> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId()); result.setSaleAttr(saleAttr); }, executor); CompletableFuture<Void> descFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> { // 4.获取spu商品介绍(pms_spu_info_desc)【描述图片】 SpuInfoDescEntity desc = spuInfoDescService.getById(skuInfo.getSpuId()); result.setDesc(desc); }, executor); CompletableFuture<Void> groupAttrsFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> { // 5.获取spu规格参数信息(pms_product_attr_value、pms_attr_attrgroup_relation、pms_attr_group) List<SpuItemAttrGroupVO> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(), skuInfo.getCatalogId()); result.setGroupAttrs(groupAttrs); }, executor); // 6.等待所有任务都完成 CompletableFuture.allOf(imagesFuture, saleAttrFuture, descFuture, groupAttrsFuture).get(); return result; } }