自定义线程池
线程是一个系统资源,没创建一个新的线程就会占用一定的内存,会用栈内存,如果是高并发情况下,一下子来了很多任务,如果我为每个认为都创建一个新的线程,对内存的占用是非常大的,甚至可能会出现内存泄漏问题。
自定义线程池其实包含几个组件:
生产者,消费者,阻塞队列,两边的速率由阻塞队列等待。
线程也并不是创建的越多越好,大量的任务来了,创建了很多的线程,cpu一共就几个核,一下子来了那么多的线程,cpu忙不过来,必然让获取不到cpu时间片的线程,陷入阻塞,会引起线程上下文切换,把当前线程的状态保存下来,下次轮到的时候再取出来并恢复。线程上下文切换频繁,对系统性能应该很大。
基于上述两个原因,并不是每次都要创建新的线程,而是基于已有线程的潜力,去处理任务,而不是每次都创建新的。这也是享元模式的思想。
其实从上图就可以分析出来了,简易版本的线程池 有 生产者、消费者、阻塞队列。
其中我们需要实现的部分是 消费者 和 阻塞队列。
任务队列
接下来会在代码中 详解思路
class BlockingQueue<T> { // 首先定义最基本的,也就是任务队列、锁(对队列进行操作时需要用锁去控制)、生产者消费者变量(实际上之所以使用Lock 而不是 Synchronized的优势就是 wait 和 notify释放的方式是随机的,而 await signal是指定的,且Synchronized 只有一个waitset,而Lock可以指定多个条件变量,代码整洁度更加的明显) // 1. 任务队列 private Deque<T> queue = new ArrayDeque<>(); // 2. 锁 private ReentrantLock lock = new ReentrantLock(); // 3. 生产者条件变量 private Condition fullWaitSet = lock.newCondition(); // 4. 消费者条件变量 private Condition emptyWaitSet = lock.newCondition(); // 5. 容量 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } // 阻塞分为 不带超时时间 和 带超时时间版本,其实就是当我们获取时 是一直阻塞等待还是超时结束,这里面分别调用不同的API,比如 await 和 awaitNanos // 带超时阻塞获取 public T poll(long timeout, TimeUnit unit) { lock.lock(); try { // 将 timeout 统一转换为 纳秒 long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { // 返回值是剩余时间 if (nanos <= 0) { return null; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } // 阻塞获取 public T take() { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } // 与阻塞获取同理 // 阻塞添加 public void put(T task) { lock.lock(); try { while (queue.size() == capcity) { try { System.out.println("等待加入任务队列 "+ task); fullWaitSet.await(); /** 这里面其实是有一个点需要注意的,就是 其实 await 这次阻塞的是主线程,本质上,await还是会阻塞 并释放锁的。 */ } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("加入任务队列 "+ task); queue.addLast(task); emptyWaitSet.signal(); } finally { lock.unlock(); } } // 带超时时间阻塞添加 public boolean offer(T task, long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capcity) { try { if(nanos <= 0) { return false; } System.out.println("等待加入任务队列 "+ task); nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("加入任务队列 "+ task); queue.addLast(task); emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } }
线程池
class ThreadPool { // 目前该版本的是不带拒绝策略的线程池,所以需要的参数有 核心线程数、任务队列、超时时间、单位 // 任务队列 private BlockingQueue<Runnable> taskQueue; // 线程集合 private HashSet<Worker> workers = new HashSet<>(); // 核心线程数 private int coreSize; // 获取任务时的超时时间 private long timeout; private TimeUnit timeUnit; // 执行任务 public void execute(Runnable task) { // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行 // 如果任务数超过 coreSize 时,加入任务队列暂存 synchronized (workers) { if(workers.size() < coreSize) { Worker worker = new Worker(task); System.out.println("新增 worker "+ worker + " " + task); workers.add(worker); worker.start(); } else { taskQueue.put(task); } } } public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1) 当 task 不为空,执行任务 // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行 // while(task != null || (task = taskQueue.take()) != null) { while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { System.out.println("正在执行 "+task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } synchronized (workers) { System.out.println("worker 被移除"+ this); workers.remove(this); } } } }
多维度测试
阻塞版
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10); for (int i = 0; i < 5; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(j); }); }
输出:
新增 worker Thread[Thread-0,5,main] test2$$Lambda$1/1831932724@448139f0 新增 worker Thread[Thread-1,5,main] test2$$Lambda$1/1831932724@7ba4f24f 加入任务队列 test2$$Lambda$1/1831932724@7699a589 正在执行 test2$$Lambda$1/1831932724@448139f0 正在执行 test2$$Lambda$1/1831932724@7ba4f24f 加入任务队列 test2$$Lambda$1/1831932724@58372a00 加入任务队列 test2$$Lambda$1/1831932724@4dd8dc3 1 0 正在执行 test2$$Lambda$1/1831932724@7699a589 正在执行 test2$$Lambda$1/1831932724@58372a00 3 2 正在执行 test2$$Lambda$1/1831932724@4dd8dc3 4 ... 未退出
首先分析一下这个结果流程(后面的结果大体思路一致,故不做分析!!!)
前两行打印 ”新增 …“ 其实是因为 执行execute 方法时走了 if逻辑,创建了两个线程,故打印
而其余的三个线程因为 走了else逻辑,最终执行put方法,打印 ”加入任务队列“
两句 ”正在执行“ 是因为 符合 worker线程中的 while逻辑,故 打印”正在执行…“
然后 前两个线程 执行完了,输出 0 1
此时,这两个线程 中的while条件 while(task != null || (task = taskQueue.take()) != null) 中,task = taskQueue.take() 阻塞获取新的任务,获取了两个新的线程,然后 打印”正在执行…“ 输出 2 3
最后又执行while逻辑,将最后一个任务 执行,并打印,而两个线程继续执行while里面的 阻塞获取,程序并未退出!!!
之所以会是这个结果,是因为前面这里是这样设置的
class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1) 当 task 不为空,执行任务 // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行 while(task != null || (task = taskQueue.take()) != null) { //while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { System.out.println("正在执行 "+task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } synchronized (workers) { System.out.println("worker 被移除"+ this); workers.remove(this); } } }
超时阻塞版
想实现,故使用超时阻塞获取的办法即可
class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1) 当 task 不为空,执行任务 // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行 //while(task != null || (task = taskQueue.take()) != null) { while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { System.out.println("正在执行 "+task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } synchronized (workers) { System.out.println("worker 被移除"+ this); workers.remove(this); } } }
输出:
新增 worker Thread[Thread-0,5,main] test2$$Lambda$1/1831932724@448139f0 新增 worker Thread[Thread-1,5,main] test2$$Lambda$1/1831932724@7ba4f24f 正在执行 test2$$Lambda$1/1831932724@448139f0 加入任务队列 test2$$Lambda$1/1831932724@7699a589 正在执行 test2$$Lambda$1/1831932724@7ba4f24f 加入任务队列 test2$$Lambda$1/1831932724@58372a00 加入任务队列 test2$$Lambda$1/1831932724@4dd8dc3 0 1 正在执行 test2$$Lambda$1/1831932724@7699a589 正在执行 test2$$Lambda$1/1831932724@58372a00 2 3 正在执行 test2$$Lambda$1/1831932724@4dd8dc3 4 worker 被移除Thread[Thread-1,5,main] worker 被移除Thread[Thread-0,5,main] Process finished with exit code 0
阻塞队列溢出版
有一个这样的场景,假设设置核心数为2 等待队列长度为10,而此时15个人任务都来了,那么剩下的3个怎么处理呢?
for (int i = 0; i < 15; i++) { int j = i; threadPool.execute(() -> { try { // 将时间拉长,不能一下子执行完 Thread.sleep(10000000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(j); }); }
输出:
新增 worker Thread[Thread-0,5,main] test2$$Lambda$1/1831932724@448139f0 新增 worker Thread[Thread-1,5,main] test2$$Lambda$1/1831932724@7ba4f24f 正在执行 test2$$Lambda$1/1831932724@448139f0 正在执行 test2$$Lambda$1/1831932724@7ba4f24f 加入任务队列 test2$$Lambda$1/1831932724@7699a589 加入任务队列 test2$$Lambda$1/1831932724@58372a00 加入任务队列 test2$$Lambda$1/1831932724@4dd8dc3 加入任务队列 test2$$Lambda$1/1831932724@6d03e736 加入任务队列 test2$$Lambda$1/1831932724@568db2f2 加入任务队列 test2$$Lambda$1/1831932724@378bf509 加入任务队列 test2$$Lambda$1/1831932724@5fd0d5ae 加入任务队列 test2$$Lambda$1/1831932724@2d98a335 加入任务队列 test2$$Lambda$1/1831932724@16b98e56 加入任务队列 test2$$Lambda$1/1831932724@7ef20235 等待加入任务队列 test2$$Lambda$1/1831932724@27d6c5e0 ... 未退出
其实本质上是一样的,但是需要着重分析下,实际的流程是这样的
public void execute(Runnable task) { // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行 // 如果任务数超过 coreSize 时,加入任务队列暂存 synchronized (workers) { if(workers.size() < coreSize) { Worker worker = new Worker(task); System.out.println("新增 worker "+ worker + " " + task); workers.add(worker); worker.start(); } else { taskQueue.put(task); } } }
// 阻塞添加 public void put(T task) { lock.lock(); try { while (queue.size() == capcity) { try { System.out.println("等待加入任务队列 "+ task); fullWaitSet.await(); /** 这里面其实是有一个点需要注意的,就是 其实 await 这次阻塞的是主线程,本质上,await还是会阻塞 并释放锁的。 */ } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("加入任务队列 "+ task); queue.addLast(task); emptyWaitSet.signal(); } finally { lock.unlock(); } }
可以分析到,后13个线程都去执行了put,而这其中的前十次都能逐步的获取锁插入,而第十一次就执行await卡死,没有第十二次和第十三次的输出了。在学习前面章节的时候我们知道,await其实和wait很像,一旦执行就会阻塞,并且释放锁,等到将来苏醒了再去竞争锁。但是为什么没有第十二次和第十三次的输出呢?
这个的原因要从main函数看:
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10); for (int i = 0; i < 15; i++) { int j = i; threadPool.execute(() -> { try { // 将时间拉长,不能一下子执行完 Thread.sleep(10000000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(j); }); }
它并不是执行了多个线程,如果在多线程的情况下,await会释放锁,请看这个例子
static ReentrantLock lock = new ReentrantLock(); static Condition waitCigaretteQueue = lock.newCondition(); static Condition waitbreakfastQueue = lock.newCondition(); static volatile boolean hasCigrette = false; static volatile boolean hasBreakfast = false; public static void main(String[] args) throws InterruptedException { new Thread(() -> { try { lock.lock(); while (!hasCigrette) { try { System.out.println("烟"); waitCigaretteQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("等到了它的烟"); } finally { lock.unlock(); } }).start(); new Thread(() -> { try { lock.lock(); while (!hasBreakfast) { try { System.out.println("早餐"); waitbreakfastQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("等到了它的早餐"); } finally { lock.unlock(); } }).start(); Thread.sleep(10000); sendBreakfast(); Thread.sleep(10000); sendCigarette(); } private static void sendCigarette() { lock.lock(); try { System.out.println("送烟来了"); hasCigrette = true; waitCigaretteQueue.signal(); } finally { lock.unlock(); } } private static void sendBreakfast() { lock.lock(); try { System.out.println("送早餐来了"); hasBreakfast = true; waitbreakfastQueue.signal(); } finally { lock.unlock(); } }
该例子的输出能够很有效的证明这一点,所以关于await这部分需要注意!!!
拒绝策略版
实际上,阻塞队列溢出,一直阻塞就相当于一种拒绝策略!
定义拒绝策略
@FunctionalInterface // 拒绝策略 interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); }
阻塞队列补充
public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { // 判断队列是否满 if(queue.size() == capcity) { rejectPolicy.reject(this, task); } else { // 有空闲 System.out.println("加入任务队列 "+ task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } }
可以看到,核心的拒绝策略是用户自己提供的
线程池补充
private RejectPolicy<Runnable> rejectPolicy; public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); this.rejectPolicy = rejectPolicy; } public void execute(Runnable task) { // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行 // 如果任务数超过 coreSize 时,加入任务队列暂存 synchronized (workers) { if(workers.size() < coreSize) { Worker worker = new Worker(task); System.out.println("新增 worker "+ worker + " " + task); workers.add(worker); worker.start(); } else { //taskQueue.put(task); // 1) 死等 // 2) 带超时等待 // 3) 让调用者放弃任务执行 // 4) 让调用者抛出异常 // 5) 让调用者自己执行任务 taskQueue.tryPut(rejectPolicy, task); } } }
运行
ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 1. 死等 queue.put(task); // 2) 带超时等待 // queue.offer(task, 1500, TimeUnit.MILLISECONDS); // 3) 让调用者放弃任务执行 // log.debug("放弃{}", task); // 4) 让调用者抛出异常 // throw new RuntimeException("任务执行失败 " + task); // 5) 让调用者自己执行任务 //task.run(); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(10000000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(j); }); }
死等
新增 worker Thread[Thread-0,5,main] test2$$Lambda$2/2093631819@58372a00 加入任务队列 test2$$Lambda$2/2093631819@6d03e736 等待加入任务队列 test2$$Lambda$2/2093631819@568db2f2 正在执行 test2$$Lambda$2/2093631819@58372a00 ...死等
可以看到,第三个线程确实死等了
带超时等待
ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 1. 死等 // queue.put(task); // 2) 带超时等待 queue.offer(task, 500, TimeUnit.MILLISECONDS); // 3) 让调用者放弃任务执行 // log.debug("放弃{}", task); // 4) 让调用者抛出异常 // throw new RuntimeException("任务执行失败 " + task); // 5) 让调用者自己执行任务 //task.run(); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(j); }); }
输出:
新增 worker Thread[Thread-0,5,main] test2$$Lambda$2/2093631819@58372a00 加入任务队列 test2$$Lambda$2/2093631819@6d03e736 等待加入任务队列 test2$$Lambda$2/2093631819@568db2f2 正在执行 test2$$Lambda$2/2093631819@58372a00 0 正在执行 test2$$Lambda$2/2093631819@6d03e736 1 worker 被移除Thread[Thread-0,5,main]
可以看到 第三个任务确实取消了
让调用者放弃任务执行
输出:
新增 worker Thread[Thread-0,5,main] test2$$Lambda$2/2093631819@58372a00 加入任务队列 test2$$Lambda$2/2093631819@6d03e736 放弃 test2$$Lambda$2/2093631819@568db2f2 正在执行 test2$$Lambda$2/2093631819@58372a00 0 正在执行 test2$$Lambda$2/2093631819@6d03e736 1 worker 被移除Thread[Thread-0,5,main]
让调用者抛出异常
输出:
新增 worker Thread[Thread-0,5,main] test2$$Lambda$2/2093631819@58372a00 加入任务队列 test2$$Lambda$2/2093631819@6d03e736 正在执行 test2$$Lambda$2/2093631819@58372a00 Exception in thread "main" java.lang.RuntimeException: 任务执行失败 test2$$Lambda$2/2093631819@568db2f2 at test2.lambda$main$0(test2.java:53) at BlockingQueue.tryPut(test2.java:200) at ThreadPool.execute(test2.java:247) at test2.main(test2.java:60) 0 正在执行 test2$$Lambda$2/2093631819@6d03e736 1 worker 被移除Thread[Thread-0,5,main]
让调用者自己执行任务
输出:
新增 worker Thread[Thread-0,5,main] test2$$Lambda$2/2093631819@58372a00 加入任务队列 test2$$Lambda$2/2093631819@6d03e736 正在执行 test2$$Lambda$2/2093631819@58372a00 0 2 正在执行 test2$$Lambda$2/2093631819@6d03e736 1 worker 被移除Thread[Thread-0,5,main]
可以看到 0 2同时输出了,2是主线程执行的