Executor

简介: Executor体系java中,new一个线程对象是耗费资源的操作,对于需要大量线程创建的场景可以使用线程池来解决。使用线程池不仅能够降低创建和销毁线程的性能开销,如果合理的设置线程池还能够避免无限制的创建线程资源,保持系统稳定。

Executor体系

java中,new一个线程对象是耗费资源的操作,对于需要大量线程创建的场景可以使用线程池来解决。

使用线程池不仅能够降低创建和销毁线程的性能开销,如果合理的设置线程池还能够避免无限制的创建线程资源,保持系统稳定。

jdk中内置了Executor框架,可以用于实现线程池,大体结构如下:
image

Executor

Executor接口只定义了一个execute方法,可以用于执行一个Runnable类型对象:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     */
    void execute(Runnable command);
}

ExecutorService

ExecutorService扩展了Executor接口,提供了更丰富的方法:

public interface ExecutorService extends Executor {

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     */
    void shutdown();

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's {@code get} method will return the task's result upon
     * successful completion.
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return the given result upon successful completion.
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return {@code null} upon <em>successful</em> completion.
     */
    Future<?> submit(Runnable task);
}

submit和execute的区别

执行一个任务,可以使用submit和execute,这两者有什么区别呢?

1. execute只能接受Runnable类型的任务;

2. submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null。
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

submit和execute

submit和execute两者的区别:

1. execute只能接受Runnable类型的任务;

2. submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null。

ThreadPoolExecutor

ThreadPoolExecutor表示一个线程池,ThreadPoolExecutor实现了Executor接口,任何Runnable类型的线程都可以被ThreadPoolExecutor线程池调度。

jdk还提供了Executors类用于便捷的创建线程池,Executors相当于线程池工厂,通过Executors可以获得拥有特定功能的线程池,其主要API如下:

public class Executors {

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue. 
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue.
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

API简析:

newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池空闲则立即执行;若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。

newSingleThreadExecutor:创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。

newCachedThreadPool:返回一个corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE的线程池,因为corePoolSize为0,所以所有线程在空闲60s后就会被回收。
    线程池的线程数量不确定,但若有空闲线程则直接复用;如果所有线程都在工作,并且此时又有新的任务提交,则会创建新的线程处理任务,并且每一个空闲线程会在超时后后自动回收。

newSingleThreadScheduledThreadPool:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService智商扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。

newScheduledThreadPool:该方法也会返回一个ScheduledExecutorService对象,但该线程可以指定线程数量。

线程池内部实现

Executors的如下方法,实质上都是对ThreadPoolExecutor的封装:

newFixedThreadPool(int nThreads)
newSingleThreadExecutor()
newCachedThreadPool()

ThreadPoolExecutor的构造方法如下:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);

参数含义:

corePoolSize:指定了线程池中的线程数量;

maximumPoolSize:指定了线程池中的最大线程数量;

keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的线程能够存活的时间,超过时间则被销毁;

unit:keepAliveTime的单位;

workQueue:任务队列,用于保存被提交但尚未被执行的任务;

threadFactory:线程工厂,用于创建线程,一般用默认的即可;

handler:拒绝策略,当任务太多来不及处理时,如何拒绝任务。

任务队列

workQueue接收的类型为BlockingQueue的阻塞队列,且只能存放Runnable类型对象。ThreadPoolExecutor中可能用到的阻塞队列如下。

SynchronousQueue

SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每一个插入操作都要等待一个相应的删除操作。提交的任务不会真实地保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则创建新的线程,如果线程数达到最大值,则执行拒绝策略。

Executors.newCachedThreadPool()返回的线程池就是使用的这种队列。

ArrayBlockingQueue

这是一个有界的阻塞队列,数组实现,其构造函数如下:

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

当线程池的线程数小于corePoolSize,则优先创建新的线程,否则将任务加入等待队列。若等待队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务;若大于maximumPoolSize,则执行拒绝策略。

因此,有界队列仅在队列装满时,才可能将线程数量提高到corePoolSize以上,除非系统非常繁忙,否则有界队列能够确保核心线程数维持在corePoolSize。

LinkedBlockingQueue

这个也是有边界的队列,但是是链表实现的,如果初始化的时候不指定边界值则默认是Interger.MAX_VALUE

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

PriorityBlockQueue

带有执行优先级的阻塞队列,可以控制任务执行的先后顺序,没有边界。其根据任务自身的优先级优先级顺序先后执行。

execute

