老生常谈的线程池希望你已经都会了

简介: 老生常谈的线程池希望你已经都会了

1、为什么要使用线程池


受限于硬件、内存和性能,我们不可能无限制的创建任意数量的线程,因为每一台机器允许的最大线程是一个有界值。线程池就是用这些有限个数的线程,去执行提交的任务。然而对于多用户、高并发的应用来说,提交的任务数量非常巨大,一定会比允许的最大线程数多很多。为了解决这个问题,必须要引入排队机制,或者是在内存中,或者是在硬盘等容量很大的存储介质中。Java提供的ThreadPoolExecutor只支持任务在内存中排队,通过BlockingQueue暂存还没有来得及执行的任务。


2、使用线程池的好处


  • 降低资源消耗,通过重复利用已创建的线程,降低创建和销毁线程的消耗。
  • 提高响应速度,当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。


3、java实现


任务的管理是一件比较容易的事,复杂的是线程的管理,这会涉及线程数量、等待/唤醒、同步/锁、线程创建和死亡等问题。ThreadPoolExecutor与线程相关的几个成员变量是:keepAliveTime、allowCoreThreadTimeOut、poolSize、corePoolSize、maximumPoolSize,它们共同负责线程的创建和销毁。常用构造器


ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue);
复制代码

几个重要参数:


  • corePoolSize: 核心线程数,只要创建了就会一直存活,即使没有是空闲
  • maximumPoolSize:线程池维护线程的最大数量
  • keepAliveTime: 线程池维护线程所允许的空闲时间
  • unit: 线程池维护线程所允许的空闲时间的单位
  • workQueue: 线程池所使用的缓冲队列
  • allowCoreThreadTimeout:允许核心线程超时
  • rejectedExecutionHandler:任务拒绝处理器,两种情况会拒绝处理任务:
  • 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
  • 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
  • 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置,默认是AbortPolicy,会抛出异常
  • ThreadPoolExecutor类有几个内部实现类来处理这类情况:
  • AbortPolicy 丢弃任务,抛运行时异常
  • CallerRunsPolicy 执行任务
  • DiscardPolicy 忽视,什么都不会发生
  • DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
  • 实现RejectedExecutionHandler接口,可自定义处理器


执行机制


  • 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。


  • 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。


  • 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。


  • 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。


  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

image.png


4、线程池创建完成后设置


看下allowCoreThreadTimeOut和keepAliveTime属性的含义。在压力很大的情况下,线程池中的所有线程都在处理新提交的任务或者是在排队的任务,这个时候线程池处在忙碌状态。如果压力很小,那么可能很多线程池都处在空闲状态,这个时候为了节省系统资源,回收这些没有用的空闲线程,就必须提供一些超时机制,这也是线程池大小调节策略的一部分。通过corePoolSize和maximumPoolSize,控制如何新增线程;通过allowCoreThreadTimeOut和keepAliveTime,控制如何销毁线程。


设置核心、最大线程数

threadPool.setCorePoolSize(10);
threadPool.setMaximumPoolSize(10);
复制代码

设置核心线程数可空闲超时退出

threadPool.allowCoreThreadTimeOut(true);
复制代码

设置拒绝策略

threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
复制代码


5、监控线程池状态


/**
 * 线程池里活跃的大致线程数量
 */
long activeCount = threadPool.getActiveCount();
/**
 * 返回已执行的任务的大致总数。
 */
long taskCount = threadPool.getTaskCount();
/**
 * 返回线程池在运行过程中已完成的大致任务数
 */
long completedTaskCount = threadPool.getCompletedTaskCount();
/**
 * 返回线程池里的线程数量
 */
long poolSize = threadPool.getPoolSize();
/**
 * 返回曾经创建过的最大线程数
 */
long largestPoolSize = threadPool.getLargestPoolSize();
/**
 * 返回当前最大配置线程数
 */
long maximumPoolSize = threadPool.getMaximumPoolSize();
复制代码

这里我对largestPooSize的含义比较困惑,按字面理解是“最大的线程池数量”,但是按照线程池的定义,maximumPoolSize和coreSize相同的时候(我测试时前面设置的都是20,现在设置的都是10),一个线程池里的最大线程数是10,那么为什么有的时候largestPooSize可以是20呢?我去翻这块的源码注释:

/**
 * Returns the largest number of threads that have ever
 * simultaneously been in the pool.
 *
 * @return the number of threads
 */
