Java并发 之 线程池系列 (2) 使用ThreadPoolExecutor构造线程池

简介: Java并发 之 线程池系列 (2) 使用ThreadPoolExecutor构造线程池

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8ciPBJfb-1676540594595)(null)]

Executors的“罪与罚”

在上一篇文章Java并发 之 线程池系列 (1) 让多线程不再坑爹的线程池中,我们介绍了使用JDK concurrent包下的工厂和工具类Executors来创建线程池常用的几种方法:

//创建固定线程数量的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

//创建一个线程池,该线程池会根据需要创建新的线程,但如果之前创建的线程可以使用,会重用之前创建的线程
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

//创建一个只有一个线程的线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

诚然,这种创建线程池的方法非常简单和方便。但仔细阅读源码,却把我吓了一条: 这是要老子的命啊!

我们前面讲过,如果有新的请求过来,在线程池中会创建新的线程处理这些任务,一直创建到线程池的最大容量(Max Size)为止;超出线程池的最大容量的Tasks,会被放入阻塞队列(Blocking
Queue)进行等待,知道有线程资源释放出来为止;要知道的是,阻塞队列也是有最大容量的,多余队列最大容量的请求不光没有获得执行的机会,连排队的资格都没有!

那这些连排队的资格都没有的Tasks怎么处理呢?不要急,后面在介绍ThreadPoolExecutor的拒绝处理策略(Handler Policies for Rejected Task)的时候会详细介绍。

说到这里你也许有写疑惑了,上面这些东西,我通常使用Executors的时候没有指定过啊。是的,因为Executors很“聪明”地帮我们做了这些事情。

Executors的源码

我们看下ExecutorsnewFixedThreadPoolnewSingleThreadExecutor方法的源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
         (new ThreadPoolExecutor(
         1, 1,
         0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>()));
 }

其实它们底层还是通过ThreadPoolExecutor来创建ExecutorService的,这里对妻子的参数先不作介绍,下面会详细讲,这里只说一下 new LinkedBlockingQueue<Runnable>()这个参数。

LinkedBlockingQueue就是当任务数大于线程池的线程数的时候的阻塞队列,这里使用的是无参构造,我们再看一下构造函数:

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

我们看到阻塞队列的默认大小竟然是Integer.MAX_VALUE

如果不做控制,拼命地往阻塞队列里放Task,分分钟“Out of Memory”啊!

还有更绝的,newCachedThreadPool方法:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
    0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
}

最大线程数默认也是Integer.MAX_VALUE,也就是说,如果之前的任务没有执行完就有新的任务进来了,就会继续创建新的线程,指导创建到Integer.MAX_VALUE为止。

让你的JVM OutOfMemoryError

下面提供一个使用newCachedThreadPool创建大量线程处理Tasks,最终OutOfMemoryError的例子。

友情提醒:场面过于血腥,请勿在生产环境使用。

package net.ijiangtao.tech.concurrent.jsd.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample2 {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000 * 600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void newCachedThreadPoolTesterBadly() {
        System.out.println("begin............");
        for (int i = 0; i <= Integer.MAX_VALUE; i++) {
            executorService.execute(new Task());
        }
        System.out.println("end.");
    }

    public static void main(String[] args) {
        newCachedThreadPoolTesterBadly();
    }

}

main方法启动以后,打开控制面板,看到CPU和内存几乎已经全部耗尽:

很快控制台就抛出了java.lang.OutOfMemoryError

begin............
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:717)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
    at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.newCachedThreadPoolTesterBadly(ThreadPoolExample2.java:24)
    at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.main(ThreadPoolExample2.java:30)

阿里巴巴Java开发手册

下面我们在看Java开发手册这条规定,应该就明白作者的良苦用心了吧。

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:
Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

主角出场

解铃还须系铃人,其实避免这个OutOfMemoryError风险的钥匙就藏在Executors的源码里,那就是自己直接使用ThreadPoolExecutor

ThreadPoolExecutor的构造

构造一个ThreadPoolExecutor需要蛮多参数的。下面是ThreadPoolExecutor的构造函数。

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

下面就一一介绍一下这些参数的具体含义。

ThreadPoolExecutor构造参数说明

其实从源码中的JavaDoc已经可以很清晰地明白这些参数的含义了,下面照顾懒得看英文的同学,再解释一下:

  • corePoolSize

