【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池

简介: 🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。

 

image.gif 编辑

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。

目录

9.1.5. ThreadPoolExecutor

9.1.5.1. 状态和数量

9.1.5.2. 构造方法

9.1.5.3. Executors-newFixedThreadPool

9.1.5.4. Executors-newCachedThreadPool

9.1.5.5. Executors-newSingleThreadExecutor

9.1.5.6. Executors-newSingleThreadExecutor

9.1.5.7. 创建线程池方法对比

9.1.5.8. 提交任务方法

9.1.5.9. 关闭线程池


9.1.5. ThreadPoolExecutor

9.1.5.1. 状态和数量

image.gif 编辑

ThreadPoolExecutor状态和数量:

状态名

高3位

接受新任务

处理阻塞队列任务

说明

RUNNING

111

Y

Y

接受新任务,并会处理阻塞队列中的任务

SHUTDOWN

000

N

Y

不会接受新任务,但会处理阻塞队列中剩余的任务

STOP

001

N

N

中断正在执行的任务,抛弃阻塞队列中的任务

TERMINATED

010

-

-

任务全部执行完毕,活动线程数为0,即将进入终结

TERMINATED

011

-

-

线程池终结

采用int高3位表示线程池状态,低29位表示线程数量,存储在一个原子变量ctl中,目的是将线程状态与线程个数合二为一,这样就可以用一次CAS对其赋值

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))

从数字上,TERMINATED>TERMINATED>STOP>SHUTDOWN>RUNNING,高三位的1表示负数

9.1.5.2. 构造方法

public ThreadPoolExecutor(
    int corePoolSize,//核心线程数(最多保留的线程数)
    int maximumPoolSize,//最大线程数
    long keepAliveTime,//生存时间,针对救急线程
    TimeUnit unit,//时间单位,针对救急线程
    BlockingQueue<Runnable> workQueue,//阻塞队列
    ThreadFactory threadFactory,//线程工厂,创建线程时起名字
    RejectedExecutionHandler handler)//拒绝策略

image.gif

ThreadPoolExecutor的工作流程:

  1. ThreadPoolExecutor包含两类线程:核心线程和救急线程,采用懒加载的创建方式,存在救急线程的前提是选择有界队列
  2. corePoolSize指核心线程数,maximumPoolSize指核心线程数+救急线程数
  3. 当核心线程都在执行任务且阻塞队列已满但是还有任务继续入队时,ThreadPoolExecutor会先检查线程池中是否可以有救急线程
  4. 有,救急线程执行多出来的任务,执行完任务等待keepAliveTime后,要是没有任务继续入队,救急线程就会被销毁,下次高峰期才会再次创建救急线程
  5. 没有,说明任务数超过了maximumPoolSize,采用拒绝策略
  6. JDK提供了4中拒绝策略

    image.gif 编辑
  1. AbortPolicy:抛出RejectedExecutionException异常,默认策略
  2. CallerRunsPolicy:让调用者运行任务
  3. DiscardPolicy:放弃本次任务
  4. DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
  1. 第三方框架中也有一些拒绝策略的扩展
  1. DubboAbortPolicy基础上增加日志功能,并调用jstack抓取当前栈中的信息,方便定位问题
  2. Netty创建新的线程来执行任务,这样实现并不好,因为就没有了限制
  3. ActiveMQ超时等待60s
  4. PinPoint使用了一个拒绝策略链,尝试策略链中每一个拒绝策略

9.1.5.3. Executors-newFixedThreadPool

//创建一个固定大小的线程池:适用于任务量已知,相对耗时的任务
public static ExecutorService newFixedThreadPool(int nThreads) {//传递的线程数
return new ThreadPoolExecutor(
    //核心线程数:nThreads,最大线程数:nThreads
    nThreads, nThreads,//没有救急线程
    0L, TimeUnit.MILLISECONDS,//存活时间:0毫秒
    //阻塞队列:LinkedBlockingQueue无界队列
    new LinkedBlockingQueue<Runnable>());
}

image.gif

9.1.5.4. Executors-newCachedThreadPool

//创建一个缓冲线程池:适用于任务量不断增长,但每个任务执行时间较短的情况
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
        //核心线程数:0最大线程数:2,147,483,647
        0, Integer.MAX_VALUE,//没有核心线程,全都是救急线程,且可以无限创建,存活时间为60s
        60L, TimeUnit.SECONDS,//存活时间:60秒
        //阻塞队列:SynchronousQueue同步队列,没有容量,一手交钱一手交货
        new SynchronousQueue<Runnable>(),
    );
}

