线程池详解与异步任务编排使用案例-xian-cheng-chi-xiang-jie-yu-yi-bu-ren-wu-bian-pai-shi-yong-an-li

简介: 线程池详解与异步任务编排使用案例-xian-cheng-chi-xiang-jie-yu-yi-bu-ren-wu-bian-pai-shi-yong-an-li

title: 线程池详解与异步任务编排使用案例
date: 2022-10-11 21:42:47.586
updated: 2022-10-12 21:58:22.144
url: /archives/xian-cheng-chi-xiang-jie-yu-yi-bu-ren-wu-bian-pai-shi-yong-an-li
categories:
tags: 实战Demo | JUC | 并发编程

线程池详解与异步任务编排使用案例

1.初始化线程的4种方式

1)、继承Thread
2)、实现 Runnable接口
3)、实现 Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
4)、线程池
区别:
  1、2不能得到返回值。3可以获取返回值
  1、2、3都不能控制资源(无法控制线程数【高并发时线程数耗尽资源】)
  4可以控制资源,性能稳定,不会一下子所有线程一起运行
结论:
  实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】

2.创建线程池的方式

创建固定线程数的线程池ExecutorService

固定线程数的线程池
Executors.newFixedThreadPool(10);

execute和submit区别

作用:都是提交异步任务的
execute:只能提交Runnable任务,没有返回值
submit:可以提交Runnable、Callable,返回值是FutureTask

创建原生线程池ThreadPoolExecutor

new ThreadPoolExecutor(5,
        200,
        10,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(100000),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());
7个参数:
    corePoolSize: 核心线程数,不会被回收,接收异步任务时才会创建
    maximumPoolSize:最大线程数量,控制资源
  keepAliveime:   maximumPoolSize-corePoolSize 无任务存活超过空闲时间则线程被释放
  TimeUnitunit: 时间单位
  workQueue:    阻塞队列,任务被执行之前保存在任务队列中,只要有线程空闲,就会从队列取出任务执行
  threadFactory:  线程的创建工厂【可以自定义】
  RejectedExecutionHandler handler:队列满后执行的拒绝策略
    
线程池任务执行流程
    当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
    当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
    当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
    当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理(默认策略抛出异常)
    当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,释放空闲线程
    当设置allowCoreThreadTimeOut(true)时,该参数默认false,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

拒绝策略

DiscardOldestPolicy:丢弃最老的任务
AbortPolicy:丢弃当前任务,抛出异常【默认策略】
CallerRunsPolicy:同步执行run方法
DiscardPolicy:丢弃当前任务,不抛出异常

阻塞队列

1.new LinkedBlockingDeque<>();// 默认大小是Integer.Max会导致内存不足,所以要做压力测试给出适当的队列大小

线程池

1.常见的4种默认线程池

注意:
    回收线程 = maximumPoolSize - corePoolSize
可缓冲线程池【CachedThreadPool】:corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE
定长线程池【FixedThreadPool】:corePoolSize=maximumPoolSize
周期线程池【ScheduledThreadPool】:指定核心线程数,maximumPoolSize=Integer.MAX_VALUE,支持定时及周期性任务执行(一段时间之后再执行)
单任务线程池【SingleThreadPool】:corePoolSize=maximumPoolSize=1,从队列中获取任务(一个核心线程)
  
Executors.newCachedThreadPool();
Executors.newFixedThreadPool(10);
Executors.newScheduledThreadPool(10);
Executors.newSingleThreadExecutor();

2.为什么使用线程池?

1.降低资源的消耗【减少创建销毁操作】
  通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
  高并发状态下过多创建线程可能将资源耗尽
2.提高响应速度【控制线程个数】
  因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数过多导致CPU调度慢)
3、提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】
  线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

异步编排CompletableFuture

1.runXXX都是没有返回结果的,supplyXXX可以获取返回结果
2.可以传入自定义线程池,否则使用默认线程池

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型

1.业务场景

4、5、6依赖1,得先知道sku是哪个spu下的

2.测试异步操作

supplyAsync

// 5.1.提交任务异步执行(supplyAsync)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "测试使用", executor);
System.out.println(future1.get());

thenRunAsync串行化

// 不能获取上一步结果 + 无返回值

thenAcceptAsync串行化

// 能获取上一步结果 + 无返回值

thenApplyAsync串行化

