简写:
package com.vipsoft.Thread; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class ThreadPoolExecutorTest { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 4, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2), new ThreadFactory() { private final AtomicInteger mThreadNum = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "Thread-" + mThreadNum.getAndIncrement()); System.out.println(t.getName() + " has been created"); return t; } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 可做日志记录等 System.err.println(r.toString() + " rejected " + executor.getTaskCount()); } }); executor.prestartAllCoreThreads(); // 预启动所有核心线程 for (int i = 1; i <= 10; i++) { final String taskName = String.valueOf(i); executor.execute(new Runnable() { @Override public void run() { try { System.out.println(this.toString() + " is running!"); Thread.sleep(3000); //让任务执行慢点 } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return "MyTask [name=" + taskName + "]"; } }); } //可读性好些 // Runnable call = new Runnable() { // @Override // public void run() { // try { // System.out.println(this.toString() + " is running!"); // Thread.sleep(3000); //让任务执行慢点 // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // @Override // public String toString() { // return "MyTask [name=" + taskName + "]"; // } // } // for (int i = 1; i <= 10; i++) { // final String taskName = String.valueOf(i); // executor.execute(call); // } System.in.read(); //阻塞主线程 } }
常规写法:
package com.vipsoft.Thread; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class ThreadPoolExecutorTest { public static void main(String[] args) throws Exception { int corePoolSize = 2; int maximumPoolSize = 5; long keepAliveTime = 10; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3); //定义一个大小为2的队列,只等有一个任务在排队等,多出来的需要开新线程 ThreadFactory threadFactory = new MyTreadFactory(); RejectedExecutionHandler handler = new MyPolicy(); ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); System.out.println("预启动线程(备战)"); executor.prestartAllCoreThreads(); // 预启动所有核心线程,处于备战 System.out.println("预启动线程数(备战):" + executor.getPoolSize()); for (int i = 1; i <= 10; i++) { System.out.println(System.currentTimeMillis() + " " + "开始 下发任务:" + i + " 当前线程总数:" + executor.getPoolSize()); MyTask task = new MyTask(String.valueOf(i)); executor.execute(task); System.out.println(System.currentTimeMillis() + " " + "完成 下发任务:" + i + " 当前线程总数:" + executor.getPoolSize() + " 队列中的线程数量:" + workQueue.size()); Thread.sleep(1); //停1毫秒,日志记录,时间后方便分析 if (i == 9) { //TODO Thread.sleep(3000); //任务9下发后【会被拒绝】,停3秒,等队列或线程释放后,再下发任务10,这时候任务10不会被拒绝 } } System.in.read(); //阻塞主线程 } static class MyTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "Thread-" + mThreadNum.getAndIncrement()); System.out.println(System.currentTimeMillis() + " " + t.getName() + " has been created"); return t; } } public static class MyPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 可做日志记录等 System.err.println(System.currentTimeMillis() + " " + r.toString() + " rejected from " + e.toString()); } } static class MyTask implements Runnable { private String name; public MyTask(String name) { this.name = name; } @Override public void run() { try { System.out.println(System.currentTimeMillis() + " " + this.toString() + " 开始运行! " + Thread.currentThread().getName()); Thread.sleep(3000); //让任务执行慢点 System.out.println(System.currentTimeMillis() + " " + this.toString() + " 运行结束! " + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return "MyTask [name=" + this.name + "]"; } } }
运行结果:忽略图片内容
将日志按时间排序后解读:可容纳任务数 = 最大线程数 + 最大队列数
预启动线程(备战) 预启动线程数(备战):2 1654760985275 Thread-1 has been created //预启动创建线程 1654760985275 Thread-2 has been created //预启动创建线程 1654760985275 开始 下发任务:1 当前线程总数:2 //下发任务,当前有2线程(预启动创建) 1654760985276 MyTask [name=1] 开始运行! Thread-2 1654760985276 完成 下发任务:1 当前线程总数:2 //下发任务,当前有2线程(预启动创建) 1654760985278 MyTask [name=2] 开始运行! Thread-1 1654760985278 开始 下发任务:2 当前线程总数:2 1654760985278 完成 下发任务:2 当前线程总数:2 //下发任务,当前有2线程(预启动创建) 1654760985280 开始 下发任务:3 当前线程总数:2 1654760985280 完成 下发任务:3 当前线程总数:2 //下发任务,当前有2线程(预启动创建) 1654760985282 开始 下发任务:4 当前线程总数:2 1654760985282 完成 下发任务:4 当前线程总数:2 //下发任务,当前有2线程(预启动创建) 1654760985284 开始 下发任务:5 当前线程总数:2 1654760985284 完成 下发任务:5 当前线程总数:2 //下发任务,当前有2线程(预启动创建) 1654760985286 MyTask [name=6] 开始运行! Thread-3 1654760985286 Thread-3 has been created 1654760985286 开始 下发任务:6 当前线程总数:2 1654760985286 完成 下发任务:6 当前线程总数:3 //下发任务,当前有2线程,3个队列,第6个任务,已经不够放了,新建一个线程 1654760985288 MyTask [name=7] 开始运行! Thread-4 1654760985288 Thread-4 has been created 1654760985288 开始 下发任务:7 当前线程总数:3 1654760985288 完成 下发任务:7 当前线程总数:4 //下发任务,当前有3线程,3个队列,第8个任务,已经不够放了,新建一个线程 1654760985290 MyTask [name=8] 开始运行! Thread-5 1654760985290 Thread-5 has been created 1654760985290 开始 下发任务:8 当前线程总数:4 1654760985290 完成 下发任务:8 当前线程总数:5 //下发任务,当前有4线程,3个队列,第8个任务,已经不够放了,新建一个线程 1654760985292 MyTask [name=9] rejected from java.util.concurrent.ThreadPoolExecutor@4b1210ee[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0] 1654760985292 开始 下发任务:9 当前线程总数:5 1654760985292 完成 下发任务:9 当前线程总数:5 //下发任务,当前有5线程,3个队列,第9个任务,已经不够放了,新建一个线程(超过了最大线程数 maximumPoolSize + 队列数 workQueue )拒绝任务9 1654760985294 MyTask [name=10] rejected from java.util.concurrent.ThreadPoolExecutor@4b1210ee[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0] 1654760985294 开始 下发任务:10 当前线程总数:5 1654760985294 完成 下发任务:10 当前线程总数:5 //下发任务,当前有5线程,3个队列,第10个任务,已经不够放了,新建一个线程(超过了最大线程数+队列数)拒绝任务10 1654760988276 MyTask [name=1] 运行结束! Thread-2 //等有任务结束后,再去下发任务9、10,就不会抛弃任务了 1654760988276 MyTask [name=3] 开始运行! Thread-2 1654760988278 MyTask [name=2] 运行结束! Thread-1 1654760988278 MyTask [name=4] 开始运行! Thread-1 1654760988286 MyTask [name=5] 开始运行! Thread-3 1654760988286 MyTask [name=6] 运行结束! Thread-3 1654760988288 MyTask [name=7] 运行结束! Thread-4 1654760988291 MyTask [name=8] 运行结束! Thread-5 1654760991288 MyTask [name=3] 运行结束! Thread-2 1654760991288 MyTask [name=4] 运行结束! Thread-1 1654760991288 MyTask [name=5] 运行结束! Thread-3