并发编程 · 基础篇(下) · 三大分析法分析线程池(1)

简介: 并发编程 · 基础篇(下) · 三大分析法分析线程池

image.png

小木箱成长营并发编程系列教程:

并发编程 · 基础篇(上) · android线程那些事

并发编程 · 基础篇(中) · 三大分析法分析Handler

并发编程 · 提高篇(上) · Java并发关键字那些事

并发编程 · 提高篇(下) · Java锁安全性那些事

并发编程 · 高级篇(上) · Java内存模型那些事

并发编程 · 高级篇(下) · Java并发BATJ面试之谈

并发编程 · 实战篇 · android下载器实现

Tips: 关注微信公众号小木箱成长营,回复 "并发编程" 可获得并发编程免费思维导图

一、序言

Hello,我是小木箱,欢迎来到小木箱成长营并发编程系列教程,今天将分享并发编程 · 基础篇(下) · 三大分析法分析线程池

三大分析法分析android线程池主要分为四部分,第一部分是4W2H分析线程池,第二部分是MECE分析线程池,第三部分是SCQA分析线程池,最后一部分是结语。

其中,4W2H分析线程池主要围绕线程池提出了6个高价值问题。

其中,MECE分析线程池主要分为线程池基本操作、线程池生命周期、线程池工作原理、线程池代码案例分析、线程池的性能优化、线程池注意事项、线程池线程数量确定和线程池业务防劣化8部分。

最后,以SCQA的形式在B站上投放一些来自BATJD等大厂的高频面试题。

image.png

如果完全掌握小木箱成长营并发编程系列教程,那么任何人都能通过高并发相关的技术面试。

二、4W2H分析线程池

2.1 What: 线程池具体定义

image.png

线程池是一种管理和调度线程的机制,线程池可以控制线程的数量,确保线程有效地工作,并且可以重复使用以前创建的线程,从而减少系统的开销。

2.2 Why: 线程池使用原因

image.png

如果不使用线程池,每个任务都新开一个线程处理 for循环创建线程,开销太大,我们希望有固定数量的线程,来执行这1000个线程,就避免了反复创建并销毁线程所带来的开销问题

2.3 How: 线程池使用方式


2.3.1 线程池API文档

API 简介
isShutdown 判断当前线程的状态是否是SHUTDOWN,即是否调用了shutdown方法
isTerminating 当前线程池的状态是否小于TERMINATED,并且大于等于SHUTDOWN,即当前线程是否已经shutdown并且正在终止。
isTerminated 线程池是否终止完成
awaitTermination 等待直到线程状态变为TERMINATED
finalize 重新Object的方法,当线程池对象被回收的时候调用,在这里调用shutdown方法终止线程,防止出现内存泄漏
prestartCoreThread 预先启动一个核心线程
prestartAllCoreThreads 预先启动所有的核心线程
remove 从任务队列中移除指定任务
purge 从队列中移除所有的以及取消的Future任务
getPoolSize 获取线程池中线程的数量,即Worker的数量
getActiveCount 获取线程池中正在执行任务的线程Worker数量
getLargestPoolSize 获取线程池曾经开启过的最大的线程数量
getTaskCount 获取总的任务数量,该值为每个Worker以及完成的任务数量,以及正在执行的任务数量和队列中的任务数量
getCompletedTaskCount 获取Worker以及执行的任务数量
beforeExecute 任务执行前调用
afterExecute 任务执行后调用
terminated 线程终止时调用,用来回收资源

2.3.2 线程池基础结构

线程池的基础结构分为三部分: 阻塞队列BlockingQueue、核心参数和Worker工作线程。

2.3.2.1 阻塞队列

线程池ThreadLocal是一个阻塞队列BlockingQueue

private final BlockingQueue<Runnable> workQueue;

阻塞队列BlockingQueue主要是用来提供任务缓冲区的功能,工作线程从阻塞队列BlockingQueue中取出任务来执行。

线程池中存放任务用的是offer方法,取出任务用的是poll方法。 阻塞队列BlockingQueue有三种通用策略