// 能获取上一步结果 + 有返回值
// 5.2.获取上一步结果并链式异步调用(thenApplyAsync)
CompletableFuture<String> future2 = future1.thenApplyAsync(s -> s + " 链式调用", executor);// 参数s是上一步的返回值
System.out.println(future2.get());

whenCompleteAsync

// 5.3.获取上一步执行结果并获取异常信息(whenCompleteAsync)【无法处理异常返回默认值】
CompletableFuture<String> future3 = future2.whenCompleteAsync((result, exception) -> System.out.println("结果是:" + result + "----异常是:" + exception));

exceptionally

// 5.4.获取上一步异常,如果出现异常可返回默认值,不出现异常保持原值(exceptionally)
CompletableFuture<Integer> future4 = future3.thenApplyAsync((s -> 1 / 0), executor);
CompletableFuture<Integer> future5 = future4.exceptionally(exception -> {
System.out.println("出现异常:" + exception);
return 10;
});// 出现异常,使用默认返回值
System.out.println("默认值:" + future5.get());

handle

// 5.5.方法执行完成后的处理
CompletableFuture<Integer> future6 = future3.thenApplyAsync((s -> 1 / 0), executor).handle((result, exception) -> {
    if (exception == null) {
        return result;
    }
    System.out.println("handle处理异常:" + exception);
    return 1;
});
System.out.println("handle处理返回结果:" + future6.get());

两任务组合-都要完成

runAfterBothAsync

// 5.6.1.二者都要完成,组合【不获取前两个任务返回值,且自己无返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<Void> future03 = future01.runAfterBothAsync(future02, () -> {
    System.out.println("任务3执行");
}, executor);

thenAcceptBothAsync

// 5.6.2.二者都要完成,组合【获取前两个任务返回值,自己无返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<Void> future03 = future01.thenAcceptBothAsync(future02,
        (result1, result2) -> {
            System.out.println("任务3执行");
            System.out.println("任务1返回值:" + result1);
            System.out.println("任务2返回值:" + result2);
        }, executor);

thenCombineAsync

// 5.6.3.二者都要完成,组合【获取前两个任务返回值,自己有返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<String> future03 = future01.thenCombineAsync(future02,
        (result1, result2) -> {
            System.out.println("任务3执行");
            System.out.println("任务1返回值:" + result1);
            System.out.println("任务2返回值:" + result2);
            return "任务3返回值";
        }, executor);
System.out.println(future03.get());

两任务组合-任一完成

runAfterEitherAsync

// 不获取前任务返回值,且当前任务无返回值

acceptEitherAsync

// 获取前任务返回值,但当前任务无返回值

applyToEitherAsync

// 获取前任务返回值,当前任务有返回值

多任务组合

allOf

// 等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);
allOf.get();// 阻塞等待所有任务完成

anyOf

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);
anyOf.get();// 阻塞等待任一任务完成,返回值是执行成功的任务返回值

项目整合异步编排

1.注入线程池

gulimall:
  thread:
    core-size: 20
    max-size: 200
    keep-alive-time: 10
@ConfigurationProperties(prefix = "gulimall.thread")
@Data
public class ThreadPoolConfigProperties {
    private Integer coreSize;
    private Integer maxSize;
    private Integer keepAliveTime;
}
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
        return new ThreadPoolExecutor(
                pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }
}

2.实际业务使用异步编排

