Java线程池详解

简介: 线程池必须要记住哪几个参数?线程池添加线程有哪些规则?线程池是手动创建还是自动创建好?线程池怎么暂停与恢复?线程池是如何实现线程复用的?本文带你一探究竟!

1. 初识多线程

相信大家使用多线程就会经常看到这样的代码,如下

public class EveryTaskOneThread {
   
   
    public static void main(String[] args) {
   
   
        for (int i = 0; i < 10; ++i) {
   
   
            Thread thread = new Thread(new Task());
            thread.start();
        }
    }

    static class Task implements Runnable {
   
   
        @Override
        public void run() {
   
   
            System.out.println("执行了任务");
        }
    }
}

  当任务数量上升到1000+,这样内存开销太大,我们希望有固定数量的线程,来执行这1000个线程,这样就避免了反复创建并销毁线程所带来的开销问题。

1.1 为什么要使用线程池?

  • 反复创建线程开销大
  • 过多的线程会占用太多内存

1.2 线程池的好处

  • 加快响应速度;
  • 合理利用CPU和内存
  • 统一管理资源

1.3 线程池适合应用的场景

  1.服务器接收到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
   2.实际在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理

2. 线程池必须记住的参数



  线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务

corePoolSize指的是核心线程数

  • 创建的核心线程即使在空闲时,也不会被回收。

maxPoolSize 指的是最大线程数

  • 在核心线程数的基础上,额外增加非核心线程,核心线程数+非核心线程数不能超过maxPoolSize

keepAliveTime指的是回收非核心线程的等待时间

  • 如果线程池当前的线程数多于corePoolSize,那么如果多余的非核心线程线程空闲时间超过keepAliveTime,它们就会被终止

handler是一种拒绝策略

  • 拒绝策略可以在任务满了之后,拒绝执行多余的任务

3. 线程池添加线程的规则

当此时新来一个任务需要执行,线程池会怎么处理?



1.如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新核心线程来运行新任务。
2.如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列。
3.如果队列已满,并且线程数小于maxPoolSize,则创建一个新非核心线程来运行任务。
4.如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝该任务。

总结:
是否需要增加线程的判断顺序是:1、corePoolSize 2、workQueue 3、maxPoolSize

举个例子:
  线程池的核心线程数corePoolSize大小为5,最大池maxPoolSize大小为10,队列workQueue为100。

  因为线程中的请求最多会创建5个,然后任务将被添加到队列中,直到达到100。当队列已满时,将创建新的线程,最多到10个线程,如果再来任务,就拒绝。

4. 增减线程的特点

  • 通过设置corePoolSizemaximumPoolSize 相同,就可以创建固定大小的线程池。
  • 我们使用线程池一般希望保持较少的线程数,并且只有在负载变得很大时才增加它。
  • 通过设置maximumPoolSize为很高的值,例如 Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
  • 只有在队列填满时才创建多于corePoolSize的非核心线程,如果使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize

5. ThreadFactory 用来创建线程

  1. 新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory()

  2. 创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。

  3. 如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。

  4. 通常使用默认的ThreadFactory就可以了

Executors部分代码

// 验证第一点,新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory()
public static ThreadFactory defaultThreadFactory() {
   
   
    return new DefaultThreadFactory();
}

......

static class DefaultThreadFactory implements ThreadFactory {
   
   
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
   
   
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
   
   
        // 验证第二点,创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

6. 守护线程

作用:给用户线程提供服务

Java中有两类线程:User Thread(用户线程)、Daemon Thread(守护线程)

6.1 守护线程的分类标准

分类标准是线程是否会阻止JVM的停止——只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程就全部继续工作;只有当最后一个非守护线程结束时,所有守护线程才会随着JVM一同结束工作。(非守护线程等同于用户线程)

我们知道,Java虚拟机通常会继续执行线程,直到发生以下两种中的任一情况时,Java程序才能运行结束:

