剑指JUC原理-13.线程池(上)

简介: 剑指JUC原理-13.线程池

ThreadPoolExecutor



线程池状态


ThreadPoolExecutor 使用 int 的高 3 位(最高位1代表的是负数)来表示线程池状态,低 29 位表示线程数量

状态名 高三位 接收新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN(调用了线程池的SHUTDOWN) > RUNNING(创建出来的初始状态)


这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }


构造方法


public ThreadPoolExecutor(int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目(核心线程 + 救急线程数)
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略


工作方式:

线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。


当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。


如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。(救急线程和核心线程的最大区别就是,它有生存时间,执行完了,过一段时间,没有新任务了,就销毁掉了,下次高峰期来了,才会再有)


如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现


  • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
  • CallerRunsPolicy 让调用者运行任务
  • DiscardPolicy 放弃本次任务
  • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池。


其它优秀框架的实现(总结整理)


  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
  • Netty 的实现,是创建一个新线程来执行任务
  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略


当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。


newFixedThreadPool


固定大小的线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
}

特点


  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务


评价 适用于任务量已知,相对耗时的任务


newCachedThreadPool


public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
}

特点


1.核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着

  • 全部都是救急线程(60s 后可以回收)
  • 救急线程可以无限创建


2.队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)

SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
  try {
    log.debug("putting {} ", 1);
    integers.put(1);
    log.debug("{} putted...", 1);
    log.debug("putting...{} ", 2);
    integers.put(2);
    log.debug("{} putted...", 2);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
},"t1").start();
sleep(1);
new Thread(() -> {
  try {
    log.debug("taking {}", 1);
    integers.take();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
},"t2").start();
sleep(1);
new Thread(() -> {
  try {
    log.debug("taking {}", 2);
    integers.take();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
},"t3").start();

输出

11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2 
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted... 

评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线

程。 适合任务数比较密集,但每个任务执行时间较短的情况


newSingleThreadExecutor


public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

使用场景:


希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。


区别:


  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改


提交任务


// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
 throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;
public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
    }
    private static void method3(ExecutorService pool) throws InterruptedException, ExecutionException {
        String result = pool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin 1");
                    Thread.sleep(1000);
                    log.debug("end 1");
                    return "1";
                },
                () -> {
                    log.debug("begin 2");
                    Thread.sleep(500);
                    log.debug("end 2");
                    return "2";
                },
                () -> {
                    log.debug("begin 3");
                    Thread.sleep(2000);
                    log.debug("end 3");
                    return "3";
                }
        ));
        log.debug("{}", result);
    }
    private static void method2(ExecutorService pool) throws InterruptedException {
        List<Future<String>> futures = pool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin");
                    Thread.sleep(2000);
                    return "3";
                }
        ));
        futures.forEach( f ->  {
            try {
                log.debug("{}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
    private static void method1(ExecutorService pool) throws InterruptedException, ExecutionException {
        Future<String> future = pool.submit(() -> {
            log.debug("running");
            Thread.sleep(1000);
            return "ok";
        });
        log.debug("{}", future.get());
    }

通过以上的例子,可以获取到对应的结果,过程不做赘述,下面主要介绍Future为什么能获取到 线程的结果并返回,至于其它几个返回值,大同小异!


Future机制原理


回顾Runnable和Callable


  • Callable定义了call()方法,Runnale定义了run()方法。
  • call()方法可以抛出异常,run()方法无法抛出异常。
  • Callable有返回值,是泛型的,创建的时候传递进去,执行结束后返回。
  • Callable执行任务的时候可以通过FutureTask得到任务执行的状态。


Callable的call方法实际执行在Runnable的run方法中。Runnable实例对象需要Thread包装启动,Callable先通过FutureTask(本质还是Runnable)包装,再给Thread包装执行。


Future机制原理


Future就是对于具体的Runnable或Callable任务的执行结果进行取消,查询是否完成,获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。


JDK内置的Future主要使用了Callable接口和FutureTask类。下面对源码进行解析:


Callable接口:

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

如何使用Callable:


一般情况下都是配合ExecutorService来使用的,在ExecutorService接口中声明了三个submit方法,用来生成Future对象,参数为Callable实例或Runnable实例。

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

继承关系:

public class FutureTask<V> implements RunnableFuture<V> {}
public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}

关系明确: FutureTask类实现了RunnableFuture接口,RunnableFuture接口又继承了Runnable接口和Future接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,拥有Future接口提供的各种方法。


流程跑通: 通常把任务定义Callable接口的call方法内部,返回值为泛型,再生成一个FutureTask的对象,FutureTask构造方法内部参数封装着Callable实例,然后把这个对象当作一个Runnable,作为参数传递给Thread包装执行。


Future源码解析


Future接口提供的方法:

public interface Future<V> {
    //取消任务。参数:是否立即中断任务执行,或者等等任务结束
    boolean cancel(boolean mayInterruptIfRunning);
    //任务是否已经取消,若已取消,返回true
    boolean isCancelled();
    //任务是否已经完成。包括任务正常完成、抛出异常或被取消,都返回true
    boolean isDone();
    /*会一直阻塞等待任务执行结束,获得V类型的结果。InterruptedException: 线程被中断异常, ExecutionException: 任务执行异常,如果任务被取消,还会抛出CancellationException*/
    V get() throws InterruptedException, ExecutionException;
    /*参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException*/
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}


FutureTask源码解析:


构造方法:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        //状态为NEW
        this.state = NEW;       // ensure visibility of callable
    }
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

实际上,Callable = Runnable + result,看Executors.callable(runnable,result)的实现:

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        //new了一个RunnableAdapter,返回Callable,说明RunnableAdapter实现了Callable
        return new RunnableAdapter<T>(task, result);
    }

利用RunnableAdapter适配器实现了将Runnable转为Callable,继续看RunnableAdapter类:

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            //Runnable task执行了run()
            task.run();
            //返回了T result
            return result;
        }
    }

本质上还是Runnable去执行run方法,只是增加了result常量来接受返回的结果而已。


剑指JUC原理-13.线程池(中):https://developer.aliyun.com/article/1413642

目录
相关文章
|
6天前
|
缓存 安全 Java
JUC系列之《CountDownLatch:同步多线程的精准发令枪 》
CountDownLatch是Java并发编程中用于线程协调的同步工具,通过计数器实现等待机制。主线程等待多个工作线程完成任务后再继续执行,适用于资源初始化、高并发模拟等场景,具有高效、灵活、线程安全的特点,是JUC包中实用的核心组件之一。
|
5天前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
34 2
|
5天前
|
Java 测试技术 API
【JUC】(1)带你重新认识进程与线程!!让你深层次了解线程运行的睡眠与打断!!
JUC是什么?你可以说它就是研究Java方面的并发过程。本篇是JUC专栏的第一章!带你了解并行与并发、线程与程序、线程的启动与休眠、打断和等待!全是干货!快快快!
140 2
|
5天前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
43 1
|
5月前
|
存储 缓存 安全
JUC并发—11.线程池源码分析
本文主要介绍了线程池的优势和JUC提供的线程池、ThreadPoolExecutor和Excutors创建的线程池、如何设计一个线程池、ThreadPoolExecutor线程池的执行流程、ThreadPoolExecutor的源码分析、如何合理设置线程池参数 + 定制线程池。
JUC并发—11.线程池源码分析
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
114 1
|
8月前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
9月前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
186 6
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
1299 30
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)

热门文章

最新文章