直接提交

直接提交,当一个线程提交一个任务的时候,如果没有一个对应的线程来取任务,提交阻塞或者失败。同样的当一个线程取任务的时候,如果没有一个对应的线程来提交任务,取阻塞或者失败。

SynchronousQueue就是这种队列的实现,这种队列通常要求maximumPoolSizes最大线程数量是无界的,避免提交的任务因为offer失败而被拒绝执行。

当提交任务的速率大于任务执行的速率的时候,这种队列会导致线程数量无限的增长。

无界队列

无界队列,无界队列实现的例子是LinkedBlockingQueue,当核心线程都处于忙碌的情况的时候, 提交的任务都会添加到无界队列中,不会有超过核心线程数corePoolSize的线程被创建。

这种队列可能适用于任务之间都是独立的,任务的执行都是互不影响的。

例如,在一个web服务器中,这种队列能够用来使短时间大量的并发请求变得更加平滑,当提交任务的速率大于任务执行的速率的时候,这种队列会导致队列空间无限增长。

有界队列

有界队列,有界队列实现的例子是ArrayBlockingQueue,使用该队列配合设置有限的maximumPoolSizes可以防止资源耗尽,这种情况下协调和控制资源和吞吐量是比较困难的。

队列大小和maximumPoolSize的设置是比较矛盾的:使用容量大的队列和较少的线程资源会减少CPU、OS资源、线程上下文切换等的消耗,但是会降低系统吞吐量。

如果任务频繁的阻塞,例如任务是IO密集的类型,这种情况下系统能够调度更多的线程。使用小容量队列,就要要求maximumPoolSize大一些,从而让CPU保持忙碌的状态,但是可能出现线程上下文切换频繁、线程数量过多调度困难已经创建线程OOM导致资源耗尽的情况,从而降低吞吐量。

SynchronousQueue vs LinkedBlockingQueue vs ArrayBlockingQueue

SynchronousQueue

SynchronousQueue适用于请求响应要求无延迟,请求并发量较少的场景

当线程Worker没有从队列取任务的时候,offer返回false,直接开启线程。当Worker从队列取任务的时候,该任务可以直接提交给Worker执行。

因此,这种线程池不会出现等待的情况,响应速度很快。

队列的缺点是提交任务速度大于任务执行速度时,会导致线程无限增长,因此,使用场景需要是并发量较少的情况。

例如,在OkHttp框架中执行HTTP请求就是用的这种队列构建的线程池。

LinkedBlockingQueue

LinkedBlockingQueue适用于并发量高,任务之间都是独立的,互不影响的场景。

比如在web服务器中,面对瞬时大量请求的涌入,可以更加平滑的处理,从而起到削峰的作用,并且防止线程资源的耗尽。

ArrayBlockingQueue

ArrayBlockingQueue是介于前两者之间的队列,可以协调系统资源和吞吐量之间的平衡。

2.3.2.2 核心参数

一个Worker代表一个工作线程,wrokers存储的是线程池中所有的工作线程。

工作线程的核心参数有如下

private final HashSet<Worker> workers = new HashSet<Worker>();
// ---------------------------------------------------------------
// 曾经开启过的最大的线程数量
private int largestPoolSize;
// 完成的任务的数量
private long completedTaskCount;
// ---------------------------------------------------------------
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;

这几个变量都是用户设置的参数变量,为了保证参数设置的可见性,所有参数都使用volatile修饰。 ThreadFactory是线程创建工厂,提供线程创建和配置的接口,这里使用的是工厂方法模式,默认的实现是DefaultThreadFactory。

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    //注意,Runnable r 就是工作线程接口Worker,需要传到Thread中。
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group,r,
                               namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

默认的线程工厂创建的线程名称为pool-poolNumber-thread-threadNumber,第一个线程池第一个线程名称为pool-0-thread-0,第二个线程名称为pool-0-thread-1,第二个线程池第一个线程名称为pool-1-thread-0,第二个线程名称为pool-1-thread-1,依次类推。

