创建线程池
现在进入正题,新建了一个 CustomThreadPool
类,它的工作原理如下:
简单来说就是往线程池里边丢任务,丢的任务会缓冲到队列里;线程池里存储的其实就是一个个的 Thread
,他们会一直不停的从刚才缓冲的队列里获取任务执行。
流程还是挺简单。
先来看看我们这个自创的线程池的效果如何吧:
初始化了一个核心为3、最大线程数为5、队列大小为 4 的线程池。
先往其中丢了 10 个任务,由于阻塞队列的大小为 4 ,最大线程数为 5 ,所以由于队列里缓冲不了最终会创建 5 个线程(上限)。
过段时间没有任务提交后(sleep
)则会自动缩容到三个线程(保证不会小于核心线程数)。
构造函数
来看看具体是如何实现的。
下面则是这个线程池的构造函数:
会有以下几个核心参数:
miniSize
最小线程数,等效于ThreadPool
中的核心线程数。
maxSize
最大线程数。
keepAliveTime
线程保活时间。
workQueue
阻塞队列。
notify
通知接口。
大致上都和 ThreadPool
中的参数相同,并且作用也是类似的。
需要注意的是其中初始化了一个 workers
成员变量:
/** * 存放线程池 */ private volatile Set<Worker> workers; public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) { workers = new ConcurrentHashSet<>(); }
workers
是最终存放线程池中运行的线程,在 j.u.c
源码中是一个 HashSet
所以对他所有的操作都是需要加锁。
我这里为了简便起见就自己定义了一个线程安全的 Set
称为 ConcurrentHashSet
。
其实原理也非常简单,和 HashSet
类似也是借助于 HashMap
来存放数据,利用其 key
不可重复的特性来实现 set
,只是这里的 HashMap
是用并发安全的 ConcurrentHashMap
来实现的。
这样就能保证对它的写入、删除都是线程安全的。
不过由于 ConcurrentHashMap
的 size()
函数并不准确,所以我这里单独利用了一个 AtomicInteger
来统计容器大小。
创建核心线程
往线程池中丢一个任务的时候其实要做的事情还蛮多的,最重要的事情莫过于创建线程存放到线程池中了。
当然我们不能无限制的创建线程,不然拿线程池来就没任何意义了。于是 miniSize maxSize
这两个参数就有了它的意义。
但这两个参数再哪一步的时候才起到作用呢?这就是首先需要明确的。
从这个流程图可以看出第一步是需要判断是否大于核心线程数,如果没有则创建。
结合代码可以发现在执行任务的时候会判断是否大于核心线程数,从而创建线程。
worker.startTask()
执行任务部分放到后面分析。
这里的 miniSize
由于会在多线程场景下使用,所以也用 volatile
关键字来保证可见性。
队列缓冲
结合上面的流程图,第二步自然是要判断队列是否可以存放任务(是否已满)。
优先会往队列里存放。
上至封顶
一旦写入失败则会判断当前线程池的大小是否大于最大线程数,如果没有则继续创建线程执行。
不然则执行会尝试阻塞写入队列(j.u.c
会在这里执行拒绝策略)
以上的步骤和刚才那张流程图是一样的,这样大家是否有看出什么坑嘛?