【多线程:线程池】ThreadPoolExecutor类-提交、停止

简介: 【多线程:线程池】ThreadPoolExecutor类-提交、停止

【多线程:线程池】ThreadPoolExecutor类-提交、停止

01.提交任务

// 执行任务
    void execute(Runnable command);
    
    // 提交任务 task,用返回值 Future 获得任务执行结果
    <T> Future<T> submit(Callable<T> task);
    
    // 提交 tasks 中所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     throws InterruptedException;
     
    // 提交 tasks 中所有任务,带超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException;
     
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
     throws InterruptedException, ExecutionException;
     
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;

submit方法

submit方法与execute方法的区别在于,execute方法接收的参数是Runnable类型的参数 没有返回值,而submit方法接收的是Callable类型的参数有返回值 且 返回值用Future<>接收,Future<>获取线程的返回值 实现原理 就是之前所讲的保护性暂停模式。
例子

@Slf4j(topic = "c.TestPool1")
public class TestPool1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Future<String> future = threadpool.submit(new Callable<String>() {

            @Override
            public String call() throws Exception {
                log.debug("running");
                Thread.sleep(1000);
                return "ok";
            }

        });
        log.debug("{}",future.get());
    }
}

结果

17:12:30.718 c.TestPool1 [pool-1-thread-1] - running
17:12:31.731 c.TestPool1 [main] - ok

解释
可以看出我们实现了Callable接口 并返回了字符串"ok" 给Future< String>,之后通过get方法获取到返回数据。

invokeAll方法

invokeAll方法接收的是一个任务集合 且有返回值,线程池中的线程执行这个任务集合。
例子

@Slf4j(topic = "c.TestPool2")
public class TestPool2 {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        List<Future<String>> futures = threadpool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin1");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin2");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin3");
                    Thread.sleep(2000);
                    return "3";
                }
        ));

        futures.forEach(f -> {
            try {
                log.debug("{}",f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

结果

17:48:49.429 c.TestPool2 [pool-1-thread-2] - begin2
17:48:49.429 c.TestPool2 [pool-1-thread-1] - begin1
17:48:49.930 c.TestPool2 [pool-1-thread-2] - begin3
17:48:51.937 c.TestPool2 [main] - 1
17:48:51.937 c.TestPool2 [main] - 2
17:48:51.937 c.TestPool2 [main] - 3

解释
我们可以看到 任务1与任务2同时被执行,但是因为我们的线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后 线程2执行任务3,中间差了0.5s 是因为 任务2执行了0.5s,最终我们遍历返回结果 并打印。

invokeAny方法

invokeAny方法的作用是得到一个最先返回的结果,之后的线程就不再运行。
例子

@Slf4j(topic = "c.TestPool3")
public class TestPool3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

       String future = threadpool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin1");
                    Thread.sleep(1000);
                    log.debug("end1");
                    return "1";
                },
                () -> {
                    log.debug("begin2");
                    Thread.sleep(500);
                    log.debug("end2");
                    return "2";
                },
                () -> {
                    log.debug("begin3");
                    Thread.sleep(2000);
                    log.debug("end3");
                    return "3";
                }
        ));

        log.debug("{}",future);
    }
}

结果

18:03:18.892 c.TestPool3 [pool-1-thread-3] - begin3
18:03:18.892 c.TestPool3 [pool-1-thread-2] - begin2
18:03:18.892 c.TestPool3 [pool-1-thread-1] - begin1
18:03:19.412 c.TestPool3 [pool-1-thread-2] - end2
18:03:19.412 c.TestPool3 [main] - 2

解释
注意我们此时把核心线程数 设置为3,此时三个任务同时运行,且任务2先运行完 可以看到最后的结果只有任务2的返回值,但是注意一个场景,假如我们现在的核心线程数只有一个 那么很明显我们的线程会先运行任务1 剩下两个任务加入任务队列,等待任务1运行完才会 有空闲线程运行剩下的任务,也就是说任务1是先运行完的 所以返回的结果只有任务1的返回值。

02.关闭线程池

/*
线程池状态变为 SHUTDOWN
    - 不会接收新任务
    - 但已提交任务会执行完
    - 此方法不会阻塞调用线程的执行
*/
void shutdown();
// 源码
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(SHUTDOWN);
        // 仅会打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    tryTerminate();
}

/*
线程池状态变为 STOP
    - 不会接收新任务
    - 会将队列中的任务返回
    - 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
// 源码
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(STOP);
        // 打断所有线程
        interruptWorkers();
        // 获取队列中剩余任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 尝试终结
    tryTerminate();
    return tasks;
}


// 其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();

// 线程池状态是否是 TERMINATED
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

shutdown与shutdownNow例子

@Slf4j(topic = "c.TestShutDown")
public class TestShutDown {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Future<Integer> result1 = threadpool.submit(() -> {
            log.debug("task 1 running...");
            Thread.sleep(1000);
            log.debug("task 1 finish...");
            return 1;
        });

        Future<Integer> result2 = threadpool.submit(() -> {
            log.debug("task 2 running...");
            Thread.sleep(1000);
            log.debug("task 2 finish...");
            return 2;
        });

        Future<Integer> result3 = threadpool.submit(() -> {
            log.debug("task 3 running...");
            Thread.sleep(1000);
            log.debug("task 3 finish...");
            return 3;
        });

        // shutdown() 部分的代码
        log.debug("shutdown");
        threadpool.shutdown();
        threadpool.submit(()->{
            log.debug("task 4 running...");
            Thread.sleep(1000);
            log.debug("task 4 finish");
            return "4";
        });
        threadpool.awaitTermination(3, TimeUnit.SECONDS);
        log.debug("other...");

        // shutdownNow部分代码
        // log.debug("shutdownNow");
//        List<Runnable> runnables = threadpool.shutdownNow();
//        log.debug("other.... {}" , runnables);
    }
}

shutdown代码结果

18:35:02.128 c.TestShutDown [main] - shutdown
18:35:02.128 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:35:02.128 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 1 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-2] - task 2 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 3 running...
18:35:04.139 c.TestShutDown [pool-1-thread-1] - task 3 finish...
18:35:04.139 c.TestShutDown [main] - other...

解释
可以看出我们在shutdown后依然可以把 shutdown之前的 任务运行完毕,但是shutdown之后的任务就没有再运行了。另外我们关注一下awaitTermination方法 它的作用是等待shutdown部分的任务运行完后 主线程再运行awaitTermination方法之后的代码。

shutdownNow代码结果

18:47:52.344 c.TestShutDown [main] - shutdownNow
18:47:52.344 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:47:52.344 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:47:52.354 c.TestShutDown [main] - other.... [java.util.concurrent.FutureTask@e580929]

解释
可以看出我们在shutdownNow之后 只有一个任务运行成功了,也就是别的任务都已经被打断了。

目录
相关文章
|
21天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
100 38
|
7天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
19天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
44 2
|
21天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
58 4
|
21天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
89 2
|
23天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
16 2
|
23天前
|
Java
在Java多线程编程中,实现Runnable接口通常优于继承Thread类
【10月更文挑战第20天】在Java多线程编程中,实现Runnable接口通常优于继承Thread类。原因包括:1) Java只支持单继承,实现接口不受此限制;2) Runnable接口便于代码复用和线程池管理;3) 分离任务与线程,提高灵活性。因此,实现Runnable接口是更佳选择。
30 2
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
43 1
C++ 多线程之初识多线程
|
23天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
17 3
|
23天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
28 2