RejectedExecutionHandler

是当任务 被拒绝时的执行接口,提供了4种实现策略,默认采取AbortPolicy策略,如果不设置,线程池在拒绝任务的时候会抛出异常。

CallerRunsPolicy

在当前提交线程直接运行该任务。

AbortPolicy

直接抛出RejectedExecutionException异常。

DiscardPolicy

丢弃该任务,什么都不做。

DiscardOldestPolicy

从队列中移除头节点任务,然后再次提交任务到线程池。

keepAliveTime

闲置线程等待任务的超时时间,线程使用时间当线程数量超过corePoolSize的时候或者设置了允许核心线程超时的时候,否则线程会一直等待直到有新的任务。

allowCoreThreadTimeOut

允许核心线程超时,默认是false,如果设置为true,则核心线程会使用超时时间来等待任务。

corePoolSize

核心线程数量,默认情况下核心线程会一直等待直到有新的任务,如果设置了允许核心线程超时,则最小线程数为0。

maximumPoolSize

可以开启的最大的线程数量。

2.3.2.3 工作线程

2.3.2.1.1 Worker定义

Worker代表一个工作线程,该类实现了Runnable接口,继承自AQS,内部实现了一套简单的锁机制。这里使用的是代理模式,Worker实现了Runnable,然后轮训任务队列,取出任务执行。详细代码如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    /** Thread this worker is running in.  Null if factory fails. */
    //代表工作线程
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    //该线程执行的第一个任务
    Runnable firstTask;
    /** Per-thread task counter */
    //该线程已经执行的任务数量
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        //注意 将当前代理Runnable传递到ThreadFactory作为线程的执行载体。
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    //该Runnable作为代理,开启轮训,从队列中取出提交的Runnable来执行。
    public void run() {
        runWorker(this);
    }
    // Lock methods
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    //简单的互斥锁,把0修改为1代表获取到锁,把1修改为0代表释放锁。
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0,1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
}
2.3.2.1.2 Worker创建

那么该如何创建一个工作线程呢?

创建工作线程主要分为四步:

  1. 判断当前线程的数量是否超过了corePoolSize或者maximumPoolSize,如果超过返回false,如果没有,通过cas增加当前线程的数量。
  2. 创建Worker,在其构造方法中会通过ThreadFactory创建线程,然后将Worker添加到集合中。
  3. 如果Worker创建成功,调用Thread的start方法启动线程,开启任务轮训。
  4. 如果线程启动失败,处理Worker创建失败的情况,将Worker移除,避免内存泄漏,然后尝试终止线程池。