  • 已调用System.exit()方法
  • 所有非守护程序线程的线程都已结束而一般情况下我们不会调用System.exit()方法,所以大部分的Java程序的结束都是由于所有用户线程都结束而导致的。

所以可以认为,任何一个守护线程都是整个JVM中所有用户线程(非守护线程)的管家。Daemon的作用是为其他线程的运行提供便利服务,守护线程最典型的应用就是 GC (垃圾回收器),它是一个很称职的守护者。守护线程的特性线程类型默认继承自父线程,守护线程创建的线程为默认是守护线程,同样,用户线程创建的线程默认为用户线程。非守护线程如果想创建一个守护线程,需要调用Thread.setDaemon来设置它(Thread类用布尔值daemon属性来表示线程是否是守护线程),并且,该方法必须在start之前调用,否则会抛出 IllegalThreadStateException 异常。

6.2 守护线程被谁启动

通常由JVM启动,而不是由用户去启动。

JVM启动时,通常会有一个非守护线程(通常为执行main函数的线程)。不影响JVM退出当只剩下守护线程时,JVM就会退出,因为如果只剩下守护线程,就没必要继续运行程序了。守护线程没结束并不会影响JVM的正常停止:假设所有用户线程都结束了,那么就算有5个守护线程正在运行,JVM也会正常停止:守护线程和普通线程的区别UserDaemon两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果 User Thread已经全部退出运行了,只剩下Daemon Thread存在了,虚拟机也就退出了,这是因为没有了“被守护者”,Daemon也就没有工作可做了,也就没有继续运行程序的必要了。这就是守护线程的作用:告诉JVM不需要等待它退出,当JVM中所有的线程都是守护线程的时候,JVM就可以正常的退出了。

6.3 是否需要给线程设置为守护线程?

我们通常不应把自己的线程设置为守护线程,因为设置为守护线程是很危险的。比如线程正在访问如文件、数据库的时候,所有用户线程都结束了,那么守护线程会在任何时候甚至在一个操作的中间发生中断,所以守护线程永远不应该去访问固有资源。

7. 工作队列workQueue

有3种最常见的队列类型:

直接交接:SynchronousQueue

  • 工作任务不多的情况下,只是将任务中转,就可以用SynchronousQueue,这个队列本身内部没有容量,使用这种队列,maximumPoolSize就可能需要设置的大一些。因为没有队列容量作为缓冲了,很容易创建新线程。

无界队列:LinkedBlockingQueue

  • 这种队列容量无限大,可以防止流量突增。设置maximumPoolSize也用不到,因为队列装不满,永远不需要创建新的非核心线程。但是也有风险,处理任务的速度跟不上提交的速度,可能造成内存浪费或者OOM

有界的队列:ArrayBlockingQueue

  • 这种队列可以设置大小,这样设置maximumPoolSize参数就有意义了,队列满了就创建新的非核心线程执行任务,不超过maximumPoolSize即可。

8. 线程池应该手动创建还是自动创建?

