java多线程系列:Executors框架

简介: 目录Executor接口介绍ExecutorService常用接口介绍创建线程池的一些方法介绍3.1 newFixedThreadPool方法3.2 newCachedThreadPool方法3.

目录

  1. Executor接口介绍
  2. ExecutorService常用接口介绍
  3. 创建线程池的一些方法介绍
  4. 疑问解答

Executor接口介绍

Executor是一个接口,里面提供了一个execute方法,该方法接收一个Runable参数,如下

public interface Executor {
    void execute(Runnable command);
}

Executor框架的常用类和接口结构图

Executor框架的常用类和接口结构图

线程对象及线程执行返回的对象

线程对象及线程执行返回的对象

线程对象

线程对象就是提交给线程池的任务,可以实现Runable接口或Callable接口。或许这边会产生一个疑问,为什么Runable接口和Callable接口没有任何关联,却都能作为任务来执行?大家可以思考下,文章的结尾会对此进行说明

Future接口

Future接口和FutureTask类是用来接收线程异步执行后返回的结果,可以看到下方ExecutorService接口的submit方法返回的就是Future。

ExecutorService常用接口介绍

接下来我们来看看继承了Executor接口的ExecutorService

public interface ExecutorService extends Executor {
    //正常关闭(不再接收新任务,执行完队列中的任务)
    void shutdown();
    //强行关闭(关闭当前正在执行的任务,返回所有尚未启动的任务清单)
    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
    ...
}

ThreadPoolExecutor构造函数介绍

在介绍穿件线程池的方法之前要先介绍一个类ThreadPoolExecutor,应为Executors工厂大部分方法都是返回ThreadPoolExecutor对象,先来看看它的构造函数吧

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {...}

参数介绍

参数 类型 含义
corePoolSize int 核心线程数
maximumPoolSize int 最大线程数
keepAliveTime long 存活时间
unit TimeUnit 时间单位
workQueue BlockingQueue 存放线程的队列
threadFactory ThreadFactory 创建线程的工厂
handler RejectedExecutionHandler 多余的的线程处理器(拒绝策略)

创建线程池的一些方法介绍

为什么要讲ExecutorService接口呢?是因为我们使用Executors的方法时返回的大部分都是ExecutorService。
Executors提供了几个创建线程池方法,接下来我就介绍一下这些方法

newFixedThreadPool(int nThreads)
创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。

newWorkStealingPool()
创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量。

newSingleThreadExecutor()
该方法返回一个固定数量的线程池  
该方法的线程始终不变,当有一个任务提交时,若线程池空闲,则立即执行,若没有,则会被暂缓在一个任务队列只能怪等待有空闲的线程去执行。

newCachedThreadPool() 
返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若有空闲的线程则执行任务,若无任务则不创建线程,并且每一个空闲线程会在60秒后自动回收。

newScheduledThreadPool(int corePoolSize)
返回一个SchededExecutorService对象,但该线程池可以设置线程的数量,支持定时及周期性任务执行。
 
newSingleThreadScheduledExecutor()
创建一个单例线程池,定期或延时执行任务。  
 

下面讲解下几个常用的方法,创建单个的就不说明了

newFixedThreadPool方法

该方法创建指定线程数量的线程池,没有限制可存放的线程数量(无界队列),适用于线程任务执行较快的场景。

FixedThreadPool的execute()的运行示意图

看看Executors工厂内部是如何实现的

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

可以看到返回的是一个ThreadPoolExecutor对象,核心线程数和是最大线程数都是传入的参数,存活时间是0,时间单位是毫秒,阻塞队列是无界队列LinkedBlockingQueue。

由于队列采用的是无界队列LinkedBlockingQueue,最大线程数maximumPoolSize和keepAliveTime都是无效参数,拒绝策略也将无效,为什么?

这里又延伸出一个问题,无界队列说明任务没有上限,如果执行的任务比较耗时,那么新的任务会一直存放在线程池中,线程池的任务会越来越多,将会导致什么后果?下面的代码可以试试

public class Main {

    public static void main(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        while (true){
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

    }
}

示例代码

public class Main {