private boolean addWorker(Runnable firstTask,boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //获取当前线程的数量,如果是核心线程模式,线程数量不能大于corePoolSize
            //如果是非核心线程模式,线程数量不能大于maximumPoolSize 否则返回false
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //使用cas 更新线程Worker的数量,更新成功退出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建Worker,Worker构造函数中会通过ThreadFactory创建Thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //加锁,并发安全控制
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //线程已经启动 抛出异常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //添加到 HashSet<Worker>集合中
                    workers.add(w);
                    //更新当前线程数量 给largestPoolSize赋值
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动线程 开启任务轮训
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果线程启动失败 会将刚刚添加的Worker移除
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
//Worker添加失败的处理
 private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //移除Worker 防止内存泄漏
            if (w != null)
                workers.remove(w);
            //减少Worker的数量
            decrementWorkerCount();
            //尝试终止线程池
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
2.3.2.1.3 Worker线程原理

Worker线程原理分为七步:

  1. 开启循环,先执行firstTask,然后调用getTask()从队列中取出任务执行。
  2. 任务执行前会先判断线程池的状态,当线程池的状态是STOP的时候,中断任务线程。
  3. 调用beforeExecute方法。
  4. 调用任务的run方法执行任务。
  5. 调用afterExecute方法。
  6. 任务执行完成后,累加当前Worker执行的任务数量到Wroker的completedTasks变量中。
  7. 循环结束后,线程执行结束,处理后续情况。
 //Worker的run方法中调用runWorker方法开启轮训
public void run() {
            runWorker(this);       
 }
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //首先执行firstTask 然后在通过getTask()方法从队列中取出任务执行。
        while (task != null || (task = getTask()) != null) {
            //执行任务前先加锁 可以通过tryLock方法 判断当前Worker线程是否在执行任务
            //如果在执行任务,tryLock返回false,否则,返回true
            w.lock();
            // If pool is stopping,ensure thread is interrupted;
            // if not,ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //如果线程池是处于STOP的状态,即调用了shutDownNow方法,确保线程是中断的
            if ((runStateAtLeast(ctl.get(),STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(),STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //hook方法,任务执行前调用beforeExecute,任务执行后调用afterExecute
                //可以通过这两个方法来监控线程池中任务的执行情况
                beforeExecute(wt,task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task,thrown);
                }
            } finally {
                task = null;
                //累加当前Worker已经完成的任务数量
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //Worker退出时的后续处理
        processWorkerExit(w,completedAbruptly);
    }
}
2.3.2.1.3 Task任务的获取

任务获取主要分为两步:

  1. 线程池并没有区分核心线程和非核心线程,仅仅保证核心线程的数量。 当线程数量大于核心线程数量,或者设置了核心线程可超时,则通过超时polll方法获取任务,否则通过无限阻塞take方法获取任务。,线程数量等于核心线程数量时,剩下的线程会一直阻塞直到有任务执行,线程数量大于核心线程数量是,非核心线程会在超时时间之后退出。刚开始创建的核心线程可能会退出,后来创建的非核心线程可能会一直存活到最后。
  2. 当线程池的状态是STOP或者线程池的状态是SHUTDOWN并且队列是空的时候,会返回null,Wroker线程结束执行,减少Worker的数量。
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //返回null的情况有两种
        //1.线程池的状态变为STOP。
        //2.线程池的状态是SHUTDOWN,当时队列是空的。
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        //线程池并没有区分核心线程和非核心线程,只是根据当前的线程数量来使用不同的获取任务的方法
        //1.线程数量大于corePoolSize 或者设置了核心线程超时,则使用超时poll方法获取任务
        //2.线程数量等于corePoolSize并且没有设置核心线程超时,使用take方法获取任务
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //超时,当前的Worker即将退出循环,因此,修改Worker的数量,然后返回null。
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :
                workQueue.take();
            //如果不是空的返回,如果是空的,说明已经超时,设置timeOut为true
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
2.3.2.1.4 Worker线程退出
  1. 正常退出情况下,在getTask方法中已经调整了线程数量,但是异常退出情况,来不及调整,在这里需要重新调整线程数量。
  2. 移除Worker,统计总任务数量。
  3. 尝试终止线程池,调用tryTerminate()方法。
  4. 如果当前线程状态小于STOP,即RUNNING和SHUTDOWN状态,需要补齐线程数量。如果线程异常退出,直接调用addWorker方法补齐线程;如果线程正常退出,判断当前线程数量是否小于线程池最小线程数量,如果小于,直接补齐,否则,直接返回。正常退出可能是超过核心线程数量的线程获取 任务超时了,这种情况是不需要补齐的。如果最小线程数量为0,但是队列中还有任务,线程池的状态不是STOP,是需要补齐的。
private void processWorkerExit(Worker w,boolean completedAbruptly) {
    //completedAbruptly 代表线程是正常退出 还是异常退出
    //如果线程是正常退出,在getTask方法中已经调整了workerCount
    //如果线程异常退出,需要在这里调整workerCount
    if (completedAbruptly) // If abrupt,then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //把该Worker执行的任务数量累加到总任务数量变量中 然后从集合中移除Worker
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //尝试终止线程池,只有最后一个Worker线程执行完,才会终止线程池
    tryTerminate();
    //获取线程状态,如果线程池的状态小于STOP 即RUNNING和SHUTDOWN状态,
    //并且线程是正常退出,计算当前应该存活的最小线程数量,如果min为0,但是队列不是空的,
    //则线程池还需要线程来执行任务,修改min为1
    //如果当前线程数量大于min,则直接返回,不需要补齐线程空缺
    //如果当前线程数量小于min,则调用addWorker补齐线程空缺。
    int c = ctl.get();
    if (runStateLessThan(c,STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null,false);
    }
}
 //尝试终止线程池
 final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
           //1.当前线程状态为RUNNING,直接返回,此时未调用shutDown或者shutDownNow方法,不需要终止。
           //2.当前线程状态大于TIDYING,说明其他Worker已经开始执行terminated()方法,为了保证该法仅            //  执行1次,直接返回。
           //3.当前线程状态为SHUTDOWN并且队列不是空的,直接返回,需要等待队列任务执行完再终止。
            if (isRunning(c) ||
                runStateAtLeast(c,TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //如果worker数量不为0,尝试中断当前闲置的线程,即在poll和take中等待的线程,从而让所有线程
            //都退出任务轮训,加速线程池回收进程。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //更新线程池的状态为TIDYING,调用terminated(),这是一个hook方法,
                //可以在这里面做一些资源回收的操作,执行完后,设置线程池状态为TERMINATED
                //唤醒在awaitTermination方法上等待的线程。
                if (ctl.compareAndSet(c,ctlOf(TIDYING,0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED,0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
//尝试中断等待在取出任务的线程,如果onlyOnw为true,只会中断一个。
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
2.3.2.1.5 Worker线程终止
  1. 线程池的状态为RUNNING,直接返回,不需要终止。
  2. 线程状态为SHUTDOWN并且队列不是空的,直接返回,需要等待队列任务执行完再终止。
  3. 当前线程状态大于TIDYING,说明其他Worker已经开始执行terminated()方法,为了保证该方法仅执行一次,直接返回。
  4. 如果worker数量不为0,尝试中断当前闲置的线程,即在poll和take中等待的线程,从而让所有线程都退出任务轮训,加速线程池回收进程。
  5. 更新线程池的状态为TIDYING,调用terminated(),这是一个hook方法,可以在这里面做一些资源回收的操作,执行完后,设置线程池状态为TERMINATED,唤醒在awaitTermination方法上等待的线程。

2.3.3 创建和停止线程池

image.png

ExecutorService executorService=Executors.newFixedThreadPool(5);
       // 1.创建线程池
        for(int i=0;i< 10;i++){
           executorService.execute(new Runnable(){
          @Override
          public void run(){
             System.out.println(Thread.currentThread().getName()+"办理业务");     
}
        });
    }
// 2.关闭线程池
        executorService.shutdown();
    // executorService.shutdownNow();
       }
    }

2.3.4 手动创建 vs 自动创建

  •  一般情况下,应该手动创建线程池,因为手动创建可以更好地控制线程池的大小,以及线程池中线程的生命周期。
  •  自动创建的线程池大小可能不够,导致线程饥饿,或者线程池中线程的生命周期可能太长,导致系统资源浪费。

2.3.5 线程数量配置

一般来说,线程数量的设定要取决于任务的复杂度和计算机的性能。

如果任务比较复杂,那么线程数量可以设定的比较多,可以提高程序的并行处理能力,从而提高效率。

但是,如果线程数量设定的太多,可能会导致系统资源利用率过高,从而降低系统的效率。

因此,线程数量的设定应根据任务的复杂度和计算机的性能来合理设定。

2.3.6 停止线程池

image.png

使用shutdown()方法来停止线程池,shutdown()方法会等待线程池中正在执行的任务完成,然后才会停止线程池。

如果需要立即停止线程池,可以使用shutdownNow()方法,shutdownNow()方法会尝试终止正在执行的任务,并且拒绝接收新的任务。

2.3.7 线程池状态

image.png

System.out.println(executorService.isShutdown());
System.out.println(executorService.isTerminated());
//关闭线程池
executorService.shutdown();
System.out.println(executorService.isShutdown());
System.out.println(executorService.isTerminated());

输出结果:

false

false

true

true

线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。

线程池把线程数量和线程状态打包到了一个int变量中,然后使用AtomicInteger原子类来修改和获取该值。一方面可以节约内存,一方面可以减少一个原子性操作,提供性能,毕竟原子型操作是不可中断的,阻碍系统的切换上下文。 线程池的5个状态值存储到高3位中,线程数量存储到低29位中。runStateOf方法是用来获取高3位的线程池状态值的,workerCountOf是用来获取低29位的线程池中的线程数量的,ctlOf是把两个值通过或运算打包到一个值中。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs,int wc) { return rs | wc; }

RUNNING是运行状态,接受新任务,处理队列中的任务。

SHUTDOWN是关闭状态, 不接受新任务,但是处理队列中的任务。

STOP是停止状态,不接受新任务,不处理队列中的任务,中断正在执行任务的线程。

TIDYING是整理状态, 所有的任务已经被终止,所有的线程已经执行完变为TERMINATED状态,workerCount为0,线程池之后会调用terminated()扩展hook方法,最后变为TERMINATED状态。

TERMINATED是终止状态,terminated()方法执行完成,在该方法可以做一些资源回收的工作,此时的线程池队列清空,线程终结,资源回收完毕。

线程池的状态是不可逆的,一旦进入TERMINATED 状态,便无法重置,必须重新创建一个新的线程池才能提交任务,和线程的使用是一样的。

2.3.8 线程池状态切换

那么,线程池如何进行状态切换呢?

RUNNING / SHUTDOWN

调用shutdown()方法,或者隐式的在finalize()方法中调用, 线程池的状态变为RUNNING或SHUTDOWN

RUNNING / SHUTDOWN -> STOP

调用shutdownNow(), 线程池的状态由SHUTDOWN变成STOP

SHUTDOWN -> TIDYING

队列是空的,线程数量是0,任务都执行完毕, 线程池的状态由SHUTDOWN变成TIDYING

STOP -> TIDYING

线程数量为0, 线程池的状态由STOP变成TIDYING

TIDYING -> TERMINATED

terminated() 执行完成时候, 线程池的状态由TIDYING变成TERMINATED

TERMINATED

awaitTermination() 调用该方法会一直阻塞直到线程池的状态变成TERMINATED

2.4 When: 线程池使用时机

image.png

  1. 短时间任务:如果需要在应用程序中执行多个短期任务,那么使用线程池可以提高效率并降低资源消耗。
  2. 多用户请求:如果应用程序需要处理多个用户请求,而每个请求需要执行耗时的操作,那么使用线程池可以让应用程序更好地响应用户请求。
  3. 并发访问:如果多个线程需要访问共享资源,例如数据库或文件系统,那么使用线程池可以避免线程之间的竞争条件,并提高应用程序的吞吐量。
  4. 异步任务:如果应用程序需要执行异步任务,例如下载文件或处理大量数据,那么使用线程池可以让应用程序更加高效地执行这些任务,并且避免阻塞主线程。

2.5 How Much: 线程池业务价值

image.png

因为线程存在两个弊端.第一个是反复创建线程开销大,第二个是过多的线程会占用太多内存

解决过多的线程会占用太多内存的思路是让这部分线程都保持工作,且可以反复执行任务避免生命周期的损耗

解决过多的线程会占用太多内存的思路是用少量的线程避免内存占用过多

而线程池刚好契合了上述两个优势,而且线程池有以下三个业务价值:

第一个是复用线程,减少线程频繁创建和销毁带来的系统开销。加快响应速度;

第二个是合理利用CPU和内存;

第三个是统一的线程管理,避免出现随意开启线程导致线程数量过多从而引发OOM。

2.6 Where: 线程池使用场景

image.png

在Android开发中,线程池常常被用于以下场景:

  1. 处理网络请求:Android应用通常需要与服务器进行数据交互,网络请求通常是一个异步操作,使用线程池可以避免网络请求阻塞UI线程,保持应用的响应性。
  2. 处理耗时操作:例如对大文件的读写、复制、压缩等操作,操作会阻塞UI线程,导致应用卡顿,使用线程池可以将操作放到工作线程中执行。
  3. 并行执行多个任务:当需要同时执行多个任务时,例如下载多个文件,使用线程池可以使任务并行执行,提高效率。
  4. 处理定时任务:当需要执行定时任务时,例如轮询服务器,定时更新UI等,使用线程池可以在定时任务完成后将结果返回到UI线程中。
  5. 处理大量任务队列:例如使用RecyclerView展示大量数据,需要异步加载图片等操作,使用线程池可以管理任务队列,优化系统资源的使用。

综上所述,线程池在Android开发中的应用场景主要是处理网络请求、处理耗时操作、并行执行多个任务、处理定时任务以及处理大量任务队列等,能够提高应用的性能和响应速度,同时避免UI线程阻塞和ANR问题。

相关文章
|
16天前
|
Java 开发者
解锁并发编程新姿势!深度揭秘AQS独占锁&ReentrantLock重入锁奥秘,Condition条件变量让你玩转线程协作,秒变并发大神!
【8月更文挑战第4天】AQS是Java并发编程的核心框架,为锁和同步器提供基础结构。ReentrantLock基于AQS实现可重入互斥锁,比`synchronized`更灵活,支持可中断锁获取及超时控制。通过维护计数器实现锁的重入性。Condition接口允许ReentrantLock创建多个条件变量,支持细粒度线程协作,超越了传统`wait`/`notify`机制,助力开发者构建高效可靠的并发应用。
36 0
|
4天前
|
存储 缓存 安全
深度剖析Java HashMap:源码分析、线程安全与最佳实践
深度剖析Java HashMap:源码分析、线程安全与最佳实践
|
8天前
|
缓存 Java 数据处理
Java中的并发编程:解锁多线程的力量
在Java的世界里,并发编程是提升应用性能和响应能力的关键。本文将深入探讨Java的多线程机制,从基础概念到高级特性,逐步揭示如何有效利用并发来处理复杂任务。我们将一起探索线程的创建、同步、通信以及Java并发库中的工具类,带你领略并发编程的魅力。
|
9天前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
28 4
|
16天前
|
Java API 开发者
Java中的并发编程:解锁多线程的潜力
在数字化时代的浪潮中,并发编程已成为软件开发的核心技能之一。本文将深入探讨Java中的并发编程概念,通过实例分析与原理解释,揭示如何利用多线程提升应用性能和响应性。我们将从基础的线程创建开始,逐步过渡到高级的同步机制,并探讨如何避免常见的并发陷阱。读者将获得构建高效、稳定并发应用所需的知识,同时激发对Java并发更深层次探索的兴趣。
27 2
|
21天前
|
存储 SQL Java
(七)全面剖析Java并发编程之线程变量副本ThreadLocal原理分析
在之前的文章:彻底理解Java并发编程之Synchronized关键字实现原理剖析中我们曾初次谈到线程安全问题引发的"三要素":多线程、共享资源/临界资源、非原子性操作,简而言之:在同一时刻,多条线程同时对临界资源进行非原子性操作则有可能产生线程安全问题。
|
4天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
12天前
|
Java 数据库
Java中的并发编程:深入理解线程池
在Java的并发编程领域,线程池是提升性能和资源管理的关键工具。本文将通过具体实例和数据,探讨线程池的内部机制、优势以及如何在实际应用中有效利用线程池,同时提出一个开放性问题,引发读者对于未来线程池优化方向的思考。
30 0
|
16天前
|
数据采集 并行计算 程序员
Python中的并发编程:理解多线程与多进程
在Python编程中,理解并发编程是提升程序性能和效率的关键。本文将深入探讨Python中的多线程和多进程编程模型,比较它们的优劣势,并提供实际应用中的最佳实践与案例分析。
|
19天前
|
安全 Java 程序员
大家都说Java有三种创建线程的方式!并发编程中的惊天骗局!
今天来聊一个比较有意思的话题,这是一道Java八股文中的八股文,简称八股文Plus!