【多线程:线程池】ThreadPoolExecutor类-提交、停止

简介: 【多线程:线程池】ThreadPoolExecutor类-提交、停止

【多线程:线程池】ThreadPoolExecutor类-提交、停止

01.提交任务

// 执行任务
    void execute(Runnable command);
    
    // 提交任务 task,用返回值 Future 获得任务执行结果
    <T> Future<T> submit(Callable<T> task);
    
    // 提交 tasks 中所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     throws InterruptedException;
     
    // 提交 tasks 中所有任务,带超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException;
     
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
     throws InterruptedException, ExecutionException;
     
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;

submit方法

submit方法与execute方法的区别在于,execute方法接收的参数是Runnable类型的参数 没有返回值,而submit方法接收的是Callable类型的参数有返回值 且 返回值用Future<>接收,Future<>获取线程的返回值 实现原理 就是之前所讲的保护性暂停模式。
例子

@Slf4j(topic = "c.TestPool1")
public class TestPool1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Future<String> future = threadpool.submit(new Callable<String>() {

            @Override
            public String call() throws Exception {
                log.debug("running");
                Thread.sleep(1000);
                return "ok";
            }

        });
        log.debug("{}",future.get());
    }
}

结果

17:12:30.718 c.TestPool1 [pool-1-thread-1] - running
17:12:31.731 c.TestPool1 [main] - ok

解释
可以看出我们实现了Callable接口 并返回了字符串"ok" 给Future< String>,之后通过get方法获取到返回数据。

invokeAll方法

invokeAll方法接收的是一个任务集合 且有返回值,线程池中的线程执行这个任务集合。
例子

@Slf4j(topic = "c.TestPool2")
public class TestPool2 {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        List<Future<String>> futures = threadpool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin1");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin2");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin3");
                    Thread.sleep(2000);
                    return "3";
                }
        ));

        futures.forEach(f -> {
            try {
                log.debug("{}",f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

结果

17:48:49.429 c.TestPool2 [pool-1-thread-2] - begin2
17:48:49.429 c.TestPool2 [pool-1-thread-1] - begin1
17:48:49.930 c.TestPool2 [pool-1-thread-2] - begin3
17:48:51.937 c.TestPool2 [main] - 1
17:48:51.937 c.TestPool2 [main] - 2
17:48:51.937 c.TestPool2 [main] - 3

解释
我们可以看到 任务1与任务2同时被执行,但是因为我们的线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后 线程2执行任务3,中间差了0.5s 是因为 任务2执行了0.5s,最终我们遍历返回结果 并打印。

invokeAny方法

invokeAny方法的作用是得到一个最先返回的结果,之后的线程就不再运行。
例子

@Slf4j(topic = "c.TestPool3")
public class TestPool3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

       String future = threadpool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin1");
                    Thread.sleep(1000);
                    log.debug("end1");
                    return "1";
                },
                () -> {
                    log.debug("begin2");
                    Thread.sleep(500);
                    log.debug("end2");
                    return "2";
                },
                () -> {
                    log.debug("begin3");
                    Thread.sleep(2000);
                    log.debug("end3");
                    return "3";
                }
        ));

        log.debug("{}",future);
    }
}

结果

18:03:18.892 c.TestPool3 [pool-1-thread-3] - begin3
18:03:18.892 c.TestPool3 [pool-1-thread-2] - begin2
18:03:18.892 c.TestPool3 [pool-1-thread-1] - begin1
18:03:19.412 c.TestPool3 [pool-1-thread-2] - end2
18:03:19.412 c.TestPool3 [main] - 2

解释
注意我们此时把核心线程数 设置为3,此时三个任务同时运行,且任务2先运行完 可以看到最后的结果只有任务2的返回值,但是注意一个场景,假如我们现在的核心线程数只有一个 那么很明显我们的线程会先运行任务1 剩下两个任务加入任务队列,等待任务1运行完才会 有空闲线程运行剩下的任务,也就是说任务1是先运行完的 所以返回的结果只有任务1的返回值。

02.关闭线程池

/*
线程池状态变为 SHUTDOWN
    - 不会接收新任务
    - 但已提交任务会执行完
    - 此方法不会阻塞调用线程的执行
*/
void shutdown();
// 源码
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(SHUTDOWN);
        // 仅会打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    tryTerminate();
}