public int getLargestPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        return largestPoolSize;
    } finally {
        mainLock.unlock();
    }
}
复制代码

在addWorker方法中发现两点:


  • largestPoolSize是worker集合的历史最大值,只增不减。largestPoolSize的大小是线程池曾创建的线程个数,跟线程池的容量无关;


  • largestPoolSize<=maximumPoolSize。


6、拒绝策略


前面已经提到四种拒绝策略,下面进行详细解释和验证

1、默认策略AbortPolicy 丢弃任务,抛运行时异常

private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 5, SECONDS, new ArrayBlockingQueue<>(2));
private static void task() {
    try {
        sleep(1000);
        System.out.println(Thread.currentThread().getName() + ":queue size:" + threadPool.getQueue().size() + ", active:" + threadPool.getActiveCount());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
@Test
public void test_aborot_policy() {
    for (int i = 0; i < 10; i++) {
        threadPool.submit(TestThread::task);
    }
}
运行结果:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6833ce2c rejected from java.util.concurrent.ThreadPoolExecutor@725bef66[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
复制代码

2、CallerRunsPolicy 用主线程去执行任务

@Test
public void test_CallerRunsPolicy() throws InterruptedException {
    threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 10; i++) {
        threadPool.submit(TestThread::task);
    }
    Thread.sleep(1000000);//用于阻塞住主线程,否则主线程退出无法看到打印
}
结果:
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
main:queue size:2, active:2
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
main:queue size:0, active:2
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2
复制代码

可以看到主线程去执行了部分任务,同时主线程此时无法进行调度线程,直到主线程执行完毕再开始线程调度执行,activeCount不准是因为线程状态瞬息万变,activeCount只是对当前活跃线程数的大体估计。


3、 DiscardPolicy 忽视,直接丢弃,什么都不会发生

@Test
public void test_DiscardPolicy() throws InterruptedException {
    threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    for (int i = 0; i < 20; i++) {
        threadPool.submit(TestThread::task);
    }
    Thread.sleep(1000000);//用于阻塞住主线程,否则主线程退出无法看到打印
}
结果;
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2
复制代码

只有线程和队列的执行了,其他都被丢弃了,也不会抛出异常


4、 DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务

private static void task2(int cnout) {
    try {
        sleep(1000);
        System.out.println(cnout);
        System.out.println(Thread.currentThread().getName() + ":queue size:" + threadPool.getQueue().size() + ", active:" + threadPool.getActiveCount());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
@Test
public void test_DiscardOldestPolicy() throws InterruptedException {
    threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    for (int i = 0; i < 20; i++) {
        int finalI = i;
        threadPool.submit(()->task2(finalI));
    }
    Thread.sleep(1000000);//用于阻塞住主线程,否则主线程退出无法看到打印
}
结果:
0
3
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
18
19
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2
复制代码

可以看出除了最开始执行的两个线程和最后进入队列的两个线程,其他任务都被丢弃了


7、线程工厂ThreadFactory


通过线程池创建的构造器可以看到还有一个参数:Executors.defaultThreadFactory()

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
复制代码

这些工厂类都实现了一个接口,接口只有一个newThread方法就是用来生产线程的,子类需要实现这个方法来根据自己规则生产相应的线程。

DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
复制代码

DefaultThreadFactory就做了两件事,一是创建线程时把线程设为非守护线程,二是设置线程的名字为pool-2-thread-1格式。但是这种格式的线程名区分度不够,最好能根据线程池使用的功能来定义线程名,这时可以引入其他线程工厂,比如用guava的ThreadFactoryBuilder和Spring的CustomizableThreadFactory:

@Test
public void test_thread_factory() throws InterruptedException {
    threadPool.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("Clear Task-%d").build());
    for (int i = 0; i < 4; i++) {
        threadPool.submit(TestThread::task);
    }
    Thread.sleep(1000000);//用于阻塞住主线程,否则主线程退出无法看到打印
}
结果:
Clear Task-0:queue size:2, active:2
Clear Task-1:queue size:2, active:2
Clear Task-0:queue size:0, active:2
Clear Task-1:queue size:0, active:2
复制代码


8、Java自带创建线程池工具类


java通过Executors提供四种线程池,分别为:


  • newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。


  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待,表示同一时刻只能有这么大的并发数


  • newScheduledThreadPool 创建一个定时线程池,支持定时及周期性任务执行。


  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。


  • newWorkStealingPool 从1.8后新增,创建一个具有抢占式操作的线程池,使用的是ForkJoinPool根据可用cpu数创建线程个数,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中


但是在阿里巴巴Java开发手册中不允许使用Executors创建线程池。


原因是这几种方式创建线程池时都有可能出现OOM现象,要么使用阻塞队列但是都是使用的无界队列,队列默认值为Integer.MAX_VALUE,要么线程最大可创建数量为Integer.MAX_VALUE,这都会导致内存溢出现象


public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
复制代码

所以创建线程池还是最好使用传统的创建方式ThreadPoolExecutor。


结语:如何合理配置线程池的大小


程序运行中绝对不是线程数越多处理越快!!!


NCpu=Runtime.getRuntime().availableProcessors()


  • 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1, 常见场景:复杂算法,这种情况我理解CPU处理内存业务,不需要频繁切换线程,所以线程数不应该设置太多,减少上下文切换消耗。


  • 如果是IO密集型任务,参考值可以设置为2*NCPU ,常见场景:数据库交互、文件上传下载、网络传输,当线程处于IO操作时,线程是阻塞的,线程由运行状态切换到等待状态。此时CPU会做上下文切换,以便处理其他程序;当IO操作完成后,cpu会收到一个来自硬盘的中断信号,CPU正在执行的线程因此会被打断,回到ready队列。而先前因I/O而waiting的线程随着I/O的完成也再次回到就绪队列,此时CPU可能会选择他执行,这种情况由于会阻塞IO所以可以多几条线程并行处理。


当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。


目录
相关文章
|
5月前
|
存储 缓存 Java
老程序员分享:Java并发编程:线程池的使用
老程序员分享:Java并发编程:线程池的使用
|
6月前
|
监控 Java 调度
Java并发编程:线程池的原理与实践
【5月更文挑战第30天】 在现代软件开发中,尤其是Java应用中,并发编程是一个不可忽视的领域。线程池作为提升应用性能和资源利用率的关键技术之一,其正确使用和优化对系统稳定性和效率至关重要。本文将深入探讨线程池的核心原理、常见类型以及在实际开发中的使用案例,旨在帮助开发者更好地理解和运用线程池技术,构建高性能的Java应用程序。
|
5月前
|
监控 Java UED
Java并发编程:深入理解线程池的设计与应用
本文旨在通过数据导向和科学严谨的方式,深入探讨Java并发编程中的关键组件——线程池。文章首先概述了线程池的基本概念与重要性,随后详细解读了线程池的核心参数及其对性能的影响,并通过实验数据支持分析结果。此外,文中还将介绍如何根据不同的应用场景选择或设计合适的线程池,以及如何避免常见的并发问题。最后,通过案例研究,展示线程池在实际应用中的优化效果,为开发人员提供实践指导。
50 0
|
5月前
|
监控 Java
面试官:说一说如何优雅的关闭线程池,我:shutdownNow,面试官:粗鲁!
【6月更文挑战第4天】面试官:说一说如何优雅的关闭线程池,我:shutdownNow,面试官:粗鲁!
31 0
|
6月前
|
Java
Java并发编程:线程池的深入理解与实践
【2月更文挑战第29天】在Java并发编程中,线程池是一种重要的技术手段,它可以有效地管理和控制线程,提高系统性能。本文将深入探讨线程池的原理,解析其关键参数,并通过实例演示如何在实际开发中合理使用线程池。
|
6月前
|
存储 分布式计算 Java
不是吧?线程池这样搞?
学习线程池能够帮助我们更好地处理多线程编程,并提高程序的性能和稳定性。线程池指定线程数这块,首先要考量自己的业务是什么样的?是cpu密集型的还是io密集型的,假设运行应用的机器CPU核心数是N。 cpu密集型的可以先给到N+1,io密集型的可以给到2N 。
54 1
|
存储 缓存 Java
线程池之刨根问底
线程池之刨根问底
123 0
线程池之刨根问底
【多线程】线程池如何复用,怎么才能让面试官听懂我说的?
今天来说一下面试中常问到问题,我们知道线程池是帮助我们对线程资源的管理,只有我们合理的使用使用线程池,他才能做到事倍功半,但是你知道线程池是如何复用的吗?