Executor

简介: Executor体系java中,new一个线程对象是耗费资源的操作,对于需要大量线程创建的场景可以使用线程池来解决。使用线程池不仅能够降低创建和销毁线程的性能开销,如果合理的设置线程池还能够避免无限制的创建线程资源,保持系统稳定。

Executor体系

java中,new一个线程对象是耗费资源的操作,对于需要大量线程创建的场景可以使用线程池来解决。

使用线程池不仅能够降低创建和销毁线程的性能开销,如果合理的设置线程池还能够避免无限制的创建线程资源,保持系统稳定。

jdk中内置了Executor框架,可以用于实现线程池,大体结构如下:
image

Executor

Executor接口只定义了一个execute方法,可以用于执行一个Runnable类型对象:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     */
    void execute(Runnable command);
}

ExecutorService

ExecutorService扩展了Executor接口,提供了更丰富的方法:

public interface ExecutorService extends Executor {

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     */
    void shutdown();

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's {@code get} method will return the task's result upon
     * successful completion.
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return the given result upon successful completion.
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return {@code null} upon <em>successful</em> completion.
     */
    Future<?> submit(Runnable task);
}

submit和execute的区别

执行一个任务,可以使用submit和execute,这两者有什么区别呢?

1. execute只能接受Runnable类型的任务;

2. submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null。
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

submit和execute

submit和execute两者的区别:

1. execute只能接受Runnable类型的任务;

2. submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null。

ThreadPoolExecutor

ThreadPoolExecutor表示一个线程池,ThreadPoolExecutor实现了Executor接口,任何Runnable类型的线程都可以被ThreadPoolExecutor线程池调度。

jdk还提供了Executors类用于便捷的创建线程池,Executors相当于线程池工厂,通过Executors可以获得拥有特定功能的线程池,其主要API如下:

public class Executors {

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue. 
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue.
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

API简析:

newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池空闲则立即执行;若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。

newSingleThreadExecutor:创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。

newCachedThreadPool:返回一个corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE的线程池,因为corePoolSize为0,所以所有线程在空闲60s后就会被回收。
    线程池的线程数量不确定,但若有空闲线程则直接复用;如果所有线程都在工作,并且此时又有新的任务提交,则会创建新的线程处理任务,并且每一个空闲线程会在超时后后自动回收。

newSingleThreadScheduledThreadPool:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService智商扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。

newScheduledThreadPool:该方法也会返回一个ScheduledExecutorService对象,但该线程可以指定线程数量。

线程池内部实现

Executors的如下方法,实质上都是对ThreadPoolExecutor的封装:

newFixedThreadPool(int nThreads)
newSingleThreadExecutor()
newCachedThreadPool()

ThreadPoolExecutor的构造方法如下:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);

参数含义:

corePoolSize:指定了线程池中的线程数量;

maximumPoolSize:指定了线程池中的最大线程数量;

keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的线程能够存活的时间,超过时间则被销毁;

unit:keepAliveTime的单位;

workQueue:任务队列,用于保存被提交但尚未被执行的任务;

threadFactory:线程工厂,用于创建线程,一般用默认的即可;

handler:拒绝策略,当任务太多来不及处理时,如何拒绝任务。

任务队列

workQueue接收的类型为BlockingQueue的阻塞队列,且只能存放Runnable类型对象。ThreadPoolExecutor中可能用到的阻塞队列如下。

SynchronousQueue

SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每一个插入操作都要等待一个相应的删除操作。提交的任务不会真实地保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则创建新的线程,如果线程数达到最大值,则执行拒绝策略。

Executors.newCachedThreadPool()返回的线程池就是使用的这种队列。

ArrayBlockingQueue

这是一个有界的阻塞队列,数组实现,其构造函数如下:

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

当线程池的线程数小于corePoolSize,则优先创建新的线程,否则将任务加入等待队列。若等待队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务;若大于maximumPoolSize,则执行拒绝策略。