线程池核心线程数。

默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制,除非将allowCoreThreadTimeOut设置为true

  • maximumPoolSize

线程池所能容纳的最大线程数。超过maximumPoolSize的线程将被阻塞。

最大线程数maximumPoolSize不能小于corePoolSize

  • keepAliveTime

非核心线程的闲置超时时间。

超过这个时间非核心线程就会被回收。

  • TimeUnit

keepAliveTime的时间单位,如TimeUnit.SECONDS。

当将allowCoreThreadTimeOut为true时,对corePoolSize生效。

  • workQueue

线程池中的任务队列。

没有获得线程资源的任务将会被放入workQueue,等待线程资源被释放。如果放入workQueue的任务数大于workQueue的容量,将由RejectedExecutionHandler的拒绝策略进行处理。

常用的有三种队列:
SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue

  • threadFactory

提供创建新线程功能的线程工厂。

ThreadFactory是一个接口,只有一个newThread方法:

Thread newThread(Runnable r);
  • rejectedExecutionHandler

无法被线程池处理的任务的处理器。

一般是因为任务数超出了workQueue的容量。

当一个任务被加入线程池时

总结一下,当一个任务通过execute(Runnable)方法添加到线程池时:

  1. 如果此时线程池中线程的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
  2. 如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。
  3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的拒绝策略来处理此任务。

处理任务的优先级为:核心线程数(corePoolSize) > 任务队列容量(workQueue) > 最大线程数(maximumPoolSize);如果三者都满了,使用rejectedExecutionHandler处理被拒绝的任务。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XvpfYpx2-1676540594561)(null)]

ThreadPoolExecutor的使用

下面就通过一个简单的例子,使用ThreadPoolExecutor构造的线程池执行任务。

ThreadPoolExample3

package net.ijiangtao.tech.concurrent.jsd.threadpool;

import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author ijiangtao.net
 */
public class ThreadPoolExample3 {

    private static final AtomicInteger threadNumber = new AtomicInteger(1);

    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.currentThread().sleep(2000);
                System.out.println(Thread.currentThread().getName() + "-" + LocalTime.now());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    private static class MyThreadFactory implements ThreadFactory {

        private final String namePrefix;

        public MyThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, namePrefix + "-" + threadNumber.getAndIncrement());
        }

    }

    private static final ExecutorService executorService = new ThreadPoolExecutor(
            10,
            20, 30, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(50),
            new MyThreadFactory("MyThreadFromPool"),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {

        // creates five tasks
        Task r1 = new Task();
        Task r2 = new Task();
        Task r3 = new Task();
        Task r4 = new Task();
        Task r5 = new Task();

        // submit方法有返回值
        Future future = executorService.submit(r1);
        System.out.println("r1 isDone ? " + future.isDone());

        // execute方法没有返回值
        executorService.execute(r2);
        executorService.execute(r3);
        executorService.execute(r4);
        executorService.execute(r5);

        //关闭线程池
        executorService.shutdown();

    }

}

执行结果

r1 isDone ? false
MyThreadFromPool-2-21:04:03.215
MyThreadFromPool-5-21:04:03.215
MyThreadFromPool-4-21:04:03.215
MyThreadFromPool-3-21:04:03.215
MyThreadFromPool-1-21:04:03.215

从结果看,从线程池取出了5个线程,并发执行了5个任务。

总结

这一章我们介绍了一种更安全、更定制化的线程池构建方式:ThreadPoolExecutor。相信你以后不敢轻易使用Executors来构造线程池了。

后面我们会介绍线程池的更多实现方式(例如使用Google核心库Guava),以及关于线程池的更多知识和实战。

Links

作者资源

相关资源

目录
相关文章
|
30天前
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
9天前
|
安全 Java C#
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
|
1月前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
109 17
|
2月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
2月前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
3月前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
3天前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
36 14
|
6天前
|
安全 Java 程序员
Java 面试必问!线程构造方法和静态块的执行线程到底是谁?
大家好,我是小米。今天聊聊Java多线程面试题:线程类的构造方法和静态块是由哪个线程调用的?构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节有助于掌握Java多线程机制。下期再见! 简介: 本文通过一个常见的Java多线程面试题,详细讲解了线程类的构造方法和静态块是由哪个线程调用的。构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节对掌握Java多线程编程至关重要。
34 13
|
7天前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。