线程池 ThreadPoolExecutor 分析与实战

简介: 多线程并发执行可以提高程序的性能。特别是在多核处理器的环境下,多线程程序能发挥多核处理器的优势性能。
我是陈皮,一个在互联网 Coding 的 ITer,个人微信公众号「陈皮的JavaLib」关注第一时间阅读最新文章。

线程池简介

多线程并发执行可以提高程序的性能。特别是在多核处理器的环境下,多线程程序能发挥多核处理器的优势性能。

虽然与进程相比,线程轻量化很多,但其创建和关闭同样需要花费时间。而且线程多了以后,也会抢占内存资源。如果不对线程加以管理,是一个很大的隐患。而线程池的目的就是管理线程。当需要一个线程时,可以从线程池拿一个空闲线程去执行任务,当任务执行完后,线程又会归还到线程池。这样就有效的避免了重复创建、关闭线程和线程数量过多带来的问题。

Java 并发包java.util.concurrent提供了线程池功能,以下是一些相关接口,类的关系。

在这里插入图片描述

线程池核心类 ThreadPoolExecutor

ThreadPoolExecutor 是核心类,我们一般使用这个类的对象来提交任务和执行任务。此类有很多构造方法,如下所示。

  • corePoolSize:核心线程数,这些线程即使空闲也会一直存活着,除非对核心线程设置了超时时间。
  • maximumPoolSize:线程池最大线程数,即总的线程数,这其中包括了核心线程。
  • keepAliveTime:当线程池的线程数量超过 corePoolSize 时,超过的非核心空闲线程在被销毁之前等待新任务的最大等待时间,即非核心线程的最大空闲时间。
  • unit:参数 keepAliveTime 的时间单位。
  • workQueue:任务阻塞队列,当核心线程都在执行任务时,新提交的任务就会被放到任务队列中。
  • threadFactory:线程池创建新线程的线程工厂,一般使用默认的即可,除非需要定制化。
  • handler:拒绝策略。所有线程都在执行任务,并且任务队列也满了,对新提交任务的处理方式。
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor 类中的一些变量如下所示。

// 高3位记录线程池状态,低29位记录线程池中线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 对于ctl变量,线程池数量占用比特位32-3=29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池中线程数最大值,00011111 11111111 11111111 11111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

 // 111 运行状态
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 关闭状态,不接受新任务,但是已提交的任务(队列中的任务,正在执行的任务)都会继续处理
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 停止状态,不接收新任务,不处理队列中的任务,中断正在执行的任务
private static final int STOP       =  1 << COUNT_BITS;
// 010 过渡状态,代表线程池即将进入终止状态
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 终止状态,即线程池真正被关闭了
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取线程池状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 获取线程池中线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

在这里插入图片描述

线程池执行流程

在这里插入图片描述

Executors 工具类

ExecutorService接口提供一些操作线程池的方法,ThreadPoolExecutor 类是 ExecutorService 接口的实现类。Executors相当于一个线程池工厂类,生产ExecutorService实例(其实是 ThreadPoolExecutor 实例),它里面有几种现成的具备某种特定功能的线程池工厂方法,如下所示。

// 创建一个线程数量为10的固定线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);

Executors工厂方法介绍:

  • newSingleThreadExecutor():只有一个线程的线程池。超出的任务被放到任务队列,等这个线程空闲时就会去按顺序处理。
  • newFixedThreadPool():固定线程数量线程池。传入的数字就是线程的数量,如果有空闲线程就去执行任务,如果没有空闲线程就会把任务放到一个任务队列,等到有线程空闲时再任务。
  • newCachedThreadPool():可拓展的线程池。当没有空闲线程去执行新任务时,就会再创建新的线程去执行任务,执行完后新建的线程会返回线程池进行复用。
  • newSingleThreadScheduledExecutor():返回 ScheduledExecutorService 对象。ScheduledExecutorService 接口继承 ExecutorService 接口,有一些拓展方法,如指定执行时间。这个线程池大小为1,在指定时间执行任务。关于指定时间的几个方法:schedule() 是在指定时间后执行一次任务。 scheduleAtFixedRate() 和 scheduleWithFixedDelay() 方法,两者都是周期性的执行任务,但是前者是以上一次任务开始为周期起点,后者是以上一次任务结束为周期起点。
  • newScheduledThreadPool():和上面一个方法一样,但是可以指定线程池大小,其实上面那个方法也是调用这个方法的,只是传入的参数是1。

注意,阿里巴巴规范中不推荐使用 Executors 工具类来创建线程池,因为这种方式对线程池的控制粒度比较粗,创建线程池的大多参数都不是我们控制的。

  • newFixedThreadPool:创建一个固定线程数量的线程池,阻塞队列使用的是无界任务队列。如果提交的任务量巨多的话会导致阻塞队列一直增长,占用大量内存空间,严重时会造成内存溢出。主要有以下几个问题。
  • newCachedThreadPool:创建一个无上限线程数量的线程池,提交一个任务如果没有空闲线程,就创建一个线程来执行任务。如果提交的任务量巨多的话会导致创建太多的线程,会导致 CPU 100% 。