/*
线程池状态变为 STOP
    - 不会接收新任务
    - 会将队列中的任务返回
    - 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
// 源码
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(STOP);
        // 打断所有线程
        interruptWorkers();
        // 获取队列中剩余任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 尝试终结
    tryTerminate();
    return tasks;
}


// 其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();

// 线程池状态是否是 TERMINATED
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

shutdown与shutdownNow例子

@Slf4j(topic = "c.TestShutDown")
public class TestShutDown {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Future<Integer> result1 = threadpool.submit(() -> {
            log.debug("task 1 running...");
            Thread.sleep(1000);
            log.debug("task 1 finish...");
            return 1;
        });

        Future<Integer> result2 = threadpool.submit(() -> {
            log.debug("task 2 running...");
            Thread.sleep(1000);
            log.debug("task 2 finish...");
            return 2;
        });

        Future<Integer> result3 = threadpool.submit(() -> {
            log.debug("task 3 running...");
            Thread.sleep(1000);
            log.debug("task 3 finish...");
            return 3;
        });

        // shutdown() 部分的代码
        log.debug("shutdown");
        threadpool.shutdown();
        threadpool.submit(()->{
            log.debug("task 4 running...");
            Thread.sleep(1000);
            log.debug("task 4 finish");
            return "4";
        });
        threadpool.awaitTermination(3, TimeUnit.SECONDS);
        log.debug("other...");

        // shutdownNow部分代码
        // log.debug("shutdownNow");
//        List<Runnable> runnables = threadpool.shutdownNow();
//        log.debug("other.... {}" , runnables);
    }
}

shutdown代码结果

18:35:02.128 c.TestShutDown [main] - shutdown
18:35:02.128 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:35:02.128 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 1 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-2] - task 2 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 3 running...
18:35:04.139 c.TestShutDown [pool-1-thread-1] - task 3 finish...
18:35:04.139 c.TestShutDown [main] - other...

解释
可以看出我们在shutdown后依然可以把 shutdown之前的 任务运行完毕,但是shutdown之后的任务就没有再运行了。另外我们关注一下awaitTermination方法 它的作用是等待shutdown部分的任务运行完后 主线程再运行awaitTermination方法之后的代码。

shutdownNow代码结果

18:47:52.344 c.TestShutDown [main] - shutdownNow
18:47:52.344 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:47:52.344 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:47:52.354 c.TestShutDown [main] - other.... [java.util.concurrent.FutureTask@e580929]

解释
可以看出我们在shutdownNow之后 只有一个任务运行成功了,也就是别的任务都已经被打断了。

目录
相关文章
|
15天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信
|
26天前
|
存储 缓存 NoSQL
Redis单线程已经很快了6.0引入多线程
Redis单线程已经很快了6.0引入多线程
31 3
|
29天前
|
消息中间件 安全 Linux
线程同步与IPC:单进程多线程环境下的选择与权衡
线程同步与IPC:单进程多线程环境下的选择与权衡
58 0
|
28天前
|
存储 算法 Java
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
72 0
|
15天前
|
存储 安全 Java
java多线程之原子操作类
java多线程之原子操作类
|
17天前
|
Java
Java中的多线程实现:使用Thread类与Runnable接口
【4月更文挑战第8天】本文将详细介绍Java中实现多线程的两种方法:使用Thread类和实现Runnable接口。我们将通过实例代码展示如何创建和管理线程,以及如何处理线程同步问题。最后,我们将比较这两种方法的优缺点,以帮助读者在实际开发中选择合适的多线程实现方式。
23 4
|
19天前
|
Java Spring
springboot单类集中定义线程池
该内容是关于Spring中异步任务的配置和使用步骤。首先,在启动类添加`@EnableAsync`注解开启异步支持。然后,自定义线程池类`EventThreadPool`,设置核心和最大线程数、存活时间等参数。接着,将线程池bean注入到Spring中,如`@Bean(&quot;RewardThreadPool&quot;)`。最后,在需要异步执行的方法上使用`@Async`注解,例如在一个定时任务类中,使用`@Scheduled(cron = &quot;...&quot;)`和`@Async`结合实现异步定时任务。
16 2
|
22天前
|
安全 Java 容器
Java并发编程:实现高效、线程安全的多线程应用
综上所述,Java并发编程需要注意线程安全、可见性、性能等方面的问题。合理使用线程池、同步机制、并发容器等工具,可以实现高效且线程安全的多线程应用。
14 1
|
23天前
|
JavaScript 前端开发
JS 单线程还是多线程,如何显示异步操作
JS 单线程还是多线程,如何显示异步操作
22 2
|
25天前
|
Java 测试技术 Python
Python开启线程和线程池的方法
Python开启线程和线程池的方法
17 0
Python开启线程和线程池的方法