【小家java】用 ThreadPoolExecutor/ThreadPoolTaskExecutor 线程池技术提高系统吞吐量(附带线程池参数详解和使用注意事项)(下)

简介: 【小家java】用 ThreadPoolExecutor/ThreadPoolTaskExecutor 线程池技术提高系统吞吐量(附带线程池参数详解和使用注意事项)(下)
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }


Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE

Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池

Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池


ThreadPoolExecutor 的继承关系如下


Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor


其中有一个构造方法:


public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           RejectedExecutionHandler handler) {
     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
          Executors.defaultThreadFactory(), handler);
 }
 发现我们可以指定workQueue和handler。当然还有其余的构造函数,有类似的效果


线程池构造函数7大参数解释:

  corePoolSize:核心线程数。


  maximumPoolSize:最大线程数。表明线程中最多能够创建的线程数量。


  keepAliveTime:空闲的线程保留的时间。


  unit:空闲线程的保留时间单位。

  BlockingQueue workQueue:用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。


1、ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。


2、LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列


3、SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。


4、PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等

handler:饱和策略处理器。默认提供的4中策略上面已经有解释了


强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。



但是,但是,但是。。。用fix是有坑的。详情请见我这篇博文(在生产环境的一个活生生的血案):


另外,此处我说一下ThreadPoolTaskExecutor:


ThreadPoolTaskExecutor是一个spring的线程池技术,其实,它的实现方式完全是使用ThreadPoolExecutor进行实现(有点类似于装饰者模式。当然Spring提供的功能更加强大些,因为还有定时调度功能)。


三、如何设置线程池的参数:


系统默认值

corePoolSize=1

queueCapacity=Integer.MAX_VALUE

maxPoolSize=Integer.MAX_VALUE

keepAliveTime=60s

allowCoreThreadTimeout=false

rejectedExecutionHandler=AbortPolicy()


那我们如何来设置呢?需要根据几个值来决定


tasks :每秒的任务数,假设为500~1000

taskcost:每个任务花费时间,假设为0.1s

responsetime:系统允许容忍的最大响应时间,假设为1s

做几个计算

corePoolSize = 每秒需要多少个线程处理?


threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50

根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可

queueCapacity = (coreSizePool/taskcost)*responsetime

计算可得 queueCapacity = 80/1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行

切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。

maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)

计算可得 maxPoolSize = (1000-80)/10 = 92

(最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数


rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理


keepAliveTime和allowCoreThreadTimeout采用默认通常能满足

以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件(呵呵)和优化代码,降低taskcost来处理。


JDK1.5 的线程池由 Executor 框架提供。 Executor 框架将处理请求任务的提交和它的执行解耦。可以制定执行策略。在线程池中执行线程可以重用已经存在的线程,而不是创建新的线程,可以在处理多请求时抵消线程创建、消亡产生的开销。如果线程池过大,会导致内存的高使用量,还可能耗尽资源。如果过小,会由于存在很多的处理器资源未工作,对吞吐量造成损失。


如何合理配置线程池大小,一般需要根据任务的类型来配置线程池大小:

 1、如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1(比如是4核心 就配置为5)

 2、如果是IO密集型任务,参考值可以设置为2*NCPU

 当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。


3、使用场景



1、当你的任务是非必要的时候。比如记录操作日志、通知第三方服务非必要信息等,可以使用线程池处理非阻塞任务

2、当你的任务非常耗时时候,可以采用线程池技术

3、当请求并发很高时,可以采用线程池技术优化处理


可以通过Executors静态工厂构建线程池,但一般不建议这样使用。


提醒:能够用线程池的时候,不要自己的去new线程start,在高并发环境下,系统资源是宝贵的,需要节约资源才能提高可用性。


小彩蛋

Executors工具类给我们提供了不少快捷创建线程池的方法,虽然我们不推荐使用。但是里面有个方法我觉得还是不错的:Executors.newSingleThreadExecutor() 如果把这个当作全局线程池,可以很好实现异步,并且还能保证任务的顺序执行,进而达到消峰的效果:

        private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private static AtomicInteger num = new AtomicInteger();
    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            executorService.execute(() -> {
                num.getAndIncrement();
                System.out.println(Thread.currentThread().getName() + "-->>>>" + num);
            });
        }
        executorService.shutdown();
    }
输出:
pool-1-thread-1-->>>>1
pool-1-thread-1-->>>>2
pool-1-thread-1-->>>>3
pool-1-thread-1-->>>>4
pool-1-thread-1-->>>>5
pool-1-thread-1-->>>>6
pool-1-thread-1-->>>>7
pool-1-thread-1-->>>>8
pool-1-thread-1-->>>>9
pool-1-thread-1-->>>>10
pool-1-thread-1-->>>>11
pool-1-thread-1-->>>>12
pool-1-thread-1-->>>>13
pool-1-thread-1-->>>>14
pool-1-thread-1-->>>>15
pool-1-thread-1-->>>>16
pool-1-thread-1-->>>>17
pool-1-thread-1-->>>>18
pool-1-thread-1-->>>>19
pool-1-thread-1-->>>>20


我们发现pool只有一个,且thread也只有一个,而且任务都是顺序执行的。当我们用Fiexed的话,也是能够达到顺序执行的效果的。因为内部的阻塞队列是FIFO的实现:


    private static final ExecutorService executorService = Executors.newFixedThreadPool(5);
    private static AtomicInteger num = new AtomicInteger();
    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "-->>>>" + num.getAndIncrement());
            });
        }
        executorService.shutdown();
    }
输出:
pool-1-thread-2-->>>>0
pool-1-thread-1-->>>>1
pool-1-thread-2-->>>>2
pool-1-thread-1-->>>>3
pool-1-thread-2-->>>>4
pool-1-thread-1-->>>>5
pool-1-thread-2-->>>>6
pool-1-thread-1-->>>>7
pool-1-thread-2-->>>>8
pool-1-thread-1-->>>>9
pool-1-thread-2-->>>>10
pool-1-thread-1-->>>>11
pool-1-thread-2-->>>>12
pool-1-thread-1-->>>>13
pool-1-thread-2-->>>>14
pool-1-thread-1-->>>>15
pool-1-thread-2-->>>>16
pool-1-thread-4-->>>>17
pool-1-thread-3-->>>>18
pool-1-thread-5-->>>>19
相关文章
|
2月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
120 38
|
2月前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
64 4
|
2月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
101 2
|
2月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
353 2
|
13天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
38 1
|
3月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
63 1
|
3月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
41 3
|
3月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
28 2
|
3月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
45 2
|
3月前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
52 1