任务队列

线程池中的任务队列是一个BlockingQueue接口,在 ThreadPoolExecutor 类中有如下几种实现类实现了 BlockingQueue 接口。
  

  • LinkedBlockingQueue:无界任务队列,是个链表结构,不会出现任务队列满了的情况,除非内存空间不足,但是非常耗费系统资源。和有界任务队列一样,线程数若小于 corePoolSize ,新任务进来时没有空闲线程的话就会创建新线程,当达到 corePoolSize 时,就会进入任务队列。其实 maximumPoolSize 没什么作用,newFixedThreadPool 固定大小线程池就是用的这个任务队列,它的 corePoolSize 和 maximumPoolSize 相等。
  • SynchronousQueue:直接提交队列。这种队列其实不会真正的去保存任务,每提交一个任务就直接让空闲线程执行,如果没有空闲线程就去新建,当达到最大线程数时,就会执行拒绝策略。所以使用这种任务队列时,一般会设置很大的maximumPoolSize,不然很容易就执行了拒绝策略。 newCachedThreadPool 线程池的 corePoolSize 为0,maximumPoolSize 无限大,它用的就是直接提交队列。
  • ArrayBlockingQueue:有界任务队列,其构造函数必须带一个容量参数,表示任务队列的大小。当线程数量小于 corePoolSize 时,有任务进来优先创建线程。当线程数等于 corePoolSize 时,新任务就会进入任务队列,当任务队列满了,才会创建新线程,线程数达到 maximumPoolSize 时执行拒绝策略
  • PriorityBlockingQueue:优先任务队列,它是一个特殊的无界队列,因为它总能保证高优先级的任务先执行。

拒绝策略

JDK 提供了四种拒绝策略,都实现了RejectedExecutionHandler接口,如果这四种拒绝策略无法满足你的要求,可以自定义继承RejectedExecutionHandler 并实现 rejectedExecution 方法。

  • AbortPolicy:直接抛出异常,阻止系统正常工作。JDK 默认是这种策略。
  • CallerRunsPolicy:如果线程池未关闭,则在调用者线程里面执行被丢弃的任务,这个策略不是真正的拒绝任务。比如我们在 T1 线程中提交的任务,那么该拒绝策略就会把多余的任务放到 T1 线程执行,会影响到提交者线程的性能。
  • DiscardOldestPolicy:该策略会丢弃一个最老的任务,也就是即将被执行的任务,然后再次尝试提交该任务。
  • DiscardPolicy:直接丢弃当前提交的任务,不做任何处理,如果允许丢弃任务,这个策略是最好的。

线程工厂

线程池中的线程是由`ThreadFactory负责创建的,一般情况下默认就行,如果有一些其他的需求,比如自定义线程的名称、优先级等,则可实现 ThreadFactory 接口来自定义自己的线程工厂。

public interface ThreadFactory {

    Thread newThread(Runnable r);
}

以下是 Executors 类中定义的默认线程工厂类。

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

线程池扩展

ThreadPoolExecutor 类中有三个扩展方法:

  • beforeExecute:在任务执行前执行。
  • Execute:任务执行后执行。
  • terminated:线程池退出时执行。

ThreadPoolExecutor 类中有一个内部类:`Worker,每个线程的任务其实都是由这个类里面的 run 方法执行的。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    // ...省略

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // ...省略
 
}

runWorker 方法:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 任务执行前执行该方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后执行该方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

还有一个线程池退出时执行的方法是在何处执行的?这个方法被调用的地方就不止一处了,像线程池的 shutdown 方法就会调用,例如 ThreadPoolExecutor 类的 shutdown 方法如下:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 线程池退出时执行
    tryTerminate();
}

ThreadPoolExecutor 中这三个方法默认是没有任何内容的:

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

protected void terminated() { }

我们也可以自定义并重写他们,例如继承 ThreadPoolExecutor 并重写这三个方法:

ExecutorService threadpool = new ThreadPoolExecutor(3, 10, 3L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2)) {
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // 执行任务前
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 执行任务后
    }

    @Override
    protected void terminated() {
        // 线程退出
    }
};

线程池工具类

最后给出一个自己封装,开箱即用的线程池工具类。

package com.chenpi;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author 陈皮
 * @version 1.0
 * @description 线程池工具类
 * @date 2020/06/02
 */
public class ThreadPoolUtils {

  // 核心池大小
  private static final int CORE_POOL_SIZE = 5;
  // 线程池允许的最大线程数
  private static final int MAXIMUM_POOL_SIZE = 10;
  // 空闲的多余线程最大存活时间,单位秒
  private static final int KEEP_ALIVE_TIME = 3;
  // 任务阻塞队列大小
  private static final int QUEUE_SIZE = 3;
  // 默认线程池名称
  private static final String DEFAULT_THREAD_POOL_NAME = "DEFAULT";

  // 用于保存不同的线程池
  private static final Map<String, ThreadPoolExecutor> executorList = new ConcurrentHashMap<>();