从execute的实际执行过程中,可以观察到corePoolSize和maximumPoolSize的具体应用:

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

workerCountOf(c)获取了当前线程的线程总数,当线程总数小于corePoolSize时,会将任务通过addWorker()直接调度执行,否则,通过workQueue.offer()将任务加入任务队列中等待。

如果加入队列失败(例如有界队列达到上限或使用了SynchronousQueue),则将任务直接提交给线程池,如果当前线程已经达到maximumPoolSize,则提交失败,执行拒绝策略。

其执行流程示意图如下:
image

拒绝策略

当线程池的线程数达到maximumPoolSize,且任务队列也满了的情况下,此时已超超出了线程池的负载能力,就会使用拒绝策略。jdk中内置了四种拒绝策略,这四种策略均在ThreadPoolExecutor中:

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}

AbortPolicy

该策略会直接抛出异常,组织系统正常工作,这是默认策略:

public class ThreadPoolExecutor extends AbstractExecutorService {
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
}

CallerRunsPolicy

该策略会直接在调用者线程中,运行当前被丢弃的任务,这种策略下,任务不会被真正的丢弃,但是会影响任务提交的线程性能。

DiscardOldestPolicy

该策略会丢弃最老的一个请求,也就是即将被执行的那个请求,并尝试再次提交当前任务。

DiscardPolicy

该策略默默地丢弃无法处理的任务,不予任何处理,如果不允许任务丢失,则不能使用这种策略。

自定义策略

内置策略均是实现RejectedExecutionHandler接口:

public interface RejectedExecutionHandler {
    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

如果内置策略不能满足需求,可以选择通过实现RejectedExecutionHandler接口自定义策略:

public class CustomerRejectPoliceDemo {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + " Thread ID: " + Thread.currentThread().getId());
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0l, 
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), 
                (r, executor) -> System.out.println(r.toString() + " is discard"));
        
        for (int i = 0; i < 30; i++) {
            executorService.submit(task);
        }
    }
}
目录
相关文章
|
Oracle 关系型数据库 MySQL
OceanBase实践入门:高可用原理和容灾方案
OceanBase的多副本(奇数)设计,以及使用Paxos协议同步事务日志,是OceanBase高可用能做到自动切换(RTO约20s)和不丢数据(RPO=0)的关键。OceanBase在这个设计上还衍生出很多特性:如负载均衡和异地多活等。
6021 0
|
消息中间件 Java Kafka
Java消息队列总结只需一篇解决ActiveMQ、RabbitMQ、ZeroMQ、Kafka
  一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
2960 0
|
消息中间件 缓存 运维
|
物联网 开发工具 C++
AliOS Things 的 ESP32 应用开发流程
本文介绍 Windows 下基于 AliOS Things 的 ESP32 应用开发流程,包括环境搭建、程序编译、固件烧写。
10390 5
|
机器学习/深度学习 Apache C++
MXNet简介
轻量级,便携式,灵活的分布式/移动深度学习,具有动态,突变感知的数据流 Dep 调度程序; 适用于Python,R,Julia,Scala,Go,Javascript等,详情请参考:https://mxnet.apache.org GitHub地址:https://github.com/apache/incubator-mxnet Apache MXNet(孵化)是一个深度学习框架,旨在提高效率和灵活性。
2571 0
|
小程序
用小程序1分钟完成专利缴费,竟然这么简单!
用小程序1分钟完成专利缴费,竟然这么简单!
852 0
用小程序1分钟完成专利缴费,竟然这么简单!
|
弹性计算 安全 关系型数据库
阿里云飞天会员是什么?企业用户加入飞天会员有什么好处?
目前阿里云飞天会员是很多企业用户比较关心的,不少企业用户都加入了这个会员,那么这个飞天会员是什么?究竟可以享受哪些优惠和服务呢?下面小编全方位介绍下阿里云的飞天会员相关政策和服务。
1406 0
阿里云飞天会员是什么?企业用户加入飞天会员有什么好处?
阿里云域名注册流程包括域名查询和实名认证教程
阿里云域名注册包括域名查询、域名信息模板创建、域名实名认证等流程
2560 0
阿里云域名注册流程包括域名查询和实名认证教程
|
SQL 算法 API
Flink 流批一体的实践与探索
作为 Dataflow 模型的最早采用者之一,Apache Flink 在流批一体特性的完成度上在开源项目中是十分领先的。本文将基于社区资料和笔者的经验,介绍 Flink 目前(1.10)流批一体的现状以及未来的发展规划。