【多线程:线程池】ThreadPoolExecutor类-提交、停止
01.提交任务
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
submit方法
submit方法与execute方法的区别在于,execute方法接收的参数是Runnable类型的参数 没有返回值,而submit方法接收的是Callable类型的参数有返回值 且 返回值用Future<>接收,Future<>获取线程的返回值 实现原理 就是之前所讲的保护性暂停模式。
例子
@Slf4j(topic = "c.TestPool1")
public class TestPool1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
Future<String> future = threadpool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("running");
Thread.sleep(1000);
return "ok";
}
});
log.debug("{}",future.get());
}
}
结果
17:12:30.718 c.TestPool1 [pool-1-thread-1] - running
17:12:31.731 c.TestPool1 [main] - ok
解释
可以看出我们实现了Callable接口 并返回了字符串"ok" 给Future< String>,之后通过get方法获取到返回数据。
invokeAll方法
invokeAll方法接收的是一个任务集合 且有返回值,线程池中的线程执行这个任务集合。
例子
@Slf4j(topic = "c.TestPool2")
public class TestPool2 {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
List<Future<String>> futures = threadpool.invokeAll(Arrays.asList(
() -> {
log.debug("begin1");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin2");
Thread.sleep(500);
return "2";
},
() -> {
log.debug("begin3");
Thread.sleep(2000);
return "3";
}
));
futures.forEach(f -> {
try {
log.debug("{}",f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
结果
17:48:49.429 c.TestPool2 [pool-1-thread-2] - begin2
17:48:49.429 c.TestPool2 [pool-1-thread-1] - begin1
17:48:49.930 c.TestPool2 [pool-1-thread-2] - begin3
17:48:51.937 c.TestPool2 [main] - 1
17:48:51.937 c.TestPool2 [main] - 2
17:48:51.937 c.TestPool2 [main] - 3
解释
我们可以看到 任务1与任务2同时被执行,但是因为我们的线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后 线程2执行任务3,中间差了0.5s 是因为 任务2执行了0.5s,最终我们遍历返回结果 并打印。
invokeAny方法
invokeAny方法的作用是得到一个最先返回的结果,之后的线程就不再运行。
例子
@Slf4j(topic = "c.TestPool3")
public class TestPool3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
String future = threadpool.invokeAny(Arrays.asList(
() -> {
log.debug("begin1");
Thread.sleep(1000);
log.debug("end1");
return "1";
},
() -> {
log.debug("begin2");
Thread.sleep(500);
log.debug("end2");
return "2";
},
() -> {
log.debug("begin3");
Thread.sleep(2000);
log.debug("end3");
return "3";
}
));
log.debug("{}",future);
}
}
结果
18:03:18.892 c.TestPool3 [pool-1-thread-3] - begin3
18:03:18.892 c.TestPool3 [pool-1-thread-2] - begin2
18:03:18.892 c.TestPool3 [pool-1-thread-1] - begin1
18:03:19.412 c.TestPool3 [pool-1-thread-2] - end2
18:03:19.412 c.TestPool3 [main] - 2
解释
注意我们此时把核心线程数 设置为3,此时三个任务同时运行,且任务2先运行完 可以看到最后的结果只有任务2的返回值,但是注意一个场景,假如我们现在的核心线程数只有一个 那么很明显我们的线程会先运行任务1 剩下两个任务加入任务队列,等待任务1运行完才会 有空闲线程运行剩下的任务,也就是说任务1是先运行完的 所以返回的结果只有任务1的返回值。
02.关闭线程池
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
// 源码
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
// 源码
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}
// 其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
shutdown与shutdownNow例子
@Slf4j(topic = "c.TestShutDown")
public class TestShutDown {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
Future<Integer> result1 = threadpool.submit(() -> {
log.debug("task 1 running...");
Thread.sleep(1000);
log.debug("task 1 finish...");
return 1;
});
Future<Integer> result2 = threadpool.submit(() -> {
log.debug("task 2 running...");
Thread.sleep(1000);
log.debug("task 2 finish...");
return 2;
});
Future<Integer> result3 = threadpool.submit(() -> {
log.debug("task 3 running...");
Thread.sleep(1000);
log.debug("task 3 finish...");
return 3;
});
// shutdown() 部分的代码
log.debug("shutdown");
threadpool.shutdown();
threadpool.submit(()->{
log.debug("task 4 running...");
Thread.sleep(1000);
log.debug("task 4 finish");
return "4";
});
threadpool.awaitTermination(3, TimeUnit.SECONDS);
log.debug("other...");
// shutdownNow部分代码
// log.debug("shutdownNow");
// List<Runnable> runnables = threadpool.shutdownNow();
// log.debug("other.... {}" , runnables);
}
}
shutdown代码结果
18:35:02.128 c.TestShutDown [main] - shutdown
18:35:02.128 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:35:02.128 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 1 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-2] - task 2 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 3 running...
18:35:04.139 c.TestShutDown [pool-1-thread-1] - task 3 finish...
18:35:04.139 c.TestShutDown [main] - other...
解释
可以看出我们在shutdown后依然可以把 shutdown之前的 任务运行完毕,但是shutdown之后的任务就没有再运行了。另外我们关注一下awaitTermination方法 它的作用是等待shutdown部分的任务运行完后 主线程再运行awaitTermination方法之后的代码。
shutdownNow代码结果
18:47:52.344 c.TestShutDown [main] - shutdownNow
18:47:52.344 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:47:52.344 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:47:52.354 c.TestShutDown [main] - other.... [java.util.concurrent.FutureTask@e580929]
解释
可以看出我们在shutdownNow之后 只有一个任务运行成功了,也就是别的任务都已经被打断了。