【多线程:线程池】ThreadPoolExecutor类-提交、停止

简介: 【多线程:线程池】ThreadPoolExecutor类-提交、停止

【多线程:线程池】ThreadPoolExecutor类-提交、停止

01.提交任务

// 执行任务
    void execute(Runnable command);
    
    // 提交任务 task,用返回值 Future 获得任务执行结果
    <T> Future<T> submit(Callable<T> task);
    
    // 提交 tasks 中所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     throws InterruptedException;
     
    // 提交 tasks 中所有任务,带超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException;
     
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
     throws InterruptedException, ExecutionException;
     
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;

submit方法

submit方法与execute方法的区别在于,execute方法接收的参数是Runnable类型的参数 没有返回值,而submit方法接收的是Callable类型的参数有返回值 且 返回值用Future<>接收,Future<>获取线程的返回值 实现原理 就是之前所讲的保护性暂停模式。
例子

@Slf4j(topic = "c.TestPool1")
public class TestPool1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Future<String> future = threadpool.submit(new Callable<String>() {

            @Override
            public String call() throws Exception {
                log.debug("running");
                Thread.sleep(1000);
                return "ok";
            }

        });
        log.debug("{}",future.get());
    }
}

结果

17:12:30.718 c.TestPool1 [pool-1-thread-1] - running
17:12:31.731 c.TestPool1 [main] - ok

解释
可以看出我们实现了Callable接口 并返回了字符串"ok" 给Future< String>,之后通过get方法获取到返回数据。

invokeAll方法

invokeAll方法接收的是一个任务集合 且有返回值,线程池中的线程执行这个任务集合。
例子

@Slf4j(topic = "c.TestPool2")
public class TestPool2 {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        List<Future<String>> futures = threadpool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin1");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin2");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin3");
                    Thread.sleep(2000);
                    return "3";
                }
        ));

        futures.forEach(f -> {
            try {
                log.debug("{}",f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

结果

17:48:49.429 c.TestPool2 [pool-1-thread-2] - begin2
17:48:49.429 c.TestPool2 [pool-1-thread-1] - begin1
17:48:49.930 c.TestPool2 [pool-1-thread-2] - begin3
17:48:51.937 c.TestPool2 [main] - 1
17:48:51.937 c.TestPool2 [main] - 2
17:48:51.937 c.TestPool2 [main] - 3

解释
我们可以看到 任务1与任务2同时被执行,但是因为我们的线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后 线程2执行任务3,中间差了0.5s 是因为 任务2执行了0.5s,最终我们遍历返回结果 并打印。

invokeAny方法

invokeAny方法的作用是得到一个最先返回的结果,之后的线程就不再运行。
例子

@Slf4j(topic = "c.TestPool3")
public class TestPool3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

       String future = threadpool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin1");
                    Thread.sleep(1000);
                    log.debug("end1");
                    return "1";
                },
                () -> {
                    log.debug("begin2");
                    Thread.sleep(500);
                    log.debug("end2");
                    return "2";
                },
                () -> {
                    log.debug("begin3");
                    Thread.sleep(2000);
                    log.debug("end3");
                    return "3";
                }
        ));

        log.debug("{}",future);
    }
}

结果

18:03:18.892 c.TestPool3 [pool-1-thread-3] - begin3
18:03:18.892 c.TestPool3 [pool-1-thread-2] - begin2
18:03:18.892 c.TestPool3 [pool-1-thread-1] - begin1
18:03:19.412 c.TestPool3 [pool-1-thread-2] - end2
18:03:19.412 c.TestPool3 [main] - 2

解释
注意我们此时把核心线程数 设置为3,此时三个任务同时运行,且任务2先运行完 可以看到最后的结果只有任务2的返回值,但是注意一个场景,假如我们现在的核心线程数只有一个 那么很明显我们的线程会先运行任务1 剩下两个任务加入任务队列,等待任务1运行完才会 有空闲线程运行剩下的任务,也就是说任务1是先运行完的 所以返回的结果只有任务1的返回值。

02.关闭线程池