  其实手动创建更好,因为这样可以更加明确线程池的运行规则,避免资源耗尽的风险。
  自动创建线程池(即直接调用JDK封装好的构造方法)可能带来一些风险。

8.1 newFixedThreadPool

// 我们执行如下代码发生了什么
ExecutorService executorService = Executors.newFixedThreadPool(4)

查看源码

......
// ThreadPoolExecutor函数声明是这样的
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)
......
public static ExecutorService newFixedThreadPool(int nThreads) {
   
   
// 这里核心线程数和最大线程数都是传进来的nThreads,这里是4,后面keepAliveTime设置为0,因为没什么意义,不会创建非核心线程,也不会被回收。
// 因为线程数量固定了,万一传进来非常多任务怎么办,LinkedBlockingQueue无界队列,容纳无数任务,防止流量突增。
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

所以因为无界队列LinkedBlockingQueue的存在,使用newFixedThreadPool容易造成大量内存占用,可能会导致OOM,下面我们用例子体会一下这种情况造成的OOM

VM options设置为-Xmx8m -Xms8m,这样更快达到OOM

public class FixedThreadPoolOOM {
   
   
    private static ExecutorService executorService = Executors.newFixedThreadPool(4);
    public static void main(String[] args) {
   
   
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
   
   
            executorService.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {
   
   
    @Override
    public void run() {
   
   
        try {
   
   
            Thread.sleep(1000000000);// 一直睡眠,塞满队列,体会OOM
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }
    }
}

很快就会报错

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at threadpool.FixedThreadPoolOOM.main(FixedThreadPoolOOM.java:14)

8.2 newSingleThreadExecutor

// 执行下面代码会发生什么
ExecutorService executorService = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
   
   
// 可以看到这个和上一个例子很相似,就是固定大小为1的线程池,不创建非核心线程,有LinkedBlockingQueue作为缓冲
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

那么同理,当请求堆积的时候,可能会占用大量的内存线程池,原理和newFixedThreadPool一样,只是线程数量设为了1

8.3 newCachedThreadPool

可以缓存线程池,具有自动回收多余线程的功能。

public class CachedThreadPool {
   
   
    public static void main(String[] args) {
   
   
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
   
   
            executorService.execute(new Task());
        }
    }
}

class Task implements Runnable {
   
   
    @Override
    public void run() {
   
   
        try {
   
   
            Thread.sleep(500);
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

运行的部分截图如下,甚至看到了创建了958个线程
在这里插入图片描述
我们看看源码

public static ExecutorService newCachedThreadPool() {
   
   
// 核心线程数为0,所有的非核心线程均可回收。线程最大数为Integer.MAX_VALUE,这几乎看作无限大没有限制了。
// 非核心线程回收的时间为60秒,利用SynchronousQueue交换队列交换数据,即又更多任务进来利用SynchronousQueue交换再创建新线程。
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

这种创建方式的弊端在于第二个参数maximumPoolSize被设置为了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM

8.4 newScheduledThreadPool

一般用来执行和时间相关的任务

public class ScheduledThreadPoolTest {
   
   
    public static void main(String[] args) {
   
   
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
//        threadPool.schedule(new Task(), 5, TimeUnit.SECONDS); // 延迟5s执行Task任务
        threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS); // 最初延迟1s执行Task,之后每隔3s执行Task
    }
}

代码结果就不打印了
来看看源码

......
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   
   
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
......
    public ScheduledThreadPoolExecutor(int corePoolSize) {
   
   
    // 可以看到这里设置核心线程数、最大线程数、非核心线程回收等待时间为0、延迟队列DelayedWorkQueue达到时间执行任务等等,原理和newCachedThreadPool一样,请看上一个例子
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
......
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
   
   
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

原因和newCachedThreadPool一样



综上,这几种自动创建均有缺陷。

正确的创建线程池的方法:根据不同的业务场景,设置线程池参数。比如内存有多大,给线程取什么名字等等。

9. 线程池里的线程数量设定为多少比较合适?

这个得看任务类型

  • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。
  • 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍 ,参考Brain Goetz专家推荐的计算方法:线程数=CPU核心数*(1+平均等待时间/平均工作时间)

如果需要更精确的线程数量,那就需要根据不同的程序去做压测,这样就能得到比较合适的线程数量。

10.停止线程池的正确方法

10.1 shutdown

  不一定立即停止。执行了该方法之后,后面再请求执行的任务会拒绝,当前线程正在执行的任务和任务队列等待的任务还是会执行完才会停止线程池。即存量任务等待执行完毕,新任务拒绝。

public class ShutDown {
   
   
    public static void main(String[] args) throws InterruptedException {
   
   
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; ++i) {
   
   
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);// 先执行1500ms再去停止
        executorService.shutdown();
        executorService.execute(new ShutDownTask());
    }
}
class ShutDownTask implements Runnable {
   
   
    @Override
    public void run() {
   
   
        try {
   
   
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
   
   
            System.out.println(Thread.currentThread().getName() + "被中断了");
        }
    }
}

运行结果:
在这里插入图片描述
执行shutdown()之后,再来一个任务会被拒绝,抛出异常java.util.concurrent.RejectedExecutionException...Shutting down, pool size = 10, active threads = 10, queued tasks = 70, completed tasks = 20,已经提示正在关闭,后面的任务不接受了。队列还有70各任务等待,正在执行10个,已完成20个。

那总不能每次执行一个新任务看是否被拒绝来判断是否正在停止吧?于是有了isShutdown方法

10.2 isShutdown

  只要执行了shutdown方法,isShutdown方法就会返回true,只要把上面代码executorService.shutdown();前后各添加一句System.out.println(executorService.isShutdown());就可以看到一个false一个true,打印完成后,任务队列还有任务在继续执行打印。

10.3 isTerminated

可以判断线程池是不是已经完全终止

public class ShutDown {
   
   
    public static void main(String[] args) throws InterruptedException {
   
   
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; ++i) {
   
   
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();
        System.out.println(executorService.isTerminated()); // false 此时队列还有任务
        Thread.sleep(10000);
        System.out.println(executorService.isTerminated()); // true 等待时间足够长,线程池已完全停止
    }
}

class ShutDownTask implements Runnable {
   
   
    @Override
    public void run() {
   
   
        try {
   
   
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
   
   
            System.out.println(Thread.currentThread().getName() + "被中断了");
        }
    }
}

执行结果:
中间打印false
在这里插入图片描述
等待10s后,最后打印true,表示线程池已经完全停止。
在这里插入图片描述

10.4 awaitTermination

  测试一段时间内任务是否执行完毕,作判断用。执行这个方法后指定时间内,线程处于阻塞状态,线程运行完毕则返回true,否则返回false,代表超时。

public class ShutDown {
   
   

    public static void main(String[] args) throws InterruptedException {
   
   
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
   
   
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();
        boolean b = executorService.awaitTermination(1L, TimeUnit.SECONDS);
        // 1s内肯定执行不完,阻塞等待1s后返回false
        System.out.println(b);
        // 7s内执行完了,阻塞等待着,然后提前执行完就直接返回true
        boolean b1 = executorService.awaitTermination(7L, TimeUnit.SECONDS);
        System.out.println(b1);
    }
}
// ShutDownTask代码之前例子已给出

有3种情况会返回awaitTermination()会返回值,返回前是阻塞的。
1.所有任务执行完毕了,返回true
2.等待的时间到了,返回false
3.等待过程被中断,抛出InterruptedException

10.5 shutdownNow

  试终止所有正在执行的任务,线程池正在执行的线程收到中断信号,并停止处理等待队列中的的任务,最后将所有未执行的任务以列表的形式返回

public class ShutDown {
   
   
    public static void main(String[] args) throws InterruptedException {
   
   
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
   
   
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        List<Runnable> runnableList = executorService.shutdownNow();
    }
}
class ShutDownTask implements Runnable {
   
   
    @Override
    public void run() {
   
   
        try {
   
   
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
   
   
            System.out.println(Thread.currentThread().getName() + "被中断了");
        }
    }
}

在这里插入图片描述

11. 任务拒绝策略

任务拒绝时机

  • Executor关闭时,提交新任务会被拒绝。
  • 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。

11.1 AbortPolicy

直接抛出异常java.util.concurrent.RejectedExecutionException,和之前一样,调用shutdown()之后,还去执行新任务executorService.execute(new ShutDownTask())

11.2 DiscardPolicy

把任务默默丢弃,并且不会通知

11.3 DiscardOldestPolicy

丢弃最老的存在时间最久的任务

11.4 CallerRunsPolicy

这种策略有2大好处。
1.任务拒绝后让提交任务的线程去执行,比如主线程调用的execute方法,任务爆满拒绝后让主线程代劳执行,避免了业务损失。
2.给主线程代劳执行主线程此刻就不能继续添加新任务了,就必须把代劳的任务执行完才可以执行新任务,执行代劳任务的时候线程池的其他任务也能同时执行完一些,给了线程池的线程执行任务的一些缓冲时间。

12. 线程池的暂停和恢复

这里涉及到钩子函数,代码中的构造函数跳过不用看。

public class PauseableThreadPool extends ThreadPoolExecutor {
   
   
    private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition(); // 返回与此Lock实例一起使用的Condition实例
    private boolean isPaused;

// 下面几个是创建构造器用super匹配父类构造器,避免语法报错
// ========================下面几个构造不用看=========================
    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue) {
   
   
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory) {
   
   
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               RejectedExecutionHandler handler) {
   
   
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory, RejectedExecutionHandler handler) {
   
   
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
                handler);
    }
// ============================上面这些构造不用看========================
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
   
    // 线程执行之前调用
        super.beforeExecute(t, r);
        System.out.println("钩子======");
        lock.lock();
        try {
   
   
            while (isPaused) {
   
   
                unpaused.await();
            }
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        } finally {
   
   
            lock.unlock();
        }
    }

    private void pause() {
   
   
        lock.lock();
        try {
   
   
            isPaused = true;
        } finally {
   
   
            lock.unlock();
        }
    }

    public void resume() {
   
   
        lock.lock();
        try {
   
   
            isPaused = false;
            unpaused.signalAll();
        } finally {
   
   
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
   
   
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = new Runnable() {
   
   
            @Override
            public void run() {
   
   
                System.out.println("我被执行");
                try {
   
   
                    Thread.sleep(10);
                } catch (InterruptedException e) {
   
   
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 1000; i++) {
   
   
            pauseableThreadPool.execute(runnable);
        }
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("线程池被恢复了");
    }
}

打印结果
在这里插入图片描述

13. 线程池的实现原理和源码分析

线程池组成部分

  • 线程池管理器(ThreadPool)
    用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务。
  • 工作线程(PoolWorker)
    我们创建出来执行任务的线程,会反复从队列中取任务,然后去执行。
  • 任务队列(taskQueue)
    存放任务的队列有很多种,把没有处理的任务放在队列中,因为线程池会同时有多个线程去队列中取任务,所以任务队列必须是支持并发的。所以线程池选用线程安全的BlockingQueue来作为任务队列。
  • 任务接口(Task)
    线程从任务队列中取出来的任务

13.1 Executor家族?

线程池、ThreadPoolExecutorExecutorServiceExecutorExecutors等这么多和线程池相关的类,都是什么关系?
在这里插入图片描述

  • Executor 顶层接口就是用来执行方法的
    public interface Executor {
         
         
      void execute(Runnable command);
    }
    
  • ExecutorService继承了Executor 接口,还增加了新的方法,拥有了初步管理线程池的方法。
  • Executors是一个工具类,快速创建线程池都是用的它,比如创建newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPoolnewScheduledThreadPool这些线程池等
  • 创建线程池为什么返回的不是ThreadPoolExecutor对象呢而是一个ExecutorService ?如下:
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    

其实我们看源码知道,返回的其实是new ThreadPoolExecutor,而返回值类型是ExecutorService ,因为ExecutorServiceThreadPoolExecutor的间接的一个父接口(其父类实现的接口),这就是多态。

    public static ExecutorService newFixedThreadPool(int nThreads) {
   
   
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

13.2 线程池实现线程复用的原理

不管愿不愿意,你必须看点源代码才能理解,我尽量不堆砌多余的源代码

    // ThreadPoolExecutor类的execute方法
    public void execute(Runnable command) {
   
   
        if (command == null)
            throw new NullPointerException();
        // ctl记录了线程池状态和线程数
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
   
    // 如果此时的线程数小于核心线程数
             // 传入参数1:即将让线程执行的任务
             // 参数2:true表示增加线程的时候判断当前线程数是否小于corePoolSize,false表示判断当前线程数是否小于maximumPoolSize
            if (addWorker(command, true)) 
                return;
            c = ctl.get();
        }
        // 判断到这里说明当前线程数大于等于corePoolSize,线程池如果正在运行状态,那么就把Runnable任务放在工作队列中
        if (isRunning(c) && workQueue.offer(command)) {
   
   
            int recheck = ctl.get(); // 获取线程池状态
            // 如果已经不在运行状态了,移除这个Runnable,并且拒绝
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 任务可以发生异常导致线程不再执行,如果此时工作线程为0,但是任务队列里面可能还有任务,这样为了避免卡死
            // 新建一个核心工作线程,去执行队列剩余任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 走到这个else if逻辑说明,程数大于等于corePoolSize,并且要么此时线程池处于不再运行状态了或者说任务队列已经满了,
        // 那么就去增加核心工作线程,直到线程数大于maximumPoolSize时,拒绝后面任务。
        // addWorker会返回是否新增线程成功,true-成功,false-失败
        else if (!addWorker(command, false))
            reject(command);
    }

需要注意的是,addWorker方法里面会有new Worker(runnable对象),代表新建一个工作线程(核心线程),最终会执行到runWorkerrunWorker方法里面会获取当前Runnable任务,拿不到就到阻塞队列去取出来

        // Worker类里面runWorker方法部分代码如下
         Runnable task = w.firstTask;// 这个firstTask是Runnable对象
         ......
         // 这里有task = getTask()去阻塞队列取任务
         while (task != null || (task = getTask()) != null) {
   
   
             ...
             task.run(); // 线程开始执行任务
             ...
         }

这个Worker类里面的runWorker方法描述了同一个线程执行不同的任务的原因,整体能看到线程复用的过程,这些线程是怎么一个个反复执行新任务的。

代码就是这样,如果一步步跟进需要大量贴源代码,由于篇幅原因,这里就不贴了,请自行根据编译器查看源码。

13.3 线程池的状态

线程池一共5种状态



欢迎一键三连~



有问题请留言,大家一起探讨学习



----------------------Talk is cheap, show me the code-----------------------

目录
相关文章
|
1月前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
4月前
|
Java 调度 数据库
Java并发编程:深入理解线程池
在Java并发编程的海洋中,线程池是一艘强大的船,它不仅提高了性能,还简化了代码结构。本文将带你潜入线程池的深海,探索其核心组件、工作原理及如何高效利用线程池来优化你的并发应用。
|
4月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
129 1
|
4月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
4月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
19天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
15天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
1月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
112 38
|
22天前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
1月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
91 4
下一篇
DataWorks