线程池
线程池基本概念
线程池
线程池本质上是一种对象池,用于管理线程资源。在任务执行前,需要从线程池中拿出线程来执行。在任务执行完成之后,把线程放回线程池。实际开发中,线程资源一般通过线程池提供,比如处理数据库连接、接收网络请求。
- 线程的创建更加规范,可以合理控制开辟线程的数量。
- 不必频繁地创建和销毁线程,优化了资源的开销。
核心线程池
(corePool) 通常状况下,线程池最多能创建的线程数。
当有新任务等待处理时,线程池会首先判断核心线程池是否已满,如果没满则创建线程执行任务。即使有其他核心线程空闲也会创建新的核心线程来执行。
任务队列
(BlockQueue) 线程池中等待被线程执行的任务队列。
如果核心线程池已满,线程池会判断队列是否已满。如果队列没满,就会将任务放在队列中等待执行。
- ArrayBlockingQueue // 基于数组实现的阻塞队列,有界。
- LinkedBlockingQueue // 基于链表实现的阻塞队列,可以无界。
- SynchronousQueue // 不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作。
- PriorityBlockingQueue // 带优先级的阻塞队列,无界。
最大线程池
(maximumPool) 任务量很大时,线程池最多能创建的线程数。
如果队列已满,说明当前任务量已经非常大,仅靠核心线程池内的线程数量已无法处理。线程池会判断最大线程池是否已满,如果没满则创建更多线程,从等待队列首部取得任务并执行。
拒绝策略
(RejectedExecutionHandler) 线程池拒绝过量任务的方式。
如果最大线程池已满,表示当前服务器已无法处理这么多任务。任务会按照既定的拒绝策略被处理。
- CallerRunsPolicy // 在调用者线程执行。
- AbortPolicy // 直接抛出 RejectedExecutionException 异常。
- DiscardPolicy // (常用)任务直接丢弃,不做任何处理。
- DiscardOldestPolicy // 丢弃队列里最旧的那个任务,再尝试执行当前任务。
ThreadPoolExecutor 类
实现了 ExecutorService 接口,是 java 开发常用的线程池类。位于 java.util.concurrent 包内,使用时需要进行导入。
创建线程池
- ThreadPoolExecutor 类在创建线程池时需要输入以下参数:
int corePoolSize = 2; // 核心线程池大小 int maximumPoolSize = 4; // 最大线程池大小 long keepAliveTime = 10; // 空闲线程多久被销毁,0 表示永远不会 TimeUnit unit = TimeUnit.SECONDS; // keepAliveTime 的单位 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2); // 任务队列 ThreadFactory threadFactory = new NameTreadFactory(); // 线程工厂接口,一般默认。 RejectedExecutionHandler handler = new MyIgnorePolicy(); // 拒绝策略,一般默认。 ThreadPoolExecutor service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);Copy to clipboardErrorCopied 复制代码
- ThreadPoolExecutor 类还可以重写以下方法(默认为空实现):
ExecutorService service = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)) { // 任务执行前被调用 @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("beforeExecute is called"); } // 任务执行后被调用 @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("afterExecute is called"); } // 线程池结束后被调用 @Override protected void terminated() { System.out.println("terminated is called"); } };Copy to clipboardErrorCopied 复制代码
获取线程池信息
service.getTaskCount(); // 获取已经执行或正在执行的任务数 service.getCompletedTaskCount(); // 获取已经执行的任务数 service.getLargestPoolSize(); // 获取线程池曾经创建过的最大线程数 service.getPoolSize(); // 获取线程池线程数 service.getActiveCount(); // 获取活跃线程数(正在执行任务的线程数)Copy to clipboardErrorCopied 复制代码
提交任务
可以向线程池提交的任务有两种:Runnable 接口和 Callable 接口。
Runnable 接口
内部定义了 run 方法,没有返回值,不允许抛出异常。通过 execute 方法向线程池提交。
service.execute(new Runnable(){ System.out.println("new thread"); });Copy to clipboardErrorCopied 复制代码
Callable 接口
内部定义了 call 方法,允许有返回值,允许抛出异常。通过 submit 方法向线程池提交,返回一个 Future 对象。
可以通过调用 Future 对象的 get 方法获得数据,在返回结果前 get 方法会阻塞。
Future<Integer> f = service.submit(new Callable(){ System.out.println("new thread"); return 1; }); System.out.println(f.get());Copy to clipboardErrorCopied 复制代码
关闭线程池
service.shutdown(); // 线程池不再接受新的任务,线程池中已有任务执行完成后终止。 service.shutdownNow(); // 线程池不再接受新的任务并对所有线程执行 interrupt 操作,清空队列并终止。 boolean b = service.isShutdown(); // 返回线程池是否关闭:不再接受新任务。 boolean b = service.isTerminated(); // 返回线程池是否终止Copy to clipboardErrorCopied 复制代码
ThreadPoolExecutor 类示例
public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //自定义线程工厂 pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new ThreadFactory() { public Thread newThread(Runnable r) { System.out.println("线程"+r.hashCode()+"创建"); //线程命名 Thread th = new Thread(r,"threadPool"+r.hashCode()); return th; } }, new ThreadPoolExecutor.CallerRunsPolicy()); for(int i=0;i<10;i++) { pool.execute(new ThreadTask()); } } } public class ThreadTask implements Runnable{ public void run() { //输出执行线程的名称 System.out.println("ThreadName:"+Thread.currentThread().getName()); } }Copy to clipboardErrorCopied 复制代码
Executors 类(不常用)
继承 ThreadPoolExecutor 类的线程池工厂类:提供 4 种工厂方法创建线程池。但该方法既不灵活也不安全,实际开发中很少使用。
// 单个线程的线程池 ExecutorService service = Executors.newSingleThreadExecutor(); // 指定数量的线程池 ExecutorService service = Executors.newFixedThreadExecutor(10); // 大小不限的线程池,60s 不使用会自动回收空闲线程。 ExecutorService service = Executors.newCacheThreadExecutor(); // 大小不限的线程池,可定时执行任务。 ExecutorService service = Executors.newScheduleThreadExecutor();Copy to clipboardErrorCopied 复制代码
Executors 类示例
public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { executor.submit(() -> { System.out.println("thread id is: " + Thread.currentThread().getId()); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }