一文搞懂Executor执行器和线程池的关系,整体介绍其任务执行/调度体系:ThreadPoolExecutor、ScheduledExecutorService(中)

简介: 一文搞懂Executor执行器和线程池的关系,整体介绍其任务执行/调度体系:ThreadPoolExecutor、ScheduledExecutorService(中)

AbstractExecutorService


ExecutorService的抽象实现:它实现了接口的部分方法,但是它并没有存放任务或者线程的数组或者Collection,也就是说它依旧和线程池没有半毛钱关系


// @since 1.5
public abstract class AbstractExecutorService implements ExecutorService {
  @Override
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    ... // 其它submit方法原理一样,底层执行调用的均是Executor#execute方法
}


关于invokeAll()/invokeAny()等方法的执行源码此处就不铺开了,记住结论即可:批量执行时使用特别方便(注意全部成功or任意一个成功的区别)。


手写实现AbstractExecutorService


本文以一个非常简单手写实例来告诉你任务执行器的效果,进一步让你感受到它目前还和线程池木有半毛钱关系。


private static class MyExecutorService extends AbstractExecutorService {
    @Override
    public void shutdown() {
        System.out.println("关闭执行器,释放资源");
    }
    @Override
    public List<Runnable> shutdownNow() {
        System.out.println("立刻关闭执行器,释放资源");
        return Collections.emptyList();
    }
    @Override
    public boolean isShutdown() {
        return false;
    }
    @Override
    public boolean isTerminated() {
        return false;
    }
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }
    // 执行任务(本处使用异步执行)
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}


准备一个方法,用于产生任务


// period:任务执行耗时 单位s
private Runnable createTask(int period) {
    return () -> {
        try {
            TimeUnit.SECONDS.sleep(period);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String currThreadName = Thread.currentThread().getName();
        System.out.println("线程[" + currThreadName + "] 我是异步执行的,耗时" + period + "s");
    };
}


书写测试方法:


@Test
public void fun3() throws InterruptedException, ExecutionException {
    String mainThreadName = Thread.currentThread().getName();
    System.out.println("----------主线程[" + mainThreadName + "]开始----------");
    ExecutorService executorService = new MyExecutorService();
    Instant start = Instant.now();
    Future<?> submit = executorService.submit(createTask(3));
    System.out.println("结果为:" + submit.get());
    Instant end = Instant.now();
    System.out.println("总耗时为:" + Duration.between(start, end).getSeconds());
    executorService.shutdown();
}


运行程序,打印:


----------主线程[main]开始----------
线程[Thread-0] 我是异步执行的,耗时3s
结果为:null
总耗时为:0
关闭执行器,释放资源


下面仅需改一下,把get放在上面:

System.out.println("结果为:" + submit.get());
Instant end = Instant.now();
System.out.println("总耗时为:" + Duration.between(start, end).getSeconds());


再次运行打印:


----------主线程[main]开始----------
线程[Thread-0] 我是异步执行的,耗时3s
结果为:null
总耗时为:3
关闭执行器,释放资源


另外,关于其它submit方法,以及批量执行的invokeAll/invokeAny方法,各位可自行测试哈。下面介绍JDK自带的,Doug Lea大神给我们提供的实现类,也是最最最最最为重要的一个类:ThreadPoolExecutor。


ThreadPoolExecutor 带线程池的执行器


顾名思义,它是一个内置线程池的执行器,也就是说:它会把Runnable任务的执行均扔进线程池里面进行执行,效率最高。


注意:ThreadPoolTaskExecutor它是Spirng提供的,基于ThreadPoolExecutor进行包装实现,请勿弄混了。


本文并不会解释为何需要线程池,以及构建线程池的七大参数都是什么意思,而只会站在使用以及基础原理的角度做出示例和说明。


public class ThreadPoolExecutor extends AbstractExecutorService {
  // 核心线程数
  private volatile int corePoolSize;
  // 最大线程数
  private volatile int maximumPoolSize;
  // 任务队列(当任务太多了,就放在这里排队)
  private final BlockingQueue<Runnable> workQueue;
  // 空闲线程的超时时间,超时就回收它(非core线程)
  private volatile long keepAliveTime;
  // 不解释
  private volatile ThreadFactory threadFactory;
  // 线程池拒绝处理函数(如任务太多了,如何拒绝)
  // 默认策略是:AbortPolicy。要决绝是抛出异常:RejectedExecutionException
  private volatile RejectedExecutionHandler handler;
}


通过上里手工模拟可知道最重要的是对execute()方法的实现:它决定了你最终如何去执行任务(同步or异步?用老的线程还是用新的线程等等)。


相关文章
|
2月前
|
算法 Unix Linux
linux线程调度策略
linux线程调度策略
55 0
|
1月前
|
存储 Java 数据处理
进程中的线程调度
进程是应用程序运行的基本单位,包括主线程、用户线程和守护线程。计算机由存储器和处理器协同操作,操作系统设计为分时和分任务模式。在个人PC普及后,基于用户的时间片异步任务操作系统确保了更好的体验和性能。线程作为进程的调度单元,通过覆写`Thread`类的`run`方法来处理任务数据,并由系统调度框架统一管理。微服务架构进一步将应用分解为多个子服务,在不同节点上执行,提高数据处理效率与容错性,特别是在大规模数据存储和处理中表现显著。例如,利用微服务框架可以优化算法,加速业务逻辑处理,并在不同区块间分配海量数据存储任务。
|
2月前
|
存储 监控 Java
|
2月前
|
监控 Java
ThreadPoolExecutor 线程执行超时,释放线程
ThreadPoolExecutor 线程执行超时,释放线程
64 1
|
2月前
|
前端开发 JavaScript 大数据
React与Web Workers:开启前端多线程时代的钥匙——深入探索计算密集型任务的优化策略与最佳实践
【8月更文挑战第31天】随着Web应用复杂性的提升,单线程JavaScript已难以胜任高计算量任务。Web Workers通过多线程编程解决了这一问题,使耗时任务独立运行而不阻塞主线程。结合React的组件化与虚拟DOM优势,可将大数据处理等任务交由Web Workers完成,确保UI流畅。最佳实践包括定义清晰接口、加强错误处理及合理评估任务特性。这一结合不仅提升了用户体验,更为前端开发带来多线程时代的全新可能。
31 0
|
2月前
|
Cloud Native Java 调度
项目环境测试问题之线程同步器会造成执行完任务的worker等待的情况如何解决
项目环境测试问题之线程同步器会造成执行完任务的worker等待的情况如何解决
|
2月前
|
Java 测试技术 PHP
父子任务使用不当线程池死锁怎么解决?
在Java多线程编程中,线程池有助于提升性能与资源利用效率,但若父子任务共用同一池,则可能诱发死锁。本文通过一个具体案例剖析此问题:在一个固定大小为2的线程池中,父任务直接调用`outerTask`,而`outerTask`再次使用同一线程池异步调用`innerTask`。理论上,任务应迅速完成,但实际上却超时未完成。经由`jstack`输出的线程调用栈分析发现,线程陷入等待状态,形成“死锁”。原因是子任务需待父任务完成,而父任务则需等待子任务执行完毕以释放线程,从而相互阻塞。此问题在测试环境中不易显现,常在生产环境下高并发时爆发,重启或扩容仅能暂时缓解。
|
3月前
|
Java Linux
Java演进问题之1:1线程模型对于I/O密集型任务如何解决
Java演进问题之1:1线程模型对于I/O密集型任务如何解决
|
3月前
|
安全
线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决
线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决
|
3月前
线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决
线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决