java并发编程笔记--ThreadPoolExecutor实现

简介:     ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销和提高性能的目的。这里的对象可以理解为某种"资源",比如:数据库连接、线程、socket连接...创建这种资源的消耗比较大,如果每次使用都新建的话,会造成额外的开销。

    ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销提高性能的目的。这里的对象可以理解为某种"资源",比如:数据库连接、线程、socket连接...创建这种资源的消耗比较大,如果每次使用都新建的话,会造成额外的开销。同时因为对资源的创建没有限制,可能会重复创建大量的对象,导致资源枯竭。如果能够提前创建好一批对象放到一个"池"中,每次使用时都从池中选取空闲的对象,用完后再还入池中,这样便提高了资源的使用效率,又因为限制了创建对象的数量,也可以避免大量创建对象导致资源枯竭。我们常用的线程池、DB连接池、socket连接池等都是用的这个思路。

适合使用线程池的场景是建立在如下基础上的:
0)串行执行的响应性和吞吐量无法满足需求:串行程序会在一个线程中逐处理所有任务,因此可能受任务执行时间的长度影响,具有较差的响应性。同时,因为单线程程序往往不能充分利用CPU的资源,无法达到最优的吞吐量。
1)线程的生命周期管理(时间)开销很高:因为创建、销毁线程开销很大,所以频繁的创建线程会造成较大的时间开销,从而减慢任务的处理速率,增加延迟。
2)物理资源的消耗:活跃的线程会消耗操作系统资源,包括:内存、文件句柄等。如果应用程序中存在大量活跃的线程,将会占用大量的资源,有可能导致系统资源枯竭。
3)稳定性:随意的创建线程而不加管控,不利于整个应用程序的稳定。
综上所述,使用多线程提升并发应该是使用合适的线程数达到最好的性能即可,并非线程越多越好。一些场景下(如任务规模很小或者资源需要串行访问的场景),单线程处理可能比多线程更简单且性能更好。

线程池的组成

抛开ThreadPoolExecutor的实现,我们先自己设想一下线程池的概念模型:

image

    首先我们需要一个用于全局管控线程池运行的类,负责线程池的创建、任务的运行以及最后线程池关闭...完整流程的管理,这个类构成了线程池的主体,即线程管理类。线程池中包含多个用于执行任务的工作线程,工作线程由线程管理类创建和维护,能够执行外部提交的任务,并响应取消任务的操作。

    由于不同的业务场景创建的线程具有不同的特征,比如:以相同的前缀命名,设置为守护线程,归属于相同的ThreadGroup等...这些参数的配置对于线程池而言是重复性工作,因此单独抽取一个线程工厂类用作工作线程的创建,由于不同的场景设置的线程参数不同,因而需要客户端自己根据需求定制,故抽象为接口。

    线程池中使用阻塞队列作为任务提交和任务执行的"缓冲",当工作线程忙不过来时,线程池可以先将任务存放到一个任务队列中,待后续有工作线程空闲时,再从队列中获取新的任务执行。由于有了任务队列这个"缓冲",便增加了执行任务的灵活度,任务队列可以根据不同的策略对其中的任务进行编排,比如按优先级执行、延迟执行...存在这灵活性,就意味着要为客户端留有扩展的余地,因而任务队列也抽象为接口。

    线程池作为生产者-消费者模式的实现,也要考虑生产速率和消费速率的平衡。当生产-消费失衡时,即任务的提交速率大于消费速率时,为了保障程序的稳定性,需要对任务做驳回处理,以便降低线程池的负载。同时,为了减少驳回任务造成的损失,需要对被驳回的任务提供必要的驳回处理策略。最简单快速且合理的驳回策略便是抛出异常,即我们在服务降级中常用的快速失败策略,为了避免处理驳回任务带来额外的负载压力,直接放弃任务执行,通过抛出异常告知客户端任务失败。当然,为了满足各式各样的业务场景,也需要留给客户端定制策略的余地,因此驳回策略也被抽象为接口。

    线程池管理类、工作线程、任务队列、线程初始化类、任务驳回策略构成了一个线程池的基本组成。有了基本思路,下面对照着看看ThreadPoolExecutor的组成。

