🚀1. 自定义线程池
- 阻塞队列中维护了由主线程(或者其他线程)所产生的的任务
- 主线程类似于生产者,产生任务并放入阻塞队列中
- 线程池类似于消费者,得到阻塞队列中已有的任务并执行
自定义线程池
/** * 自定义线程池 */ class ThreadPool { /** * 自定义阻塞队列 */ private BlockingQueue<Runnable> blockingQueue; /** * 核心线程数 */ private int coreSize; private HashSet<Worker> workers = new HashSet<>(); /** * 用于指定线程最大存活时间 */ private TimeUnit timeUnit; private long timeout; /** * 工作线程类 * 内部封装了Thread类,并且添加了一些属性 */ private class Worker extends Thread { Runnable task; public Worker(Runnable task) { System.out.println("初始化任务"); this.task = task; } @Override public void run() { // 如果有任务就执行 // 如果阻塞队列中有任务,就继续执行 while (task != null || (task = blockingQueue.take()) != null) { try { System.out.println("执行任务"); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { // 任务执行完毕,设为空 System.out.println("任务执行完毕"); task = null; } } // 移除任务 synchronized (workers) { System.out.println("移除任务"); workers.remove(this); } } } public ThreadPool(int coreSize, TimeUnit timeUnit, long timeout, int capacity) { this.coreSize = coreSize; this.timeUnit = timeUnit; blockingQueue = new BlockingQueue<>(capacity); this.timeout = timeout; } public void execute(Runnable task) { synchronized (workers) { // 创建任务 // 池中还有空余线程时,可以运行任务 // 否则阻塞 if (workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { System.out.println("线程池中线程已用完,请稍等"); blockingQueue.put(task); } } } }
自定义阻塞队列
/** * 阻塞队列 * 用于存放主线程或其他线程产生的任务 */ class BlockingQueue<T> { /** * 阻塞队列 */ private Deque<T> blockingQueue; /** * 阻塞队列容量 */ private int capacity; /** * 锁 */ private ReentrantLock lock; /** * 条件队列 */ private Condition fullQueue; private Condition emptyQueue; public BlockingQueue(int capacity) { blockingQueue = new ArrayDeque<>(capacity); lock = new ReentrantLock(); fullQueue = lock.newCondition(); emptyQueue = lock.newCondition(); this.capacity = capacity; } /** * 获取任务的方法 */ public T take() { // 加锁 lock.lock(); try { // 如果阻塞队列为空(没有任务),就一直等待 while (blockingQueue.isEmpty()) { try { emptyQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 获取任务并唤醒生产者线程 T task = blockingQueue.removeFirst(); fullQueue.signalAll(); return task; } finally { lock.unlock(); } } public T takeNanos(long timeout, TimeUnit unit) { // 转换等待时间 lock.lock(); try { long nanos = unit.toNanos(timeout); while (blockingQueue.isEmpty()) { try { // awaitNanos会返回剩下的等待时间 nanos = emptyQueue.awaitNanos(nanos); if (nanos < 0) { return null; } } catch (InterruptedException e) { e.printStackTrace(); } } T task = blockingQueue.removeFirst(); fullQueue.signalAll(); return task; } finally { lock.unlock(); } } /** * 放入任务的方法 * @param task 放入阻塞队列的任务 */ public void put(T task) { lock.lock(); try { while (blockingQueue.size() == capacity) { try { System.out.println("阻塞队列已满"); fullQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } blockingQueue.add(task); // 唤醒等待的消费者 emptyQueue.signalAll(); } finally { lock.unlock(); } } public int getSize() { lock.lock(); try { return blockingQueue.size(); } finally { lock.unlock(); } } }
调用
public class Test { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2, TimeUnit.SECONDS, 1, 4); for (int i = 0; i < 10; i++) { threadPool.execute(()->{ try { TimeUnit.SECONDS.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务正在执行!"); }); } } }
🚀2. ThreadPoolExecutor
ThreadPoolExecutor 的继承关系图如下图所示
🚁2.1 线程池状态
✨ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
// 线程池状态 // runState is stored in the high-order bits // RUNNING 高3位为111 private static final int RUNNING = -1 << COUNT_BITS; // SHUTDOWN 高3位为000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 高3位 001 private static final int STOP = 1 << COUNT_BITS; // 高3位 010 private static final int TIDYING = 2 << COUNT_BITS; // 高3位 011 private static final int TERMINATED = 3 << COUNT_BITS;
状态名称 | 高3位的值 | 描述 |
RUNNING | 111 | 接收新任务,同时处理任务队列中的任务 |
SHUTDOWN | 000 | 不接受新任务,但是处理任务队列中的任务 |
STOP | 001 | 中断正在执行的任务,同时抛弃阻塞队列中的任务 |
TIDYING | 010 | 任务执行完毕,活动线程为0时,即将进入终结阶段 |
TERMINATED | 011 | 终结状态 |
✨线程池状态和线程池中线程的数量由一个原子整型变量 ctl 保存,可以通过一次 CAS 同时更改两个属性的值。
// 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 并不是所有平台的int都是32位。 // 去掉前三位保存线程状态的位数,剩下的用于保存线程数量 // 高3位为0,剩余位数全为1 private static final int COUNT_BITS = Integer.SIZE - 3; // 2^COUNT_BITS次方,表示可以保存的最大线程数 // CAPACITY 的高3位为 0 private static final int CAPACITY = (1 << COUNT_BITS) - 1
获取线程池状态、线程数量以及合并两个值的操作
// Packing and unpacking ctl // 获取运行状态 // 该操作会让除高3位以外的数全部变为0 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取运行线程数 // 该操作会让高3位为0 private static int workerCountOf(int c) { return c & CAPACITY; } // 计算ctl新值 private static int ctlOf(int rs, int wc) { return rs | wc; }
线程属性
// 工作线程,内部封装了Thread private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... } // 阻塞队列,用于存放来不及被核心线程执行的任务 private final BlockingQueue<Runnable> workQueue; // 锁 private final ReentrantLock mainLock = new ReentrantLock(); // 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程) private final HashSet<Worker> workers = new HashSet<Worker>();
🚁2.2 构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
✨参数解释
corePoolSize:核心线程数
maxiumPoolSize:最大线程数,maximumPoolSize - corePoolSize = 救急线程数
keepAliveTime:救急线程空闲时的最大生存时间
unit:时间单位(针对救急线程)
workQueue:阻塞队列(存放任务)
有界阻塞队列 ArrayBlockingQueue
无界阻塞队列 LinkedBlockingQueue
最多只有一个同步元素的 SynchronousQueue
优先队列 PriorityBlockingQueue
threadFactory:线程工厂(可以为线程创建时起名字)
handler:拒绝策略
工作方式
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排队,直到有空闲的线程
如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maxiumPoolSize-corePoolSize 数目的线程来救急
如果线程达到 maxiumPoolSize 仍然有新任务这时会执行拒绝策略,
当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束来节省资源,这个时间由 keepAliveTime 和 unit 来控制
对于拒绝策略 JDK 提供了 4 种实现:
AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy 让调用者执行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
一些著名框架,也提供了具体的实现:
Dubbo 的实现,在抛出 RejectedExecution 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
Netty 的实现,是创建一个新线程来执行任务
ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
使用
public class Test { static AtomicInteger threadId = new AtomicInteger(0); public static void main(String[] args) { //手动创建线程池 //创建有界阻塞队列 ArrayBlockingQueue<Runnable> runnable = new ArrayBlockingQueue<>(10); //创建线程工厂 ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "working_thread"+threadId.getAndIncrement()); return thread; } }; //手动创建线程池 //拒绝策略采用默认策略 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 7, 10, TimeUnit.SECONDS, runnable, threadFactory); for (int i = 0; i < 20; i++) { executor.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread()); try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
🚁2.3 newFiexedThreadPool
内部调用的构造方法
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
特点:
- 核心线程数=最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
适用于任务量已知,相对耗时的任务。
使用如下:
public class TestFixedThreadPool { public static void main(String[] args) { ThreadFactory factory = new ThreadFactory() { AtomicInteger atomicInteger = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r, "myThread_"+atomicInteger.getAndIncrement()); } }; //创建核心线程数量为2的线程池 //通过 ThreadFactory 可以给线程添加名字 ExecutorService executorService = Executors.newFixedThreadPool(2,factory); Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); System.out.println("this is fixedThreadPool"); } }; executorService.execute(runnable); } }
🚁2.4 newCachedThreadPool
内部调用的构造方法
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); ) }
特点:
核心线程数是 0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着,全部都是救急线程(60s 后可以回收),救急线程可以无限创建。
阻塞队列采用了 SynchronousQueue,实现特点是,它没有容量,没有线程来取是放不进去的,只有当线程取任务时,才会将任务放入该阻塞队列中。
🚁2.5 newSingleThreadExecutor
内部构造方法
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService( new ThreadPoolExecutor(1,1,0L, TimeUnit.MILLSECONDS, new LinkedBlockingQueue<Runnable>())); }
✨内部调用了 new ThreadPoolExecutor 的构造方法,传入的 corePoolSize 和 maximumPoolSize 都为1。然后将该对象传给了 FinalizableDelegatedExecutorService。该类修饰了 ThreadPoolExecutor,让外部无法调用 ThreadPoolExecutor 内部的某些方法来修改所创建的线程池的大小。
注意点:
SingleThread 和自己创建一个线程来运行多个任务的区别:
当线程正在执行的任务发生错误时,如果是自己创建的线程,该任务和剩余的任务就无法再继续运行下去。而 SingleThread 会创建一个新线程,继续执行任务队列中剩余的任务
SingleThread 和 newFixedThreadPool(1) 的区别:
newFixedThreadPool(1) 传值为 1,可以将 FixedThreadPool 强转为 ThreadPoolExecutor,然后通过 setCorePoolSize 改变核心线程数,而 SingleThread 无法修改核心线程数
// 强转为ThreadPoolExecutor ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); // 改变核心线程数 threadPool.setCorePoolSize(2);
🚀3. 提交任务
execute() 方法,传入一个 Runnable 对象,执行其中的 run 方法
//执行任务 void execute(Runnable command);
submit() 方法,传入一个 Callable 对象,用 Future 来捕获返回值
使用
// 通过submit执行Callable中的call方法 // 通过Future来捕获返回值 Future<String> future = threadPool.submit(new Callable<String>() { @Override public String call() throws Exception { return "hello submit"; } }); // 查看捕获的返回值 System.out.println(future.get());
🚀4. 关闭线程池
shutdown()
/* 线程池状态变为 SHUTDOWN -不会接收新任务 -但已提交任务会执行完 -此方法不会阻塞调用线程的执行 */ void shutdown();
public void shutdown() { final ReentrantLock mainlock = this.mainLock; mainlock.lock(); try { checkShutdownAccess(); //修改线程池状态 advanceRunState(SHUTDOWN); //仅会打断空闲线程 interruptIdleWorkers(); onShutdown();//扩展点 ScheduleThreadPoolExecutor } finally { mainlock.unlock(); } //尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) tryTerminate(); }
shutdownNow
/* 线程池状态变为 STOP -不会接收新任务 -会将队列中的任务返回 -并用 interrupt 的方式中断正在执行的任务 */ List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock(); mainLock.lock(); try { checkShutdownAccess(); //修改线程池状态 advanceRunState(STOP); //打断所有线程 interruptWorkers(); //获取队列中剩余任务 tasks = drainQueue(); } finally { mainLock.unlock(); } //尝试终结 tryTerminate(); return tasks; }
其他方法
//不在 RUNNING 状态的线程池,此方法返回 true boolean isShutdown(); //线程池状态是否是 TERMINATED boolean isTerminated(); //调用shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMIANTED后做这些事情,可以利用此方法等待 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
🚀5. 任务调度线程池
✨在【任务调度线程池】功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
public static void main(String[] args) { Timer timer = new Timer(); TimerTask task1 = new TimerTask() { @Override public void run() { System.out.println("task 1"); sleep(2); } }; TimerTask task2 = new TimerTask() { @Override public void run() { System.out.println("task 2"); } }; //使用timer添加两个任务,希望它们都在1s后执行 //但由于timer内只有一个线程来顺序执行队列中的任务,因此【任务1】的延时,影响了【任务2】的执行 timer.schedule(task1, 1000); timer.schedule(task2, 1000); }
使用 ScheduleExecutorService 改写
ScheduleExecutorService executor = Executors.newScheduledThreadPool(2); //添加两个任务,希望它们都在1s后执行 executor.schedule(()->{ System.out.println("任务1,执行时间"+new Date()); try { Thread.sleep(2000); } catch(InterruptedException e) { } }, 1000, TimeUnit.MILLISECONDS); executor.schedule(()->{ System.out.println("任务2,执行时间:"+new Date()); }, 1000, TimeUnit.MILLISECONDS);