@Service("skuInfoService")
public class SkuInfoServiceImpl extends ServiceImpl<SkuInfoDao, SkuInfoEntity> implements SkuInfoService {
    @Autowired
    SkuImagesService skuImagesService;
    @Autowired
    SkuSaleAttrValueService skuSaleAttrValueService;
    @Autowired
    CouponAgentService couponAgentService;
    @Autowired
    SpuInfoDescService spuInfoDescService;
    @Autowired
    AttrGroupServiceImpl attrGroupService;
    @Autowired
    ThreadPoolExecutor executor;
    /**
     * 查询skuId商品信息,封装VO返回
     */
    @Override
    public SkuItemVO item(Long skuId) throws ExecutionException, InterruptedException {
        SkuItemVO result = new SkuItemVO();
        CompletableFuture<SkuInfoEntity> skuInfoFuture = CompletableFuture.supplyAsync(() -> {
            // 1.获取sku基本信息(pms_sku_info)【默认图片、标题、副标题、价格】
            SkuInfoEntity skuInfo = getById(skuId);
            result.setInfo(skuInfo);
            return skuInfo;
        }, executor);
        CompletableFuture<Void> imagesFuture = CompletableFuture.runAsync(() -> {
            // 2.获取sku图片信息(pms_sku_images)
            List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
            result.setImages(images);
        }, executor);
        CompletableFuture<Void> saleAttrFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> {
            // 3.获取当前sku所属spu下的所有销售属性组合(pms_sku_info、pms_sku_sale_attr_value)
            List<SkuItemSaleAttrVO> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId());
            result.setSaleAttr(saleAttr);
        }, executor);
        CompletableFuture<Void> descFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> {
            // 4.获取spu商品介绍(pms_spu_info_desc)【描述图片】
            SpuInfoDescEntity desc = spuInfoDescService.getById(skuInfo.getSpuId());
            result.setDesc(desc);
        }, executor);
        CompletableFuture<Void> groupAttrsFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> {
            // 5.获取spu规格参数信息(pms_product_attr_value、pms_attr_attrgroup_relation、pms_attr_group)
            List<SpuItemAttrGroupVO> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(), skuInfo.getCatalogId());
            result.setGroupAttrs(groupAttrs);
        }, executor);
        // 6.等待所有任务都完成
        CompletableFuture.allOf(imagesFuture, saleAttrFuture, descFuture, groupAttrsFuture).get();
        return result;
    }
}
目录
相关文章
|
2月前
|
编解码 数据安全/隐私保护 计算机视觉
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
如何使用OpenCV进行同步和异步操作来打开海康摄像头,并提供了相关的代码示例。
114 1
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
|
8天前
|
机器学习/深度学习 传感器 运维
使用机器学习技术进行时间序列缺失数据填充:基础方法与入门案例
本文探讨了时间序列分析中数据缺失的问题,并通过实际案例展示了如何利用机器学习技术进行缺失值补充。文章构建了一个模拟的能源生产数据集,采用线性回归和决策树回归两种方法进行缺失值补充,并从统计特征、自相关性、趋势和季节性等多个维度进行了详细评估。结果显示,决策树方法在处理复杂非线性模式和保持数据局部特征方面表现更佳,而线性回归方法则适用于简单的线性趋势数据。文章最后总结了两种方法的优劣,并给出了实际应用建议。
34 7
使用机器学习技术进行时间序列缺失数据填充:基础方法与入门案例
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
24 1
|
3月前
|
消息中间件 前端开发 NoSQL
面试官:如何实现线程池任务编排?
面试官:如何实现线程池任务编排?
43 1
面试官:如何实现线程池任务编排?
|
2月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
2月前
|
网络协议 安全 Java
难懂,误点!将多线程技术应用于Python的异步事件循环
难懂,误点!将多线程技术应用于Python的异步事件循环
85 0
|
4月前
|
缓存 Java
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
这篇文章详细介绍了Java中线程的四种初始化方式,包括继承Thread类、实现Runnable接口、实现Callable接口与FutureTask结合使用,以及使用线程池。同时,还深入探讨了线程池的七大参数及其作用,解释了线程池的运行流程,并列举了四种常见的线程池类型。最后,阐述了在开发中使用线程池的原因,如降低资源消耗、提高响应速度和增强线程的可管理性。
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
|
3月前
|
设计模式 缓存 Java
谷粒商城笔记+踩坑(14)——异步和线程池
初始化线程的4种方式、线程池详解、异步编排 CompletableFuture
谷粒商城笔记+踩坑(14)——异步和线程池
|
4月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
4月前
|
机器学习/深度学习 人工智能 数据处理
【人工智能】项目实践与案例分析:利用机器学习探测外太空中的系外行星
探测外太空中的系外行星是天文学和天体物理学的重要研究领域。随着望远镜观测技术的进步和大数据的积累,科学家们已经能够观测到大量恒星的光度变化,并尝试从中识别出由行星凌日(行星经过恒星前方时遮挡部分光线)引起的微小亮度变化。然而,由于数据量巨大且信号微弱,传统方法难以高效准确地识别所有行星信号。因此,本项目旨在利用机器学习技术,特别是深度学习,从海量的天文观测数据中自动识别和分类系外行星的信号。这要求设计一套高效的数据处理流程、构建适合的机器学习模型,并实现自动化的预测和验证系统。
86 1
【人工智能】项目实践与案例分析:利用机器学习探测外太空中的系外行星
下一篇
DataWorks