image

ThreadPoolExecutor,线程池管理类,继承AbstractExecutorService类,作为ExecutorService的一种实现,通过线程池调度任务;

Worker,工作线程类,属于ThreadPoolExecutor的私有内部类,实现Runnable接口,用作运行线程任务;继承AbstractQueuedSynchronizer接口,用作控制工作线程的空闲/运行状态的同转换;

ThreadFactory,线程工厂接口,实现该接口,定制线程创建参数;

BlockingQueue,阻塞队列接口,用于缓存客户端提交的任务,传入不同的实现类,实现不同的任务编排策略;

RejectedExecutionHandler,驳回处理策略接口,定制驳回任务的处理策略,ThreadPoolExecutor默认提供4种实现:

  • CallerRunsPolicy:使用提交任务的线程处理被驳回任务;
  • AbortPolicy::中止驳回任务,抛出RejectedExecutionException异常;
  • DiscardPolicy:忽略驳回任务,不做任何处理;
  • DiscardOldestPolicy:忽略阻塞队列header元素,然后将任务提交ThreadPoolExecutor重新执行,如果失败,则继续忽略header元素;

线程池的生命周期

线程池的生命周期,即线程池从创建、使用到最终销毁所经历的过程。ThreadPoolExecutor将线程池的生命周期划分为5个状态,分别是:running、shutdown、stop、tidying、terminated。状态图如下:

image

  • running:线程池创建后的状态,能够接收新任务,同时处理任务队列中的任务;
  • shutdown:执行shutdown()后,不接收新任务,但是会处理任务队列中的任务;
  • stop:执行shutdownNow()后,不接收新任务,也不处理任务队列中的任务,同时会中断正在执行的任务;
  • tidying:当所有任务都已被终止,工作线程数为0后,状态变为tidying状态,然后运行terminated()钩子方法;
  • terminated:当terminated()钩子方法执行完毕后,变为此状态。此时ThreadPoolExecutor已被完全终止;

    ThreadPoolExecutor将线程池状态runState和工作线程数workCount放入同一个字段。其中,1~29位存放workCount;30~32位存放runState,包括1位符号位。

// 使用一个原子整型存放runState和workCount两个字段;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// workCount的最大位数,此处为32位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大线程数:2^29-1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 存放runState的5个状态常量 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取runState值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 获取workCount值
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 封装指定的runState和workCount到ctl字段中
private static int ctlOf(int runState, int workCount) { return runState | workCount; }