/*
线程池状态变为 SHUTDOWN
    - 不会接收新任务
    - 但已提交任务会执行完
    - 此方法不会阻塞调用线程的执行
*/
void shutdown();
// 源码
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(SHUTDOWN);
        // 仅会打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    tryTerminate();
}

/*
线程池状态变为 STOP
    - 不会接收新任务
    - 会将队列中的任务返回
    - 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
// 源码
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(STOP);
        // 打断所有线程
        interruptWorkers();
        // 获取队列中剩余任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 尝试终结
    tryTerminate();
    return tasks;
}


// 其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();

// 线程池状态是否是 TERMINATED
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

shutdown与shutdownNow例子

@Slf4j(topic = "c.TestShutDown")
public class TestShutDown {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Future<Integer> result1 = threadpool.submit(() -> {
            log.debug("task 1 running...");
            Thread.sleep(1000);
            log.debug("task 1 finish...");
            return 1;
        });

        Future<Integer> result2 = threadpool.submit(() -> {
            log.debug("task 2 running...");
            Thread.sleep(1000);
            log.debug("task 2 finish...");
            return 2;
        });

        Future<Integer> result3 = threadpool.submit(() -> {
            log.debug("task 3 running...");
            Thread.sleep(1000);
            log.debug("task 3 finish...");
            return 3;
        });

        // shutdown() 部分的代码
        log.debug("shutdown");
        threadpool.shutdown();
        threadpool.submit(()->{
            log.debug("task 4 running...");
            Thread.sleep(1000);
            log.debug("task 4 finish");
            return "4";
        });
        threadpool.awaitTermination(3, TimeUnit.SECONDS);
        log.debug("other...");

        // shutdownNow部分代码
        // log.debug("shutdownNow");
//        List<Runnable> runnables = threadpool.shutdownNow();
//        log.debug("other.... {}" , runnables);
    }
}

shutdown代码结果

18:35:02.128 c.TestShutDown [main] - shutdown
18:35:02.128 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:35:02.128 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 1 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-2] - task 2 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 3 running...
18:35:04.139 c.TestShutDown [pool-1-thread-1] - task 3 finish...
18:35:04.139 c.TestShutDown [main] - other...

解释
可以看出我们在shutdown后依然可以把 shutdown之前的 任务运行完毕,但是shutdown之后的任务就没有再运行了。另外我们关注一下awaitTermination方法 它的作用是等待shutdown部分的任务运行完后 主线程再运行awaitTermination方法之后的代码。

shutdownNow代码结果

18:47:52.344 c.TestShutDown [main] - shutdownNow
18:47:52.344 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:47:52.344 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:47:52.354 c.TestShutDown [main] - other.... [java.util.concurrent.FutureTask@e580929]

解释
可以看出我们在shutdownNow之后 只有一个任务运行成功了,也就是别的任务都已经被打断了。

目录
相关文章
|
11天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
34 1
|
8天前
|
Java
【JavaEE】——多线程常用类
Callable的call方法,FutureTask类,ReentrantLock可重入锁和对比,Semaphore信号量(PV操作)CountDownLatch锁存器,
|
8天前
|
Java 程序员 调度
【JavaEE】线程创建和终止,Thread类方法,变量捕获(7000字长文)
创建线程的五种方式,Thread常见方法(守护进程.setDaemon() ,isAlive),start和run方法的区别,如何提前终止一个线程,标志位,isinterrupted,变量捕获
|
8天前
|
安全 Java API
【JavaEE】多线程编程引入——认识Thread类
Thread类,Thread中的run方法,在编程中怎么调度多线程
|
2月前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
61 4
|
2月前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
2月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
94 2
|
2月前
|
数据采集 Java Python
爬取小说资源的Python实践:从单线程到多线程的效率飞跃
本文介绍了一种使用Python从笔趣阁网站爬取小说内容的方法,并通过引入多线程技术大幅提高了下载效率。文章首先概述了环境准备,包括所需安装的库,然后详细描述了爬虫程序的设计与实现过程,包括发送HTTP请求、解析HTML文档、提取章节链接及多线程下载等步骤。最后,强调了性能优化的重要性,并提醒读者遵守相关法律法规。
69 0
|
3月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
62 1
|
3月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
40 3