因此,有界队列仅在队列装满时,才可能将线程数量提高到corePoolSize以上,除非系统非常繁忙,否则有界队列能够确保核心线程数维持在corePoolSize。

LinkedBlockingQueue

这个也是有边界的队列,但是是链表实现的,如果初始化的时候不指定边界值则默认是Interger.MAX_VALUE

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

PriorityBlockQueue

带有执行优先级的阻塞队列,可以控制任务执行的先后顺序,没有边界。其根据任务自身的优先级优先级顺序先后执行。

execute

从execute的实际执行过程中,可以观察到corePoolSize和maximumPoolSize的具体应用:

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

workerCountOf(c)获取了当前线程的线程总数,当线程总数小于corePoolSize时,会将任务通过addWorker()直接调度执行,否则,通过workQueue.offer()将任务加入任务队列中等待。

如果加入队列失败(例如有界队列达到上限或使用了SynchronousQueue),则将任务直接提交给线程池,如果当前线程已经达到maximumPoolSize,则提交失败,执行拒绝策略。

其执行流程示意图如下:
image

拒绝策略

当线程池的线程数达到maximumPoolSize,且任务队列也满了的情况下,此时已超超出了线程池的负载能力,就会使用拒绝策略。jdk中内置了四种拒绝策略,这四种策略均在ThreadPoolExecutor中:

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}

AbortPolicy

该策略会直接抛出异常,组织系统正常工作,这是默认策略:

public class ThreadPoolExecutor extends AbstractExecutorService {
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
}

CallerRunsPolicy

该策略会直接在调用者线程中,运行当前被丢弃的任务,这种策略下,任务不会被真正的丢弃,但是会影响任务提交的线程性能。

DiscardOldestPolicy

该策略会丢弃最老的一个请求,也就是即将被执行的那个请求,并尝试再次提交当前任务。

DiscardPolicy

该策略默默地丢弃无法处理的任务,不予任何处理,如果不允许任务丢失,则不能使用这种策略。

自定义策略

内置策略均是实现RejectedExecutionHandler接口:

public interface RejectedExecutionHandler {
    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

如果内置策略不能满足需求,可以选择通过实现RejectedExecutionHandler接口自定义策略:

public class CustomerRejectPoliceDemo {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + " Thread ID: " + Thread.currentThread().getId());
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0l, 
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), 
                (r, executor) -> System.out.println(r.toString() + " is discard"));
        
        for (int i = 0; i < 30; i++) {
            executorService.submit(task);
        }
    }
}
目录
相关文章
|
4月前
|
缓存 Java
Executor
【7月更文挑战第22天】
76 0
|
6月前
|
并行计算 算法 Java
Java线程池——Executor框架
Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)。 Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。
|
存储 Java 调度
【 Executor线程池原理与源码】
【 Executor线程池原理与源码】
ThreadPoolExecutor的中的submit和FutureTask || 通过Executors 创建线程池的一些实例(Callable和Runnable的在其中的体现)
ThreadPoolExecutor的中的submit和FutureTask || 通过Executors 创建线程池的一些实例(Callable和Runnable的在其中的体现)
203 0
|
设计模式 SQL 缓存
Executor接口|学习笔记
快速学习Executor接口
Executor接口|学习笔记
|
Java API
Executor - Shutdown、ShutdownNow、awaitTermination 详解与实战
使用 executor 线程池时经常用到 shutdown / shutdownNow + awaitTermination 方法关闭线程池,下面看下几种方法的定义与常见用法。
1179 0
Executor - Shutdown、ShutdownNow、awaitTermination 详解与实战
|
存储 缓存 资源调度
Executor框架及线程池总结
Executor作为一个灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程进行了解耦开发,基于生产者和消费者模型,还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能检测等机制。
Executor框架及线程池总结
|
Java 程序员 调度
Executor框架
Executor框架
164 0
Executor框架