/**
 * 控制runState字段
 */
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/**
 * 通过CAS方式增加/减少workerCount
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

线程池的初始化

    ThreadPoolExecutor初始化至少需要指定4个参数:corePoolSize(核心工作线程数),maximumPoolSize(最大工作线程数),keepAliveTime(空闲线程存活时间),workQueue(任务队列);除此之外,可以选定2个参数:threadFactory(线程工厂,默认使用DefaultThreadFactory类),handler(驳回任务处理器,默认使用AbortPolicy类)。根据填写参数的不同,提供了4个构造函数,如下是最终的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
        
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
        
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

提交任务

image

    ThreadPoolExecutor继承了AbstractExecutorService抽象类。AbstractExecutorService实现了ExecutorService接口中提交任务的基本方法,包括:submit和invoke*等方法,用于提交任务,但没有实现execute方法。

    submit包含3个重载方法,均是将Runnable、Callable对象包装成RunnableFuture对象,然后调用execute方法,可见执行任务的具体方式仍是依赖子类实现,有一些模板方法模式的味道。

image

    invokeAll方法用于批量执行多个任务,并同步等待任务执行完成返回结果。invokeAny方法用于执行一批任务中的一个任务,并同步等待,直到有任意一个任务执行完成,便取消其它任务,并返回。invokeAny方法依赖于ExecutorCompletionService实现,ExecutorCompletionService作为CompletionService接口的实现类,可以理解为一个Executor对象+一个BlockingQueue,Executor用于执行提交的任务,可由调用方依赖注入,此处传入AbstractExecutorService本身,即AbstractExecutorService的实现类本身。BlockingQueue用于存放执行完成的任务。invokeAny方法调用doInvokeAny方法,doInvokeAny方法调用ExecutorCompletionService的submit方法,提交所有待执行的任务,然后调用ExecutorCompletionService的take方法,阻塞等待ExecutorCompletionService的BlockingQueue中有完成任务的Future放入。待收到第一个完成的任务结果后,调用Future的cancel方法取消其它还在执行的任务,随即返回。

    还有一个newTaskFor方法,标志为protected,用于将Runnable、Callable接口封装成RunnableFuture对象,AbstractExecutorService的子类可根据包装Future对象的不同重写。

执行任务

所有Executor接口的实现类,均以execute方法为执行入口。ThreadPoolExecutor的execute方法执行如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
        
    // step1. 如果poolSize < corePoolSize,则新建线程;
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // step2. 如果线程池状态为running,且poolSize >= corePoolSize,则放入任务队列;
    // 如果线程池状态不是running,则驳回任务,执行驳回策略;
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // step3. 如果放入任务队列失败,则创建一个新线程执行任务;
    // 如果创建线程失败,则驳回任务,执行驳回策略;
    else if (!addWorker(command, false))
        reject(command);
}

从代码可以看出,线程池执行任务主要分为3步:

  • 第一步:如果poolSize < corePoolSize,则新建线程;
  • 第二步:如果线程池状态为running,且poolSize >= corePoolSize,则放入任务队列;如果线程池状态不是running,则驳回任务,执行驳回策略;
  • 第三步:如果放入任务队列失败,则创建一个新线程执行任务; 如果创建线程失败,则驳回任务,执行驳回策略;

execute方法中调用addWorker方法用于创建一个工作线程,传入当前任务并启动;来看addWorker实现:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 通过CAS操作使workerCount++
    ... ... 
    
    // 标识工作线程是否正常启动
    boolean workerStarted = false;
    // 标识工作线程是否添加成功
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建工作线程对象,传入任务,调用ThreadFactory创建线程;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 加锁,保证如下操作的原子性:
            // 1)添加工作线程到线程集合;
            // 2)更新线程池运行数据;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            
            // 启动工作线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,则终止线程池运行,尝试调用ThreadPoolExecutor提供的terninated()回调函数;
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

创建Worker线程后,执行t.start()方法会启动工作线程,工作线程的run方法调用ThreadPoolExecutor的runWorker方法。该方法用于执行线程任务,期间会调用ThreadPoolExecutor提供的beforeExecute和afterExecute回调函数,这两个函数由ThreadPoolExecutor的子类实现。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    // 标识任务是否未被完整执行,true标识未被完整执行;
    boolean completedAbruptly = true;
    try {
        // getTask阻塞等待任务,当线程池停止或者获取任务超时,则返回null;循环退出条件如下:
        // 1)因为调用setMaximumPoolSize导致poolSize大于setMaximumPoolSize,需要回收掉部分工作线程;
        // 2)线程池被stop;
        // 3)线程池被shutdown并且任务队列为空;
        // 4)线程池获取任务超时,即超过keepAliveTimeout时限,需要被回收时;
        while (task != null || (task = getTask()) != null) {
            // 标识当前工作线程为已使用状态
            w.lock();
            // 检查当前线程是否因为执行shutdownNow方法而被中断;
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); // 在当前工作线程中执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 更新统计数据
                w.completedTasks++;
                // 标识工作线程为未使用状态
                w.unlock();
            }
        }
        // 如果执行到此处,表明任务被完整执行
        completedAbruptly = false;
    } finally {
        // 此方法调用场景:
        // 1)终止线程池时,用于从线程集合中移除工作线程;
        // 2)当前线程因为异常而退出,重新创建线程替换之;
        // 3)线程池中因为设置allowCoreThreadTimeOut=true,导致工作线程全部被回收时,任务队列仍然有任务,则新建线程;
        processWorkerExit(w, completedAbruptly);
    }
}

关闭线程池

关闭线程池有2个入口:shutdown、shutdownNow。shutdown是相对"温和"的关闭方式,它不接受新提交任务,中断所有空闲线程,但不会干预正在运行的线程,以及已经提交的任务。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查权限
        checkShutdownAccess();
        // 设置线程池状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断所有等待任务的空闲线程
        interruptIdleWorkers();
        // 调用回调函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    
    // 执行线程池终止操作
    tryTerminate();
}

shutdownNow是相对"激进"的关闭方法,它会尝试中断所有工作线程,同时干预正在运行的任务中止,清空任务队列,并将任务队列中未执行的任务返回给调用方。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查权限
        checkShutdownAccess();
        // 设置线程池状态为STOP
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 导出未执行的任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    
    // 执行终止操作
    tryTerminate();
    return tasks;
}

shutdown和shutdownNow并不会同步等待线程池终止,而是提前异步返回,如果需要在调用shutdown或者shutdownNow后,同步等待线程池终止,则可以再调用awaitTermination方法。该方法会等到线程池状态变为terminated状态或者超时后返回。

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            // 状态变为terminated,返回true
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            // 超时,返回false
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

    以上是ThreadPoolExecutor的基本实现介绍,除了对主要流程的实现以外,ThreadPoolExecutor还提供了线程池监控、参数配置等方法,实现简单,详情可细读源码。

问题思考:为什么要把runState和workerCount放到同一个AtomicInteger中?

目录
相关文章
|
2天前
|
设计模式 安全 Java
Java编程中的单例模式:理解与实践
【10月更文挑战第31天】在Java的世界里,单例模式是一种优雅的解决方案,它确保一个类只有一个实例,并提供一个全局访问点。本文将深入探讨单例模式的实现方式、使用场景及其优缺点,同时提供代码示例以加深理解。无论你是Java新手还是有经验的开发者,掌握单例模式都将是你技能库中的宝贵财富。
|
4天前
|
Java API Apache
Java编程如何读取Word文档里的Excel表格,并在保存文本内容时保留表格的样式?
【10月更文挑战第29天】Java编程如何读取Word文档里的Excel表格,并在保存文本内容时保留表格的样式?
24 5
|
2天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
3天前
|
Java 开发者
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
19 4
|
3天前
|
消息中间件 供应链 Java
掌握Java多线程编程的艺术
【10月更文挑战第29天】 在当今软件开发领域,多线程编程已成为提升应用性能和响应速度的关键手段之一。本文旨在深入探讨Java多线程编程的核心技术、常见问题以及最佳实践,通过实际案例分析,帮助读者理解并掌握如何在Java应用中高效地使用多线程。不同于常规的技术总结,本文将结合作者多年的实践经验,以故事化的方式讲述多线程编程的魅力与挑战,旨在为读者提供一种全新的学习视角。
24 3
|
2天前
|
设计模式 安全 Java
Java编程中的单例模式深入解析
【10月更文挑战第31天】在编程世界中,设计模式就像是建筑中的蓝图,它们定义了解决常见问题的最佳实践。本文将通过浅显易懂的语言带你深入了解Java中广泛应用的单例模式,并展示如何实现它。
|
3天前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
16 2
|
5天前
|
安全 Java 编译器
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
|
4天前
|
安全 Java 调度
Java中的多线程编程入门
【10月更文挑战第29天】在Java的世界中,多线程就像是一场精心编排的交响乐。每个线程都是乐团中的一个乐手,他们各自演奏着自己的部分,却又和谐地共同完成整场演出。本文将带你走进Java多线程的世界,让你从零基础到能够编写基本的多线程程序。
17 1
|
5天前
|
Java 开发工具 Android开发
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)