概念
线程池(Thread Pool)是Java中用于管理线程的一种技术,它可以有效地提高程序的并发性能和资源利用率。
它是一种多线程处理形式,线程池预先创建并维护一组线程,用于处理任务队列中的任务。线程池中的线程可以被重复利用,当有新任务到达时,线程池会分配一个线程来执行任务,而不是每次都创建新的线程。线程池中的线程数量通常是有限的,这可以避免因为创建过多线程而导致系统资源耗尽。
线程池可以看作是一个管理线程的“总调度官”,它负责线程的存储、调度和执行。线程池内部通常包含核心线程和非核心线程,核心线程是长期存在的,而非核心线程则根据任务的数量动态创建和销毁。
线程池的主要优点
降低资源消耗
线程的创建和销毁需要消耗大量的系统资源。通过线程池,我们可以复用已经创建的线程,从而避免频繁地创建和销毁线程。
提高响应速度
当任务到达时,线程池可以立即分配一个线程来处理任务,从而提高了系统的响应速度。
提高系统吞吐量
通过合理地配置线程池的大小,我们可以充分利用系统资源,从而提高系统的吞吐量。
Java中线程池的实现
FixedThreadPool
特点
创建一个固定大小的线程池,当有新任务提交时,如果线程池中有空闲线程,则立即执行;如果线程池已满,则任务会在队列中等待,直到有空闲线程可用。
线程池的大小在创建时指定,之后不能更改。
工作队列是一个无界队列,可以容纳任意数量的待执行任务。
这是很常用的一个线程池,因为它的容量由程序员决定,所以管理起来也比价容易。
适用场景
适用于已知并发任务数量,且任务数量不会变化或变化不大的场景。
当需要减少在创建和销毁线程上花费的时间以及系统开销时,可以考虑使用FixedThreadPool。
注意事项
由于工作队列是无界的,如果任务提交速度持续大于处理速度,可能会导致内存耗尽。
使用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
int threadSize = 10000; // 线程数量大小
long during = 0; // 耗费时间
long start = System.currentTimeMillis();
// 创建一个固定大小为3的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交5个任务到线程池
for (int i = 0; i < threadSize; i++) {
int taskId = i;
executor.submit(() -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("执行任务 " + taskId + ",由线程 " + Thread.currentThread().getName() + " 执行");
});
}
// 关闭线程池(这会等待已提交的任务执行完毕)
executor.shutdown();
while (!executor.isTerminated()) {
// 等待所有任务执行完毕
}
System.out.println("所有任务执行完毕");
long end = System.currentTimeMillis();
during = end - start; // 记录线程池的时间
start = System.currentTimeMillis();
for (int i = 0; i < threadSize; i++) {
int taskId = i;
Thread thread = new Thread(() -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("执行任务 " + taskId + ",由线程 " + Thread.currentThread().getName() + " 执行");
});
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("所有任务执行完毕");
end = System.currentTimeMillis();
System.out.println("线程池耗时" + (during) + "ms");
System.out.println("线程耗时" + (end - start) + "ms");
System.out.println("线程池相比线程更" + ((end - start) > during ? "快" : "慢"));
}
}
最后的结果输出如下
所有任务执行完毕
线程池耗时6172ms
线程耗时21764ms
线程池相比线程更快
如果你把线程池的线程数量调成9,则结果如下
所有任务执行完毕
线程池耗时2002ms
线程耗时21542ms
线程池相比线程更快
CachedThreadPool
特点
创建一个可缓存的线程池,如果线程池中有空闲线程,则立即执行;如果没有空闲线程,则创建一个新线程来执行任务。
线程池的大小会根据需要动态调整。
工作队列是一个SynchronousQueue,它实际上不存储元素,每个插入操作必须等待一个相应的删除操作。
适用场景
适用于执行大量短时间异步任务的程序。
当任务数量不确定,且任务执行时间较短时,可以考虑使用CachedThreadPool。
注意事项
由于线程池的大小可以无限制增长,如果任务执行时间很长,或者任务数量持续很大,可能会导致系统资源耗尽。
使用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
Thread.sleep(20000); // 方便jconsole查看此进程
int threadSize = 10000; // 线程数量大小
long during = 0; // 耗费时间
long start = System.currentTimeMillis();
// 创建一个固定大小为3的线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 提交5个任务到线程池
for (int i = 0; i < threadSize; i++) {
int taskId = i;
executor.submit(() -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("执行任务 " + taskId + ",由线程 " + Thread.currentThread().getName() + " 执行");
});
}
// 关闭线程池(这会等待已提交的任务执行完毕)
executor.shutdown();
while (!executor.isTerminated()) {
// 等待所有任务执行完毕
}
System.out.println("所有任务执行完毕");
long end = System.currentTimeMillis();
during = end - start; // 记录线程池的时间
Thread.sleep(20000);
start = System.currentTimeMillis();
for (int i = 0; i < threadSize; i++) {
int taskId = i;
Thread thread = new Thread(() -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("执行任务 " + taskId + ",由线程 " + Thread.currentThread().getName() + " 执行");
});
thread.start();
thread.join();
}
System.out.println("所有任务执行完毕");
end = System.currentTimeMillis();
System.out.println("线程池耗时" + (during) + "ms");
System.out.println("线程耗时" + (end - start) + "ms");
System.out.println("线程池相比线程更" + ((end - start) > during ? "快" : "慢"));
}
}
控制台末尾的输出如下
所有任务执行完毕
线程池耗时468ms
线程耗时59495ms
线程池相比线程更快
jconsole的性能监控如下
ScheduledThreadPool
特点
创建一个可以执行定时或周期性任务的线程池。
线程池的大小在创建时指定,之后不能更改。
工作队列是一个DelayedWorkQueue,它用于存放待执行的任务,并根据任务的延迟时间进行排序。
适用场景
适用于需要定时执行或周期性执行任务的场景。
当需要按照特定时间间隔执行任务,或者需要在特定时间执行任务时,可以考虑使用ScheduledThreadPool。
注意事项
由于线程池的大小是固定的,如果任务数量很大,可能会因为线程资源不足而导致任务延迟执行。
使用
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
// 创建一个定时线程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
// 在延迟2秒后执行任务,并每隔1秒重复执行
executor.scheduleAtFixedRate(() -> {
System.out.println("执行定时任务,由线程 " + Thread.currentThread().getName() + " 执行");
}, 2, 1, TimeUnit.SECONDS);
// 让主线程等待足够长的时间,以便观察定时任务的执行
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭线程池
executor.shutdown();
}
}
console输出
执行定时任务,由线程 pool-1-thread-1 执行
执行定时任务,由线程 pool-1-thread-1 执行
执行定时任务,由线程 pool-1-thread-2 执行
执行定时任务,由线程 pool-1-thread-2 执行
执行定时任务,由线程 pool-1-thread-2 执行
SingleThreadExecutor
特点
创建一个只有一个线程的线程池,所有任务都会按照提交的顺序依次执行。
保证了任务的执行顺序,即任务的提交顺序和执行顺序是一致的。
工作队列是一个LinkedBlockingQueue,用于存放待执行的任务。
适用场景
适用于需要保证任务按照提交顺序执行的场景。
当任务之间没有依赖关系,但需要保证任务的执行顺序时,可以考虑使用SingleThreadExecutor。
注意事项
由于只有一个线程,如果任务执行时间很长,可能会导致后续任务等待时间过长。
使用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
// 创建一个单线程的线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交5个任务到线程池
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("执行任务 " + taskId + ",由线程 " + Thread.currentThread().getName() + " 执行");
});
}
// 关闭线程池
executor.shutdown();
while (!executor.isTerminated()) {
// 等待所有任务执行完毕
}
System.out.println("所有任务执行完毕");
}
}
console输出
执行任务 0,由线程 pool-1-thread-1 执行
执行任务 1,由线程 pool-1-thread-1 执行
执行任务 2,由线程 pool-1-thread-1 执行
执行任务 3,由线程 pool-1-thread-1 执行
执行任务 4,由线程 pool-1-thread-1 执行
所有任务执行完毕
语法总结
提交任务
Future<?> submit(Runnable task): 提交一个Runnable任务进行执行,并返回一个表示异步计算结果的Future对象。
Future submit(Callable task): 提交一个Callable任务进行执行,并返回一个表示异步计算结果的Future对象。Callable可以返回一个结果。
Future<?> submit(Runnable task, T result): 提交一个Runnable任务进行执行,并返回一个表示异步计算结果的Future对象,该对象在任务完成时返回给定的结果。
关闭线程池
void shutdown(): 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他效果。
List shutdownNow(): 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
检查线程池状态
boolean isShutdown(): 如果执行器已关闭,则返回true。
boolean isTerminated(): 如果执行器已关闭,并且所有任务都已完成,则返回true。
boolean awaitTermination(long timeout, TimeUnit unit): 如果在给定的超时时间内,执行器终止,则返回true。
import java.util.concurrent.*;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 提交任务到线程池
Future<String> future1 = executorService.submit(() -> {
return "Task 1 result returned successfully";
});
Future<Integer> future2 = executorService.submit(() -> {
// 模拟耗时任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 42; // 返回计算结果
});
// 提交10个任务到线程池
for (int i = 0; i < 10; i++) {
final int taskId = i;
executorService.submit(() -> {
System.out.println("Task " + taskId + " is running by thread " + Thread.currentThread().getName());
try {
// 模拟任务执行时间
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executorService.shutdown();
System.out.println("线程池关闭----");
// 等待任务完成并获取结果
try {
System.out.println("Task 1 result: future1 -> " + future1.get());
System.out.println("Task 2 result: future2 -> " + future2.get());
} catch (Exception e) {
e.printStackTrace();
}
// 关闭线程池并开始等待其终止
try {
// 等待10秒,看线程池是否能在10秒内完成所有任务并终止
if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("All tasks have completed execution and the executor service has terminated.");
} else {
System.out.println("The executor service did not terminate within 10 seconds.");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}