线程池的核心参数和运行机制

简介: 线程池的核心参数和运行机制

常见的三种工厂类线程池

1、newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

由于队列缓冲区为空,每来一个任务时,都会在必要时新建线程执行任务直到到达Integer.MAX_VALUE,这就有可能导致大量的线程被创建,进而造成oom

2、newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        //超过nThreads个线程后会无限制的往阻塞队列放入线程
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可能因为阻塞队列被撑爆造成oom

3、newSingleThreadExecutor

创建一个单线程化的线程池

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
         //超过1个线程后会无限制的往阻塞队列放入线程
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

4、newScheduledThreadPool

定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

5、newWorkStealingPool

现代线程池使用偷窃工作-每个线程都有自己的队列.当线程池线程产生任务时,它将任务排队到他自己的队列中.当线程池线程想要使任务出队时-它首先尝试将任务从他自己的队列中出队,如果没有,则从其他线程队列中"窃取"工作.这确实减少了线程池的争用并提高了性能,ForkJoinPool也利用偷窃工作(类似于mapreduce)。

newWorkStealingPool适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

线程池设计原理

 

线程池常用阻塞队列

1.SynchronousQueue

private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), r -> new Thread(r, "ThreadTest"));

SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。

拥有公平(FIFO)和非公平(LIFO)策略,非公平侧罗会导致一些数据永远无法被消费的情况

使用SynchronousQueue阻塞队列一般用于构造newCachedThreadPool

2.LinkedBlockingQueue

private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, "ThreadTest"));

LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

3.ArrayBlockingQueue

private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(32), r -> new Thread(r, "ThreadTest"));

ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。

线程池拒绝策略

1、CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }}

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)

2、AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();}

这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)

3、DiscardPolicy:不能执行的任务将被删除

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} 这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

4、DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }}

该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队

如何合理设计线程池里的线程数

1.IO密集型 由于线程并不是一直在运行,所以可以尽可能的多配置线程

线程数 = CPU可用核心数/(1-阻塞系数)

阻塞系数= IO时间/(IO时间+CPU时间)

计算密集型任务的阻塞系数为0,


IO密集型任务的阻塞系数则接近于1。

一个完全阻塞的任务是注定要挂掉的,所以我们无须担心阻塞系数会达到1。

阻塞系数可以采用一些性能分析工具或java.lang.managenment

API来确定线程话在系统I/O操作上的时间与CPU密集任务所消耗的时间比值。

2.CPU 密集型任务(大量复杂的运算)应当分配较少的线程,比如 CPU 个数相当的大小。

线程池内运行的线程抛异常,线程池会怎么办

当线程池中线程执行任务的时候,任务出现未被捕获的异常的情况下,线程池会将允许该任务的线程从池中移除并销毁,且同时会创建一个新的线程加入到线程池中;可以通过ThreadFactory自定义线程并捕获线程内抛出的异常,也就是说甭管我们是否去捕获和处理线程池中工作线程抛出的异常,这个线程都会从线程池中被移除。(详细分析见文章

线程池状态

线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。

如何合理设置线程池队列长度

tomcat、Dubbo 等业界成熟的产品是如何设置线程队列,我们主要分析下这两个中间件

1.JDK线程池策略

corePoolSize < maximumPoolSize时

1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

2. 如果此时线程池中的数量大于等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列

3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理添加的任务。

4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

corePoolSize==maximumPoolSize时

队列未满新的任务会加入队列里,如果队列满了,那么会用空闲线程来处理新的任务。没有空闲线程则执行拒绝策略

线程池策略流程

 

2.Tomcat线程池策略

Tomcat的线程池队列是无限长度的,但是线程池会一直创建到maximumPoolSize,然后才把请求放入等待队列中

tomcat 任务队列org.apache.tomcat.util.threads.TaskQueue其继承与LinkedBlockingQueue,覆写offer方法。

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
         //线程个数小于MaximumPoolSize会创建新的线程。
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

3.Dubbo线程池策略

       Dubbo 提供3种线程池模型即:FixedThreadPool、CachedThreadPool(客户端默认的)、LimitedThreadPool(服务端默认的),从源码可以看出,其默认的队列长度都是0,当队列长度为0 ,其使用是无缓冲的队列SynchronousQueue,当运行线程超过maximumPoolSize则拒绝请求。

总结

线程池的任务队列本来起缓冲作用,但是如果设置的不合理会导致线程池无法扩容至max,这样无法发挥多线程的能力,导致一些服务响应变慢。

      队列长度要看具体使用场景,取决服务端处理能力以及客户端能容忍的超时时间等

      建议采用tomcat的处理方式,core与max一致,先扩容到max再放队列,不过队列长度要根据使用场景设置一个上限值,如果响应时间要求较高的系统可以设置为0。

相关文章:

https://mp.weixin.qq.com/s/2pspUsnFOXDNVfcB_tj22w


相关文章
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
18天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
43 2
|
22天前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
24 1
|
4月前
|
缓存 安全 算法
Java面试题:如何通过JVM参数调整GC行为以优化应用性能?如何使用synchronized和volatile关键字解决并发问题?如何使用ConcurrentHashMap实现线程安全的缓存?
Java面试题:如何通过JVM参数调整GC行为以优化应用性能?如何使用synchronized和volatile关键字解决并发问题?如何使用ConcurrentHashMap实现线程安全的缓存?
45 0
|
1月前
|
安全 Java 开发者
在多线程编程中,确保数据一致性与防止竞态条件至关重要。Java提供了多种线程同步机制
【10月更文挑战第3天】在多线程编程中,确保数据一致性与防止竞态条件至关重要。Java提供了多种线程同步机制,如`synchronized`关键字、`Lock`接口及其实现类(如`ReentrantLock`),还有原子变量(如`AtomicInteger`)。这些工具可以帮助开发者避免数据不一致、死锁和活锁等问题。通过合理选择和使用这些机制,可以有效管理并发,确保程序稳定运行。例如,`synchronized`可确保同一时间只有一个线程访问共享资源;`Lock`提供更灵活的锁定方式;原子变量则利用硬件指令实现无锁操作。
20 2
|
1月前
|
设计模式 Java 物联网
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
58 0
|
3月前
|
缓存 Java
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
这篇文章详细介绍了Java中线程的四种初始化方式,包括继承Thread类、实现Runnable接口、实现Callable接口与FutureTask结合使用,以及使用线程池。同时,还深入探讨了线程池的七大参数及其作用,解释了线程池的运行流程,并列举了四种常见的线程池类型。最后,阐述了在开发中使用线程池的原因,如降低资源消耗、提高响应速度和增强线程的可管理性。
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
|
2月前
|
Java Spring
运行@Async注解的方法的线程池
自定义@Async注解线程池
159 3
|
3月前
|
消息中间件 设计模式 安全
多线程魔法:揭秘一个JVM中如何同时运行多个消费者
【8月更文挑战第22天】在Java虚拟机(JVM)中探索多消费者模式,此模式解耦生产与消费过程,提升系统性能。通过`ExecutorService`和`BlockingQueue`构建含2个生产者及4个消费者的系统,实现实时消息处理。多消费者模式虽增强处理能力,但也引入线程安全与资源竞争等挑战,需谨慎设计以确保高效稳定运行。
93 2
|
3月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
88 7