    public static void main(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    System.out.println("任务"+ finalI +":开始等待2秒,时间:"+LocalTime.now()+",当前线程名:"+Thread.currentThread().getName());
                    Thread.sleep(2000);
                    System.out.println("任务"+ finalI +":结束等待2秒,时间:"+LocalTime.now()+",当前线程名:"+Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

        }
        pool.shutdown();
    }
}

输出结果

任务4:开始等待2秒,时间:17:13:22.048,当前线程名:pool-1-thread-4
任务2:开始等待2秒,时间:17:13:22.048,当前线程名:pool-1-thread-2
任务3:开始等待2秒,时间:17:13:22.048,当前线程名:pool-1-thread-3
任务1:开始等待2秒,时间:17:13:22.048,当前线程名:pool-1-thread-1

任务2:结束等待2秒,时间:17:13:24.048,当前线程名:pool-1-thread-2
任务3:结束等待2秒,时间:17:13:24.048,当前线程名:pool-1-thread-3
任务1:结束等待2秒,时间:17:13:24.048,当前线程名:pool-1-thread-1
任务4:结束等待2秒,时间:17:13:24.048,当前线程名:pool-1-thread-4
任务6:开始等待2秒,时间:17:13:24.049,当前线程名:pool-1-thread-4
任务7:开始等待2秒,时间:17:13:24.049,当前线程名:pool-1-thread-1
任务5:开始等待2秒,时间:17:13:24.049,当前线程名:pool-1-thread-3
任务8:开始等待2秒,时间:17:13:24.049,当前线程名:pool-1-thread-2

任务5:结束等待2秒,时间:17:13:26.050,当前线程名:pool-1-thread-3
任务7:结束等待2秒,时间:17:13:26.050,当前线程名:pool-1-thread-1
任务8:结束等待2秒,时间:17:13:26.051,当前线程名:pool-1-thread-2
任务6:结束等待2秒,时间:17:13:26.050,当前线程名:pool-1-thread-4

可以看出任务1-4在同一时间执行,在2秒后执行完毕,同时开始执行任务5-8。说明方法内部只创建了4个线程,其他任务存放在队列中等待执行。

newCachedThreadPool方法

newCachedThreadPool方法创建的线程池会根据需要自动创建新线程。

CachedThreadPool的execute()的运行示意图

看看Executors工厂内部是如何实现的

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool方法也是返回ThreadPoolExecutor对象,核心线程是0,最大线程数是Integer的最MAX_VALUE,存活时间是60,时间单位是秒,SynchronousQueue队列。

从传入的参数可以得知,在newCachedThreadPool方法中的空闲线程存活时间时60秒,一旦超过60秒线程就会被终止。这边还隐含了一个问题,如果执行的线程较慢,而提交任务的速度快于线程执行的速度,那么就会不断的创建新的线程,从而导致cpu和内存的增长。

代码和newFixedThreadPool一样循环添加新的线程任务,我的电脑运行就会出现如下错误

An unrecoverable stack overflow has occurred.

Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.learnConcurrency.executor.cachedThreadPool.Main.main(Main.java:11)
Process finished with exit code -1073741571 (0xC00000FD)

关于SynchronousQueue队列,它是一个没有容量的阻塞队列,任务传递的示意图如下

CachedThreadPool的任务传递示意图

示例代码

public class Main {
    public static void main(String[] args) throws Exception{
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    System.out.println("任务"+ finalI +":开始等待60秒,时间:"+LocalTime.now()+",当前线程名:"+Thread.currentThread().getName());
                    Thread.sleep(60000);
                    System.out.println("任务"+ finalI +":结束等待60秒,时间:"+LocalTime.now()+",当前线程名:"+Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            //睡眠10秒
            Thread.sleep(10000);
        }
        pool.shutdown();
    }
}

执行结果

任务1:开始等待60秒,时间:17:15:21.570,当前线程名:pool-1-thread-1
任务2:开始等待60秒,时间:17:15:31.553,当前线程名:pool-1-thread-2
任务3:开始等待60秒,时间:17:15:41.555,当前线程名:pool-1-thread-3
任务4:开始等待60秒,时间:17:15:51.554,当前线程名:pool-1-thread-4
任务5:开始等待60秒,时间:17:16:01.554,当前线程名:pool-1-thread-5
任务6:开始等待60秒,时间:17:16:11.555,当前线程名:pool-1-thread-6
任务7:开始等待60秒,时间:17:16:21.555,当前线程名:pool-1-thread-7
任务1:结束等待60秒,时间:17:16:21.570,当前线程名:pool-1-thread-1
任务2:结束等待60秒,时间:17:16:31.554,当前线程名:pool-1-thread-2

任务8:开始等待60秒,时间:17:16:31.556,当前线程名:pool-1-thread-2
任务3:结束等待60秒,时间:17:16:41.555,当前线程名:pool-1-thread-3
任务4:结束等待60秒,时间:17:16:51.556,当前线程名:pool-1-thread-4
任务5:结束等待60秒,时间:17:17:01.556,当前线程名:pool-1-thread-5
任务6:结束等待60秒,时间:17:17:11.555,当前线程名:pool-1-thread-6
任务7:结束等待60秒,时间:17:17:21.556,当前线程名:pool-1-thread-7
任务8:结束等待60秒,时间:17:17:31.557,当前线程名:pool-1-thread-2

示例代码中每个任务都睡眠60秒,每次循环添加任务睡眠10秒,从执行结果来看,添加的7个任务都是由不同的线程来执行,而此时线程1和2都执行完毕,任务8添加进来由之前创建的pool-1-thread-2执行。

newScheduledThreadPool方法

这个线程池主要用来延迟执行任务或者定期执行任务。

看看Executors工厂内部是如何实现的

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

这里返回的是ScheduledThreadPoolExecutor对象,我们继续深入进去看看

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

这里调用的是父类的构造函数,ScheduledThreadPoolExecutor的父类是ThreadPoolExecutor,所以返回的也是ThreadPoolExecutor对象。核心线程数是传入的参数corePoolSize,线程最大值是Integer的MAX_VALUE,存活时间时0,时间单位是纳秒,队列是DelayedWorkQueue。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {}

下面是ScheduledExecutorService的一些方法

public interface ScheduledExecutorService extends ExecutorService {
    //delay延迟时间,unit延迟单位,只执行1次,在经过delay延迟时间之后开始执行
    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    //首次执行时间时然后在initialDelay之后,然后在initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    //首次执行时间时然后在initialDelay之后,然后延迟delay时间执行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

疑问解答

Runable接口和Callable接口

那么就从提交任务入口看看吧

submit方法是由抽象类AbstractExecutorService实现的

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

可以看出将传入的Runnable对象和Callable传入一个newTaskFor方法,然后返回一个RunnableFuture对象

我们再来看看newTaskFor方法

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

这里都是调用FutureTask的构造函数,我们接着往下看

private Callable<V> callable;

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;      
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       
}

FutureTask类中有个成员变量callable,而传入的Runnable对象则继续调用Executors工厂类的callable方法返回一个Callable对象

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
//适配器
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

好了,到这里也就真相大白了,Runnable对象经过一系列的方法调用,最终被RunnableAdapter适配器适配成Callable对象。方法调用图如下

方法调用图

GitHub地址

地址在这

觉得不错的点个star

下一篇会介绍下自定义线程池,后续也会更新newWorkStealingPool方法介绍

参考资料

[1] Java 并发编程的艺术

[2] Java 并发编程实战


作者: 云枭zd
Github: Github地址
出处: https://www.cnblogs.com/fixzd/
版权声明:本文欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则视为侵权。
目录
相关文章
|
6天前
|
存储 安全 Java
Java 集合框架中的老炮与新秀:HashTable 和 HashMap 谁更胜一筹?
嗨,大家好,我是技术伙伴小米。今天通过讲故事的方式,详细介绍 Java 中 HashMap 和 HashTable 的区别。从版本、线程安全、null 值支持、性能及迭代器行为等方面对比,帮助你轻松应对面试中的经典问题。HashMap 更高效灵活,适合单线程或需手动处理线程安全的场景;HashTable 较古老,线程安全但性能不佳。现代项目推荐使用 ConcurrentHashMap。关注我的公众号“软件求生”,获取更多技术干货!
28 3
|
11天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
13天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
13天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
13天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
35 3
|
13天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
93 2
|
21天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
46 6
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
1月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####