  public static ThreadPoolExecutor getExecutor(String executorName) {
    ThreadPoolExecutor executor = executorList.get(executorName);
    if (executor == null) {
      synchronized (ThreadPoolUtils.class) {
        if (executor == null) {
          executor = create(executorName);
        }
      }
    }
    return executor;
  }

  // 使用特定线程池
  public static void execute(String executorName, Runnable command) {
    getExecutor(executorName).execute(command);
  }

  // 使用默认线程池
  public static void execute(Runnable command) {
    getExecutor(DEFAULT_THREAD_POOL_NAME).execute(command);
  }

  // 使用特定线程池
  public static <T> Future<T> submit(String executorName, Callable<T> command) {
    return getExecutor(executorName).submit(command);
  }

  // 使用默认线程池
  public static <T> Future<T> submit(Callable<T> command) {
    return getExecutor(DEFAULT_THREAD_POOL_NAME).submit(command);
  }

  public static void shutdown(String executorName) {
    if (null == executorName || executorName.length() == 0) {
      executorName = DEFAULT_THREAD_POOL_NAME;
    }
    getExecutor(executorName).shutdown();
  }

  // 如果executorList中没有指定名称的线程池,则进行创建
  private static ThreadPoolExecutor create(String executorName) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
        KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE),
        new ThreadPoolExecutor.CallerRunsPolicy());
    executorList.put(executorName, executor);
    return executor;
  }
}

以下为调用示例。

package com.chenpi;

import java.util.concurrent.TimeUnit;

/**
 * @author 陈皮
 * @version 1.0
 * @description
 * @date 2022/3/14
 */
public class ChenPi {

  public static void main(String[] args) {
    for (int i = 0; i < 20; i++) {
      final int fi = i;
      ThreadPoolUtils.execute(() -> {
        // 自定义当前线程未捕获异常处理
        Thread.currentThread().setUncaughtExceptionHandler(
            (t, e) -> System.out.println(t.getName() + ",Throwable:" + e));
        try {
          TimeUnit.SECONDS.sleep((long) (Math.random() * 5));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 执行完任务" + fi);
      });
    }

    // 关闭线程池
    ThreadPoolUtils.shutdown(null);
  }
}

输出结果如下。

pool-1-thread-2 执行完任务1
pool-1-thread-10 执行完任务12
pool-1-thread-6 执行完任务8
pool-1-thread-10 执行完任务6
main 执行完任务13
pool-1-thread-3 执行完任务2
pool-1-thread-4 执行完任务3
pool-1-thread-10 执行完任务14
pool-1-thread-1 执行完任务0
pool-1-thread-8 执行完任务10
pool-1-thread-9 执行完任务11
pool-1-thread-7 执行完任务9
pool-1-thread-5 执行完任务4
pool-1-thread-2 执行完任务5
pool-1-thread-6 执行完任务7
pool-1-thread-3 执行完任务15
main 执行完任务17
pool-1-thread-4 执行完任务16
pool-1-thread-10 执行完任务18
pool-1-thread-1 执行完任务19

本次分享到此结束啦~~

如果觉得文章对你有帮助,点赞、收藏、关注、评论,您的支持就是我创作最大的动力!

相关文章
|
26天前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
30 1
|
1月前
线程CPU异常定位分析
【10月更文挑战第3天】 开发过程中会出现一些CPU异常升高的问题,想要定位到具体的位置就需要一系列的分析,记录一些分析手段。
61 0
|
3月前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
Java并发编程实战:使用synchronized关键字实现线程安全
57 0
|
1月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
30 3
|
1月前
|
NoSQL Java Redis
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。
26 0
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
|
2月前
|
Java
领略Lock接口的风采,通过实战演练,让你迅速掌握这门高深武艺,成为Java多线程领域的武林盟主
领略Lock接口的风采,通过实战演练,让你迅速掌握这门高深武艺,成为Java多线程领域的武林盟主
35 7
|
2月前
|
Java Android开发 UED
🧠Android多线程与异步编程实战!告别卡顿,让应用响应如丝般顺滑!🧵
在Android开发中,为应对复杂应用场景和繁重计算任务,多线程与异步编程成为保证UI流畅性的关键。本文将介绍Android中的多线程基础,包括Thread、Handler、Looper、AsyncTask及ExecutorService等,并通过示例代码展示其实用性。AsyncTask适用于简单后台操作,而ExecutorService则能更好地管理复杂并发任务。合理运用这些技术,可显著提升应用性能和用户体验,避免内存泄漏和线程安全问题,确保UI更新顺畅。
91 5
|
2月前
|
Java 开发者
Java中的多线程编程基础与实战
【9月更文挑战第6天】本文将通过深入浅出的方式,带领读者了解并掌握Java中的多线程编程。我们将从基础概念出发,逐步深入到代码实践,最后探讨多线程在实际应用中的优势和注意事项。无论你是初学者还是有一定经验的开发者,这篇文章都能让你对Java多线程有更全面的认识。
30 1
|
2月前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
44 0
|
2月前
|
安全 Java 调度
python3多线程实战(python3经典编程案例)
该文章提供了Python3中多线程的应用实例,展示了如何利用Python的threading模块来创建和管理线程,以实现并发执行任务。
38 0