image.gif

9.1.5.5. Executors-newSingleThreadExecutor

//创建一个单线程线程池:适用于任务是串行执行,多出来的任务排队
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(
        //核心线程数1,最大线程数1
        1, 1,
        0L, TimeUnit.MILLISECONDS,//存活时间0毫秒
        //阻塞队列:LinkedBlockingQueue无界队列
        new LinkedBlockingQueue<Runnable>()));
}

image.gif

9.1.5.6. Executors-newSingleThreadExecutor

//创建一个带有任务调用的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//延时执行任务
public <V> ScheduledFuture<V> schedule(
    //具体执行的任务对象
    Runnable command,
    //延时时间
    long delay,
    TimeUnit unit);
//定时执行任务
public ScheduledFuture<?> scheduleAtFixedRate(
    //具体的执行任务对象
    Runnable command,
    //初始延时时间
    long initialDelay,
    //任务之间的执行延迟时间:从上一次任务开始执行时,延迟时间就开始
    long period,
    TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(
    //具体的执行任务对象
    Runnable command,
    //初始延时时间
    long initialDelay,
    //任务之间的执行延迟时间:从上一次任务执行结束时,延迟时间才开始
    long delay,
    TimeUnit unit)

image.gif

9.1.5.7. 创建线程池方法对比

newSingleThreadExecutor()与自己创建一个单线程串行执行任务的区别:

  • 自己创建一个单线程串行执行任务如果遇到异常情况,没有任务补救措施,整个程序停止
  • newSingleThreadExecutor()遇到异常情况还会创建一个新的线程,保持始终有一个线程工作

newSingleThreadExecutor()newFixedThreadPool(1)的区别

  • newSingleThreadExecutor()线程数始终为1,不能修改,FinalizableDelegatedExecutorService应用的是装饰器模式,对外只暴露了ExecutorService接口,不能调用ThreadPoolExecutor中特有的方法
  • newFixedThreadPool(1)初始线程数为1,之后还可以通过对外暴露的ThreadPoolExecutor对象来调用其setCorePoolSize()来修改线程数

9.1.5.8. 提交任务方法

//执行任务
void execute(Runnable command);
//提交任务task,用返回值Future获取任务执行的结果
<T> Future<T> submit(Callable<T> task);
//提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
//超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
//提交tasks中所有任务,哪个任务先执行完毕,返回此任务的返回结果,其他任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
//超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException;

image.gif

@Slf4j
public class SubmitTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        invokeAny(pool);
    }
    private static void invokeAny(ExecutorService pool) throws InterruptedException, ExecutionException {
        Object result = pool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("running...111");
                    Thread.sleep(new Random().nextInt(10000));
                    log.debug("end...111");
                    return "1";
                },
                () -> {
                    log.debug("running...222");
                    Thread.sleep(new Random().nextInt(10000));
                    log.debug("end...222");
                    return "2";
                },
                () -> {
                    log.debug("running...333");
                    Thread.sleep(new Random().nextInt(10000));
                    log.debug("end...333");
                    return "3";
                }
        ));
        log.debug("执行结果:{}",result);
    }
    private static void invokeAll(ExecutorService pool) throws InterruptedException {
        List<Future<Object>> futures = pool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("running...111");
                    Thread.sleep(new Random().nextInt(10000));
                    return "1";
                },
                () -> {
                    log.debug("running...222");
                    Thread.sleep(new Random().nextInt(10000));
                    return "2";
                },
                () -> {
                    log.debug("running...333");
                    Thread.sleep(new Random().nextInt(10000));
                    return "3";
                }
        ));
        futures.forEach(future -> {
            try {
                log.debug("执行结果:{}", future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }
    private static void submit(ExecutorService pool) throws InterruptedException, ExecutionException {
        Future<String> future = pool.submit(() -> {
            log.debug("正在执行");
            Thread.sleep(1000);
            return "任务结束";
        });
        log.debug("执行结果:{}", future.get());
    }
}

image.gif

9.1.5.9. 关闭线程池

/*
1.将线程池状态变为SHUTDOWN
2.不会接受新任务,把剩余任务完成
3.不会阻塞调用线程
*/
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改线程池状态
        advanceRunState(SHUTDOWN);
        //打断空闲的线程
        interruptIdleWorkers();
        onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //尝试终止线程池,检查线程池状态是否为TERMINATED
    tryTerminate();
}

image.gif

/*
1.将线程池状态变为STOP
2.不会接受新任务,剩余任务抛弃并返回
3.用interrupt打断正在执行任务的线程
*/
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改线程池状态
        advanceRunState(STOP);
        //打断所有线程
        interruptWorkers();
        //将剩余任务返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //尝试终止线程池,检查线程池状态是否为TERMINATED
    tryTerminate();
    return tasks;
}

image.gif

//检查线程池状态是否处于RUNNING,是返回false,否返true
public boolean isShutdown();
//检查线程池状态是否处于TERMINATED,是返回true,否返false
public boolean isTerminated();
//调用shutdown()方法后,线程池不会等待所有线程任务执行结束
//如果想在线程池TERMINATED后做些事情,可以用此方法等待
public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException

image.gif

目录
相关文章
|
11天前
|
Java 数据库连接 API
2025 更新必看:Java 编程基础入门级超级完整版指南
本教程为2025更新版Java编程基础入门指南,涵盖开发环境搭建(SDKMAN!管理JDK、VS Code配置)、Java 17+新特性(文本块、Switch表达式增强、Record类)、面向对象编程(接口默认方法、抽象类与模板方法)、集合框架深度应用(Stream API高级操作、并发集合)、模式匹配与密封类等。还包括学生成绩管理系统实战项目,涉及Maven构建、Lombok简化代码、JDBC数据库操作及JavaFX界面开发。同时提供JUnit测试、日志框架使用技巧及进阶学习资源推荐,助你掌握Java核心技术并迈向高级开发。
77 5
|
17天前
|
JavaScript 前端开发 Java
Java 编程进阶实操中工具集整合组件封装方法与使用指南详解
本文详细介绍Hutool工具集和图书管理系统相关组件的封装方法及使用示例。通过通用工具类封装(如日期格式化、字符串处理、加密等)、数据库操作封装(结合Hutool DbUtil与MyBatis)、前端Vue组件封装(图书列表与借阅表单)以及后端服务层封装(业务逻辑实现与REST API设计),帮助开发者提升代码复用性与可维护性。同时,提供最佳实践建议,如单一职责原则、高内聚低耦合、参数配置化等,助力高效开发。适用于Java编程进阶学习与实际项目应用。
87 10
|
11天前
|
Oracle Java 关系型数据库
java 编程基础入门级超级完整版教程详解
这份文档是针对Java编程入门学习者的超级完整版教程,涵盖了从环境搭建到实际项目应用的全方位内容。首先介绍了Java的基本概念与开发环境配置方法,随后深入讲解了基础语法、控制流程、面向对象编程的核心思想,并配以具体代码示例。接着探讨了常用类库与API的应用,如字符串操作、集合框架及文件处理等。最后通过一个学生成绩管理系统的实例,帮助读者将理论知识应用于实践。此外,还提供了进阶学习建议,引导学员逐步掌握更复杂的Java技术。适合初学者系统性学习Java编程。资源地址:[点击访问](https://pan.quark.cn/s/14fcf913bae6)。
60 2
|
12天前
|
人工智能 Java API
Java并发编程之Future与FutureTask
本文深入解析了Future接口及其实现类FutureTask的原理与使用。Future接口定义了获取任务结果、取消任务及查询任务状态的规范,而FutureTask作为其核心实现类,结合了Runnable与Future的功能。文章通过分析FutureTask的成员变量、状态流转、关键方法(如run、set、get、cancel等)的源码,展示了异步任务的执行与结果处理机制。最后,通过示例代码演示了FutureTask的简单用法,帮助读者更直观地理解其工作原理。适合希望深入了解Java异步编程机制的开发者阅读。
|
4月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
87 26
|
4月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
95 17
|
6月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
506 2
|
7月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
7月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
6月前
|
缓存 Java 调度
多线程编程核心:上下文切换深度解析
在现代计算机系统中,多线程编程已成为提高程序性能和响应速度的关键技术。然而,多线程编程中一个不可避免的概念就是上下文切换(Context Switching)。本文将深入探讨上下文切换的概念、原因、影响以及优化策略,帮助你在工作和学习中深入理解这一技术干货。
121 10
下一篇
oss创建bucket