三、MECE分析线程池
3.1 线程池的基本操作
线程池的基本操作包括:
3.1.1 创建线程池
使用ThreadPoolExecutor类的构造函数创建线程池。ThreadPoolExecutor是创建线程池的工具类,封装了几种常用线程池的创建方法,常用方法有如下几种:
固定线程线程池(FixedThreadPool)
固定线程线程池定义
线程池中的线程数量固定不变,当有任务提交时,如果线程池中有空闲线程,则立即使用空闲线程执行任务;如果没有,则等待有线程空闲为止。
固定线程线程池原理分析
创建线程数量固定的线程池,核心线程数corePoolSize和最大线程数maximumPoolSize相同,核心线程不超时,队列是LinkedBlockingQueue,队列大小没有限制。
固定线程线程池使用场景
适合数据数量固定的数据处理场景,例如百度网盘中的批量文件下载功能,指定五个线程同时下载文件,其余任务都在队列排队。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
固定线程线程池项目应用
照片导出功能,把10000个人员照片加密后导出到U盘里,使用线程池数量为5的固定线程池执行任务,还有一个任务分派线程,主要负责查询数据库,监控线程池和提交任务。
缓存线程池(CachedThreadPool)
缓存线程池定义
线程池中的线程数量可以根据任务的多少自动调整,如果有大量任务提交,则线程池会动态增加线程数量;如果没有任务提交,则线程池会动态减少线程数量。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
缓存线程池原理分析
创建缓存线程池,核心线程数corePoolSize为0,最大线程数maximumPoolSize是Integer.MAX_VALUE,线程超时时间是60s,队列是SynchronousQueue同步队列。
缓存线程池使用场景
适合对响应速度要求高,并发少的场景,Okhttp就是用的缓存线程池来处理http请求的,符合手机上并发请求少,响应速度快的要求。
缓存线程池项目应用
使用Okhttp的过程间接使用来了缓存线程池,项目中应该谨慎使用该线程池。
定时器线程池(ScheduledThreadPool)
可以在固定的时间间隔或者指定的时间执行任务,该线程池可以设置固定的线程数量或者可变的线程数量。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
单线程池(SingleThreadExecutor)
单线程线程池定义
线程池中只有一个线程,所有任务都在同一个线程中按照队列顺序依次执行。
单线程线程池原理分析
创建单线程线程池,核心线程数corePoolSize和最大线程数maximumPoolSize都是1,核心线程不超时,队列是LinkedBlockingQueue,队列大小没有限制。
单线程线程池使用场景
适合任务并发少,触发频繁,任务执行时间固定的业务场景。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
单线程线程池项目应用
人脸识别成功保存记录是通过一个单线程线程池保存记录的,人脸识别的间隔时间大于记录保存时间,因此正常情况下任务不会出现阻塞在队列的情况。
ForkJoin线程池
该线程池是Java 7引入的一种专门用于处理分治算法的线程池,可以递归地将任务拆分成小任务,并将小任务分配给线程池中的线程执行,然后将小任务的结果合并起来,最终得到大任务的结果。
ForkJoinPool forkJoinPool = new ForkJoinPool();
3.1.2 提交任务
提交任务方式
使用execute、submit方法提交任务到线程池的任务队列中
fixedThreadPool.execute(new Runnable() { public void run() { // 执行任务 } }); cachedThreadPool.execute(new Runnable() { public void run() { // 执行任务 } }); scheduledThreadPool.schedule(new Runnable() { public void run() { // 执行任务 } }, 5, TimeUnit.SECONDS); singleThreadExecutor.execute(new Runnable() { public void run() { // 执行任务 } }); forkJoinPool.invoke(new RecursiveTask() { public Object compute() { // 执行任务 } });
线程池的 execute()
方法和 submit()
方法都用于向线程池提交任务,但是它们有以下几个区别:
- 返回值不同:
execute()
方法没有返回值,而submit()
方法返回一个Future
对象。 - 异常处理不同:
execute()
方法没有办法处理任务执行时抛出的异常,而submit()
方法可以使用返回的Future
对象处理任务执行时抛出的异常。 - 任务类型不同:
execute()
方法只能提交Runnable
类型的任务,而submit()
方法可以提交Runnable
或Callable
类型的任务。 - 方法重载:
execute()
方法只有一种重载形式,而submit()
方法有多种重载形式,可以指定返回结果、延迟执行等参数。
因此,当需要获取任务执行结果或者处理任务执行时可能会抛出的异常时,应该使用 submit()
方法;
当不需要获取任务执行结果或者不需要处理任务执行时可能会抛出的异常时,可以使用 execute()
方法。 execute()
方法源码如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 1.如果当前线程数量小于核心线程数,开启一个新线程,然后把任务作为该线程首个任务来执行 * 2.如果当前线程数量等于核心线程数,尝试添加任务到阻塞队列BlockingQueue * 3.如果添加队列失败,即队列已满,开启一个新线程,然后把任务作为该线程首个任务来执行 * 4.如果第3步开启线程失败,即线程数量超过最大线程数,调用RejectedExecutionHandler的 * * rejectedExecution方法执行拒绝策略。 */ //获取线程池状态和线程数量的组合值,这两个值被打包到了一个int中 int c = ctl.get(); //workerCountOf 获取Worker的数量,即线程数量。 //isRunning 获取线程池的状态,判断线程池是否是运行状态 if (workerCountOf(c) < corePoolSize) { if (addWorker(command,true)) return; //重新获取,多线程环境,该值可能已经发生变化 c = ctl.get(); } //尝试添加到队列 if (isRunning(c) && workQueue.offer(command)) { //重新检查状态值 如果线程已经shutdown 则拒绝添加任务 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null,false); } //以非核心线程添加模式创建线程,如果失败,走拒绝策略 else if (!addWorker(command,false)) reject(command); }
3.1.3 关闭线程池
- shutdown
使用shutdown方法关闭线程池,等待任务队列中的任务执行完毕后再关闭。
调用shutdown方法后的状态转换:RUNNING-->SHUTDOWN-->TIDYING-->TERMINATED
- shutdownNow
使用shutdownNow方法立即关闭线程池,会中断正在执行的任务,并返回未执行的任务列表。
3.1.4 动态修改线程池大小
使用setCorePoolSize和setMaximumPoolSize方法动态修改线程池中的线程数量。
调用shutdownNow方法后的状态转换:RUNNING-->STOP-->TIDYING-->TERMINATED
3.1.5 拒绝策略
当任务队列已满并且线程池中的线程数量已达到最大值时,使用setRejectedExecutionHandler方法设置拒绝策略来处理无法处理的任务。
3.1.6 监控线程池
使用getActiveCount、getCompletedTaskCount、getTaskCount等方法获取线程池的状态信息。
基本操作可以满足大多数线程池的需求,同时Java线程池还提供了很多高级特性,例如定时任务、线程池工厂等,可以根据具体需求进行选择。
3.2 线程池的生命周期
线程池的生命周期通常包括以下阶段:
- 创建:线程池被创建,但还没有开始处理任务。
- 启动:线程池被启动,开始接受任务,并且根据配置参数创建指定数量的线程。
- 运行:线程池正在运行中,等待接收任务并且执行。
- 终止:线程池被终止,所有的任务已经被执行完毕,线程池中的所有线程被销毁。
需要注意的是,在线程池被终止之前,可能会存在一些情况导致线程池被关闭,比如程序发生异常、线程池被主动关闭等情况。此时,线程池中的所有任务可能无法全部被执行完毕,因此在实际使用中需要注意线程池的关闭策略,避免出现数据丢失等问题。
3.3 线程池的工作原理
线程池是一种多线程处理的机制,线程池允许在应用程序中预先创建一定数量的线程并将它们放在一个池中,线程可以重复使用,以减少线程的创建和销毁开销。
一句话总结线程池工作原理: 线程池的实现整体流程是一个可配置的生产者消费者模型,然后基于单一的阻塞缓冲队列来实现的。
线程池工作原理分为六个步骤讲解,第一个步骤是初始化线程池,第二个步骤是将任务添加到任务队列,第三个步骤是检查线程池状态,第四个步骤是取出任务并执行,第五个步骤是处理任务异常,第六个步骤是关闭线程池。
- 初始化线程池
创建一个线程池对象,并设置线程池的参数,如线程池的大小、任务队列的大小、线程的优先级等。
参考 #3.1.1 四种线程池的创建方式
- 将任务添加到任务队列
当有任务需要执行时,将任务添加到任务队列中。
参考 #2.3.2.1.2 Worker创建
- 检查线程池状态
线程池会周期性地检查自身状态,如果当前线程池中的线程数小于预设的最小线程数,则会创建新的线程。
参考 #2.3.2.1.3 Worker线程原理
- 取出任务并执行
线程池中的线程会不断从任务队列中取出任务并执行。
参考 #2.3.2.1.4 Task任务的获取
- 如果当前线程数量小于核心线程数,开启新线程,将当前任务做为新线程的第一个任务来执行。
- 如果当前线程数量等于核心线程数,尝试添加任务到队列。
- 如果添加队列失败,即队列是满的,则以开启新线程,将当前任务做为新线程的第一个任务来执行。
- 如果新线程开启失败,即当前线程数量等于最大线程数量,执行拒绝策略。
- 处理任务异常
如果任务执行过程中发生了异常,线程池可以处理异常并记录异常信息。
参考 #3.1.5 拒绝策略
- 关闭线程池
当线程池不再需要使用时,需要将线程池关闭。关闭线程池时,首先需要将任务队列中的任务执行完毕,然后再将线程池中的线程关闭。
参考 #3.1.3 关闭线程池
通过使用线程池,可以优化系统性能,减少线程创建和销毁的开销,避免线程过多导致系统资源不足的情况,并提高系统的可维护性和可扩展性。
3.4 线程池代码案例分析
3.4.1 OkHttp
在OKHttp中AsyncCall、Dispatcher和ConnectionPool都是通过线程池进行维护的。
AsyncCall
AsyncCall是一个Runnable接口,可以通过线程池异步执行,下面是run方法
异步请求的执行流程:
- 使用AsyncTimeout监听请求是否超时,会开启一个子线程,线程名称Okio Watchdog ,超时后会调用Call的cancel取消请求 。AsyncTimeout监听的是http请求的完整过程,包括dns解析、请求数据发送、服务器处理、请求数据读取的整个流程。
- 组装过滤器链,开始执行请求流程。
- 回调请求结果,通知Dispatcher请求已经执行完。
override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false //使用AsyncTimeout监听请求是否超时,会开启一个子线程,线程名称 Okio Watchdog //超时后会调用Call的cancel取消请求 timeout.enter() try { //组装过滤器链 开始执行请求流程 val response = getResponseWithInterceptorChain() signalledCallback = true //回调请求结果 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e) } else { responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { //请求执行完成 client.dispatcher.finished(this) } } } }
Dispatcher
Dispatcher作用
Dispatcher是用来管理连接和分发请求的,使用线程池执行异步任务。默认使用的线程池是缓存线程池,可以在构建OkHttpClient的时候通过Dispatcher的构造参数传入自己的线程池。
Dispatcher缓存线程池
缓存线程池使用同步队列,核心线程数为0。
提交任务的时候,如果当前没有线程在取任务就会开启新线程执行,也就是说如果当前线程都在忙于执行请求,会立刻开启一个新线程。缓存线程池吞吐量高,响应速度快,但是并发高的情况下会创建大量线程,占用系统资源。
Dispatcher缓存线程池用途
移动客户端的网络请求特点是并发量少,大多数情况只有2、3个同时发出的请求,但是由于大多请求都是由用户触发的请求,因此对响应速度要求较高。缓存线程池恰好满足了移动端的网络需求特点。 为了避免请求过多大量创建线程,因此使用两个队列限制异步请求的数量。同时执行的最大请求数量是64,如果使用缓存线程池,也就相当于限制了同时运行的最大线程数量是64。相同域名的最大请求数量是5。
Dispatcher队列定义
readyAsyncCalls
调用enqueue方法提交给Dispatcher的请求,如果没有提交给线程池执行,那么提交给线程池会从队列中移除。
runningAsyncCalls
正在线程池中执行的异步请求,还没有执行完,执行完会从队列中移除。
runningSyncCalls
正在执行的同步请求,还没有执行完,执行完会从该队列中移除。
Dispatcher成员变量
//同时执行的最大请求的数量 var maxRequests = 64 //每个域名可以同时执行的最大请求数量 var maxRequestsPerHost = 5 private var executorServiceOrNull: ExecutorService? = null //线程池默认使用缓存线程池 @get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! } /** 调用enqueue提交给Dispatcher的请求,还没有提交到线程池执行 */ private val readyAsyncCalls = ArrayDeque<AsyncCall>() /** 提交到线程池正在执行的异步请求,还没有执行完 */ private val runningAsyncCalls = ArrayDeque<AsyncCall>() /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private val runningSyncCalls = ArrayDeque<RealCall>() constructor(executorService: ExecutorService) : this() { this.executorServiceOrNull = executorService }
Dispatcher异步请求enqueue
- 把AsyncCall添加到待执行队列 。
- 非webSocket请求,从待执行异步队列和已执行异步队列中查找相同host的请求,把已经存在的call的callsPerHost拷贝到新call的callsPerHost。这样做的目的是,例如第一个请求查找不到相同host的请求,因此callsPerHost是0,添加到线程池中,callsPerHost变成了1。第2个请求从以执行队列中查找到了相同host的请求,拷贝callsPerHost,新AsyncCall的callsPerHost就变成了1。这样传递下去,新添加的AsyncCall中的callsPerHost就是该host同时执行的请求数量。
- 提交给线程池执行
internal fun enqueue(call: AsyncCall) { synchronized(this) { //1.添加到待执行队列 readyAsyncCalls.add(call) // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { //2.不是webSocket请求 从待执行异步队列和已执行异步队列中查找相同host的请求 //把已经存在的call的callsPerHost拷贝到新call的callsPerHost val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } //3.提交给线程池执行 promoteAndExecute() }
promoteAndExecute执行流程
private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock() val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean synchronized(this) { //1.遍历带执行队列 val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() //正在执行的请求数量大于64 直接退出循环 if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. //单个域名的最大请求数量大于5 处理下一个请求 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. //将请求从待执行队列移除 添加到已执行队列中 i.remove() //增加相同域名的请求数量 asyncCall.callsPerHost.incrementAndGet() //添加到 临时的可执行队列 executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 } //遍历可执行队列 提交给线程池执行 for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } //isRunning是同步已执行请求和异步已执行请求的任务总量 isRunning>0说明有任务在执行 =0说明没有任务在执行 return isRunning }
遍历待执行队列,如果已执行队列的任务数量大于64,跳出循环,如果当前asyncCall的相同域名请求数量大于5,处理下一个请求。
将请求从待执行队列移除,添加到已执行队列和临时可执行队列中,更新asyncCall中的callsPerHost。
遍历临时可执行队列,把任务添加到线程池中执行。
返回是否有任务在执行
ConnectionPool
ConnectionPool使用代理模式,被代理类是RealConnectionPool,在此基础上提供了一些开发功能。默认的最大限制连接数是5,保持连接的最大时长是5分钟。
constructor() : this(5, 5, TimeUnit.MINUTES) /** Returns the number of idle connections in the pool. */ fun idleConnectionCount(): Int = delegate.idleConnectionCount() /** Returns total number of connections in the pool. */ fun connectionCount(): Int = delegate.connectionCount() /** Close and remove all idle connections in the pool. */ //清除和关闭连接池中的所有连接 fun evictAll() { delegate.evictAll() } //RealConnectionPool //连接存活的最长时间 默认是5分钟 private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration) private val cleanupQueue: TaskQueue = taskRunner.newQueue() //清理连接的任务 TaskRunner中使用一个缓存线程池执行改任务 private val cleanupTask = object : Task("$okHttpName ConnectionPool") { override fun runOnce() = cleanup(System.nanoTime()) } /** * 使用cas无锁队列存储RealConnection */ private val connections = ConcurrentLinkedQueue<RealConnection>()
3.4.2 ThreadLocal
ThreadLocal原理
ThreadLocal通过线程数据隔离的方式来解决并发数据访问问题,每个线程都有自己的数据副本,ThreadLocal的原理图如下
线程数据隔离的核心是每个Thread对象都有一个属于自己的ThreadLocalMap对象,ThreadLocalMap通过数组实现数据存取,每个数组元素都是一个Entry。
Entry用ThreadLocal做为key,value是我们要存放的数据。
ThreadLocal
一个ThreadLocal只能存取一种类型的数据,存取多种类型的数据可以使用多个ThreadLocal,也可以把数据封装到同一个对象中。
public class ThreadLocal<T> { private final int threadLocalHashCode = nextHashCode(); //生成ThreadLocal的hashcode的原子计数器 private static AtomicInteger nextHashCode = new AtomicInteger(); /** * The difference between successively generated hash codes - turns * implicit sequential thread-local IDs into near-optimally spread * multiplicative hash values for power-of-two-sized tables. 两个ThreadLocal的hash值的间隔差 */ private static final int HASH_INCREMENT = 0x61c88647; /** * Returns the next hash code. hash值 以HASH_INCREMENT累加 */ private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } }
ThreadLocal的hash值是通过计数器自增生成的,使用多个ThreadLocal的情况下会出现hash冲突。
ThreadLocalMap
ThreadLocalMap使用WeakReference,为监听ThreadLocal是否被回收。
ThreadLocal.ThreadLocalMap threadLocals = null; //当创建子线程的时候,子线程可以得到父线程的inheritableThreadLocals ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; //-------------------------------------------------------------
当Entry.get()返回null的时候,说明ThreadLocal已经被回收,这时就要将Entry中的value引用设置为null,避免出现内存泄漏。
//使用WeakReference监听ThreadLocal是否被回收 static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k,Object v) { super(k); value = v; } } //初始化容量 16 private static final int INITIAL_CAPACITY = 16; /** * The table,resized as necessary. * table.length MUST always be a power of two. Entry数组 */ private Entry[] table; /** * The number of entries in the table. 实际存储的数据量 */ private int size = 0; /** * The next size value at which to resize. 扩容阈值 默认是size达到容量的2/3时扩容 */ private int threshold; // Default to 0
为什么使用WeakReference,为监听ThreadLocal是否被回收。当Entry.get()返回null的时候,说明ThreadLocal已经被回收,这时就要将Entry中的value引用设置为null,避免出现内存泄漏。
ThreadLocal核心方法
set
- 获取当前线程Thread引用,获取或创建Thread中的ThreadLocalMap,调用ThreadLocalMap的set方法。
- 让ThreadLocal的hash值和数组长度做与运算得到对应的数组索引index。
- 用线性探测法解决hash冲突,如果数组索引位置的Entry是空的,创建一个新的Entry设置到该位置。如果数组索引位置的key和当前ThreadLocal地址相同,用新值更新旧值。否则就是出现了hash冲突,从当前的i开始向后查找,直到找到一个空的位置为止。在查找的过程中,如果发现ThreadLocal已经被回收,就会调用replaceStaleEntry方法清理对应位置的value数据。
- 创建Entry,增加size。
- 先清理ThreadLocal已经被回收的Entry,然后判断是否需要扩容。
public void set(T value) { //获取当前线程引用 Thread t = Thread.currentThread(); //获取当前线程的ThreadLocalMap 如果是空的会创建一个 ThreadLocalMap map = getMap(t); if (map != null) //设置值 map.set(this,value); else createMap(t,value); } private void set(ThreadLocal<?> key,Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones,in which case,a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; //1.对hashcode做与运算 确定ThreadLocal在数组中的索引 int i = key.threadLocalHashCode & (len-1); //2.从当前索引开始向后查找,找到key直接赋值返回,否则找到一个Entry为空的位置,记录i //用线性探测的方式解决hash冲突的问题 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i,len)]) { ThreadLocal<?> k = e.get(); //key已经存在 直接替换旧值 if (k == key) { e.value = value; return; } //key为空 清理对应的value数据 if (k == null) { replaceStaleEntry(key,value,i); return; } } //3.创建一个新的Entry tab[i] = new Entry(key,value); int sz = ++size; //4.先清理key为空的Entry 然后判断是否需要扩容 if (!cleanSomeSlots(i,sz) && sz >= threshold) rehash(); }
get
- 获取当前线程的ThreadLocalMap,调用ThreadLocalMap的getEntry获取Entry。
- 根据hashcode和数组长度计算对应的数组索引,如果对应位置的Entry不为空并且key和当前ThreadLocal相同,返回Entry。否则执行第3步。
- set数据的时候可能出现了hash冲突,从数组为i的位置开始向后查找,如果找到了对应的key就返回。如果遇到一个位置Entry为null,说明后续的位置都是null,因此直接返回null。如果遇到ThreadLocal被回收的情况,调用expungeStaleEntry移除过期的数据。
- hash冲突情况下,Entry的查找采用线性查找。因此,在同一个Thread中使用大量ThreadLocal的情况下会比较消耗性能。
public T get() { //1.获取当前线程的ThreadLocalMap Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { //获取当前ThreadLocal对应的Entry ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } //--------------------------------------------- private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; //2.Entry不为空并且key和当前ThreadLocal相同 if (e != null && e.get() == key) return e; else //3.可能出现了hash冲突 从i开始向数组后面查找 return getEntryAfterMiss(key,i,e); } //--------------------------------------------- private Entry getEntryAfterMiss(ThreadLocal<?> key,int i,Entry e) { Entry[] tab = table; int len = tab.length; //从i开始循环遍历 当Entry=null的时候 说明后面的肯定都是空的 while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i); else i = nextIndex(i,len); e = tab[i]; } return null; }
扩容
- 当数据量达到数组容量的2/3的时候才会扩容,扩容后容量是之前容量的2倍,扩容后会把旧数组中的数据拷贝到新数组中,通过hash运算和线性探测计算元素在新数组中的索引。
- 扩容过程是不存在线程安全问题的,因为每个线程都有自己的ThreadLocalMap。
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; //扩容速度为原先容量的2倍 Entry[] newTab = new Entry[newLen]; int count = 0; //把原先的数据拷贝到新数组中 for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC } else { //重新计算hash值 解决hash冲突 int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h,newLen); newTab[h] = e; count++; } } } //重新设置新的扩容阈值 为数组长度的2/3 setThreshold(newLen); size = count; table = newTab; }
内存泄露
当ThreadLocal的强引用被置为null的时候,可能会被gc回收,此时就无法通过ThreadLocal访问到它存储的资源了。
但是还存在一条引用链路,从Thread-->ThreadLocalMap-->Entry-->value的引用链,因此在线程没有结束的情况下,就会发送内存泄漏。如果使用的是线程池,内存泄漏的持续时间就会比较长。 ThreadLocal已经解决了问题,通过弱引用来监听ThreadLocal是否被回收,被回收的时候,断开value和Entry之间的引用链。
使用场景
多线程下载文件或从设备导入图片的时候,统计每个线程处理的文件数量,失败的数量、成功的数量等。
3.4.3 alpha
对于大型App来说,启动任务多,任务依赖复杂。保障任务逻辑的单一性,解耦启动任务逻辑,合理利用多核CPU优势,提高线程运行效率是重点关注的问题。
为了利用多核cpu,提高任务执行效率,让单个任务职责更加清晰,代码更加优雅进而提高启动速度,我们会尽可能让这些工作并发进行。
但这些工作之间可能存在前后依赖的关系,我们又需要想办法保证他们执行顺序的正确性。
所以我们要做的工作是将任务颗粒化,定义好自己的任务,并描述它依赖的任务,将它添加到Project中。框架会自动并发有序地执行这些任务,并将执行的结果抛出来。
那么怎样对任务进行分类呢?
任务进行分类策阅可以通过Alpha等启动器把启动任务管理起来。具体分为四个步骤: 第一个步骤是将启动任务原子化,分为各个任务。
第二个步骤是使用有向无环图管理启动任务。前后依赖的任务串行,无依赖的任务线程池化并行。优先级高的任务在前,优先级低的任务在后。
第三个步骤是启动任务集中化,分任务区块:核心任务,主要任务,延迟任务,懒加载任务。核心任务在attachBaseContext中执行,主要任务在启动页或首页执行,延迟任务在首页后空闲时间执行,懒加载任务在特定的时机执行。
最后一个步骤是启动任务统计化,提供任务的耗时统计和卡口。
任务管理框架图参考如下:
而阿里巴巴的alpha启动器恰好解决了启动任务管理不合理的业务痛点, 那么启动任务管理不合理的业务痛点具体表现有哪些呢?
业务痛点
启动任务管理不合理的业务痛点具体表现有如下五个特征:
- 多线程管理
- 任务的优先级
- 任务之间的先后关系
- 任务是否需要在主线程执行
- 多进程处理
源码分析
为了深入学习阿里巴巴的alpha启动器的原理, 小木箱通过以下流程图带大家认识一下阿里巴巴的alpha启动器的核心知识。
AlphaManager.getInstance(mContext).start()
start判断是否为当前进程和是否能匹配到相关进程任务, 具体参数配置如下:
MAIN_PROCESS_MODE
: 主进程任务SECONDARY_PROCESS_MODE
:非主进程任务ALL_PROCESS_MODE
:适用于所有进程的任务
public void start() { Project project = null; do { //1.是否有为当前进程单独配置的Project,此为最高优先级 if (mProjectForCurrentProcess != null) { project = (Project) mProjectForCurrentProcess; break; } //2.如果当前是主进程,是否有配置主进程Project if (AlphaUtils.isInMainProcess(mContext) && mProjectArray.indexOfKey(MAIN_PROCESS_MODE) >= 0) { project = (Project) mProjectArray.get(MAIN_PROCESS_MODE); break; } //3.如果是非主进程,是否有配置非主进程的Project if (!AlphaUtils.isInMainProcess(mContext) && mProjectArray.indexOfKey(SECONDARY_PROCESS_MODE) >= 0) { project = (Project) mProjectArray.get(SECONDARY_PROCESS_MODE); break; } //4.是否有配置适用所有进程的Project if (mProjectArray.indexOfKey(ALL_PROCESS_MODE) >= 0) { project = (Project) mProjectArray.get(ALL_PROCESS_MODE); break; } } while (false); if (project != null) { addListeners(project); project.start(); } else { AlphaLog.e(AlphaLog.GLOBAL_TAG, "No startup project for current process."); } }
Where: 配置相关进程任务位置?
public void addProject(Task project, int mode) { if (project == null) { throw new IllegalArgumentException("project is null"); } if (mode < MAIN_PROCESS_MODE || mode > ALL_PROCESS_MODE) { throw new IllegalArgumentException("No such mode: " + mode); } if (AlphaUtils.isMatchMode(mContext, mode)) { mProjectArray.put(mode, project); } }
project start
@Override public void start() { mStartTask.start(); }
开启一个mStartTask
?这个mStartTask
是之前设置的些任务中第一个任务吗?
//Project.java private void init() { ... mProject = new Project(); mFinishTask = new AnchorTask(false, "==AlphaDefaultFinishTask=="); mFinishTask.setProjectLifecycleCallbacks(mProject); mStartTask = new AnchorTask(true, "==AlphaDefaultStartTask=="); mStartTask.setProjectLifecycleCallbacks(mProject); mProject.setStartTask(mStartTask); mProject.setFinishTask(mFinishTask); ... } private static class AnchorTask extends Task { private boolean mIsStartTask = true; private OnProjectExecuteListener mExecuteListener; public AnchorTask(boolean isStartTask, String name) { super(name); mIsStartTask = isStartTask; } public void setProjectLifecycleCallbacks(OnProjectExecuteListener callbacks) { mExecuteListener = callbacks; } @Override public void run() { if (mExecuteListener != null) { if (mIsStartTask) { mExecuteListener.onProjectStart(); } else { mExecuteListener.onProjectFinish(); } } } }
Why: 定义一个开始任务
和一个结束任务
。
执行角度: 一个任务序列必须有一个开始节点和一个结束节点。
生产角度: 多个任务可以同时开始,而且有多个任务可以同时作为结束点
设计原则:
- 设置两个节点
方便控制整个流程
- 标记流程开始和结束,方便
任务的监听
AnchorTask父类Task
- 定义
Runnable
,判断是否主线程,并执行这个Runnable
,穿插了一些状态的改变 - 在
Runnable
内部主要是执行了Task.this.run()
,并执行了任务本身。 - 其中
setThreadPriority
方法设置了线程优先级,比如THREAD_PRIORITY_DEFAULT
线程优先级处理CPU资源竞争问题,不影响Task之间的优先级。 - 如果在主线程执行任务,通过
Handler(sHandler)
将事件传递给主线程执行。
如果在非主线程执行的任务,通过线程池(sExecutor)
执行线程任务。
public synchronized void start() { ... switchState(STATE_WAIT); if (mInternalRunnable == null) { mInternalRunnable = new Runnable() { @Override public void run() { android.os.Process.setThreadPriority(mThreadPriority); long startTime = System.currentTimeMillis(); switchState(STATE_RUNNING); Task.this.run(); switchState(STATE_FINISHED); long finishTime = System.currentTimeMillis(); recordTime((finishTime - startTime)); notifyFinished(); recycle(); } }; } if (mIsInUiThread) { sHandler.post(mInternalRunnable); } else { sExecutor.execute(mInternalRunnable); } }
notifyFinished
mSuccessorList
排序
「紧后任务列表」,也就是接下来要执行的任务列表。所以流程就是先把当前任务之后的任务列表进行一个排序,根据优先级排序。然后按顺序执行onPredecessorFinished
方法。 如果紧后任务列表为空,也就代表没有后续任务了,那么就会走onTaskFinish
回调方法,告知当前Project已经执行完毕。
遍历mSuccessorList
列表,执行onPredecessorFinished
方法
监听回调onTaskFinish
方法
void notifyFinished() { if (!mSuccessorList.isEmpty()) { AlphaUtils.sort(mSuccessorList); for (Task task : mSuccessorList) { task.onPredecessorFinished(this); } } if (!mTaskFinishListeners.isEmpty()) { for (OnTaskFinishListener listener : mTaskFinishListeners) { listener.onTaskFinish(mName); } mTaskFinishListeners.clear(); } }
How: 紧后任务怎么加进来?紧后任务怎么排序?
onPredecessorFinished
紧后任务列表是通过after方法
实现的
builder.add(Task2).after(Task1)
,所以after代表Task2要在Task1后面执行,Task2成了Task1的紧后任务。
同理,Task1也就成了Task2的紧前任务。是通过addPredecessor
方法,在添加紧后任务同时也添加紧前任务。
紧前任务添加了有什么用呢?难不成还倒退回去执行?
如果有多个任务的紧后任务都是一个,比如这种情况:builder.add(Task4).after(Task2,Task3)
。
Task4是Task2和Task3的紧后任务,所以在Task2执行完之后,还要判断Task3是否执行成功,然后才能执行Task4,这就是紧前任务列表的作用。
onPredecessorFinished
就是做这样的工作的。
How: 紧后任务列表的排序是如何排序呢?
通过getExecutePriority
方法获取task执行优先级数字,正序排列,越小任务执行时机越早。
而setExecutePriority
方法设置了排序优先级。
- 各种回调:包括一些task的回调,project的回调。
- 日志记录:比如耗时时间的记录,刚才执行任务时候的
recordTime方法
,就是记录了每个task的耗时。 - Task脚本化:通过
XmlPullParser
类来解析xml数据,然后生成Project来配置Project的Task。 - 设计模式:构建Project的建造者模式,通过传入task名称可以创建Task工厂模式。
//1、紧后任务添加 public Builder after(Task task) { task.addSuccessor(mCacheTask); mFinishTask.removePredecessor(task); mIsSetPosition = true; return Builder.this; } void addSuccessor(Task task) { task.addPredecessor(this); mSuccessorList.add(task); } //2、紧后任务列表排序 public static void sort(List<Task> tasks) { if (tasks.size() <= 1) { return; } Collections.sort(tasks, sTaskComparator); } private static Comparator<Task> sTaskComparator = new Comparator<Task>() { @Override public int compare(Task lhs, Task rhs) { return lhs.getExecutePriority() - rhs.getExecutePriority(); } }; //3、紧后任务执行 synchronized void onPredecessorFinished(Task beforeTask) { if (mPredecessorSet.isEmpty()) { return; } mPredecessorSet.remove(beforeTask); if (mPredecessorSet.isEmpty()) { start(); } }
3.4.4 一键暂停和恢复下载
百度网盘批量下载文件,如何实现一键暂停和恢复?可以扩展线程池实现一个可以暂停和恢复的线程池
线程池可以让所有Worker暂停新任务的执行,但是正在下载的任务并没有被暂停,所以需要在下载任务中处理暂停和恢复的情况。
当任务暂停的时候,退出读取数据的循环,关闭连接。
因为暂停的时间肯能比较长,为了防止资源占用时间较长,需要先关闭连接,循环退出后有两种处理方式。
循环退出即任务执行完,等回复执行的时候重新提交该任务进行断点续传,由于线程池的排队机制,暂停的任务将无法继续执行,而是在队列中排队,不符合需求。
循环退出后,在外层循环判断当前任务是否下载完,如果没有继续断点续传,续传前判断执行是否暂停,如果暂停则等待,当恢复执行的时候,唤醒当前线程.。
在beforeExecute方法中将任务添加到集合中,在afterExecute中将任务移除。因此,当暂停线程池的时候,集合中的任务就是正在执行的任务,依次遍历调用任务的pause方法,当恢复线程池的时候,依次遍历调用任务的resume方法。
public class CustomExecutor extends ThreadPoolExecutor { private List<DownLoadTask> executingTask=new LinkedList<>(); private volatile boolean isRunning; private ReentrantLock pauseLock=new ReentrantLock(); private Condition pauseCondition=pauseLock.newCondition(); public CustomExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) { super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue); } public void pause(){ if (!isRunning){ return; } synchronized (this){ Iterator<DownLoadTask> iterator = executingTask.iterator(); while (iterator.hasNext()){ DownLoadTask task = iterator.next(); task.pause(); } isRunning=false; } } public void resume(){ if (isRunning){ return; } pauseLock.lock(); try { pauseCondition.signalAll(); Iterator<DownLoadTask> iterator = executingTask.iterator(); while (iterator.hasNext()){ DownLoadTask task = iterator.next(); task.resume(); } isRunning=true; } finally { pauseLock.unlock(); } } @Override protected void beforeExecute(Thread t,Runnable r) { super.beforeExecute(t,r); pauseLock.lock(); try { while (!isRunning){ pauseCondition.await(); } } catch (InterruptedException e) { t.interrupt(); } finally { pauseLock.unlock(); } synchronized (this){ if (r instanceof DownLoadTask){ executingTask.add((DownLoadTask) r); } } } @Override protected void afterExecute(Runnable r,Throwable t) { super.afterExecute(r,t); synchronized (this){ if (r instanceof DownLoadTask){ executingTask.remove(r); } } } }
3.5 线程池性能优化
- 选择合适的线程池大小:线程池大小应该根据系统的处理能力、资源限制以及任务的特性来选择。过小的线程池会导致任务排队等待,过大的线程池会导致资源浪费和调度开销增加。一般来说,线程池大小应该设置为处理器核心数的两倍。
- 使用合适的队列类型:线程池的任务队列可以是阻塞队列或非阻塞队列。阻塞队列可以避免任务排队等待时的 busy waiting,但会增加系统开销。非阻塞队列可以减少系统开销,但会增加任务排队等待的时间。选择适合任务特性和处理需求的队列类型可以提高线程池性能。
- 避免任务过多和过少:过多的任务会导致线程池过载,过少的任务会导致线程池资源浪费。应该根据实际任务需求和系统处理能力来合理分配任务,避免过多或过少的任务。
- 合理设置线程池参数:线程池的参数包括核心线程数、最大线程数、线程存活时间和任务队列大小等。应该根据系统特性和任务需求来设置这些参数,以提高线程池性能。
- 优化任务执行效率:线程池的性能也与任务执行效率有关。可以优化任务代码,避免耗时操作和竞争条件,提高任务执行效率,从而提高线程池性能。
- 监控线程池状态:应该定期监控线程池的状态,包括线程池大小、线程使用情况、任务队列情况等。及时发现问题并进行调整,以保证线程池的性能。
3.6 线程池注意事项
线程池监控
项目中出现过记录丢失的情况,有一个记录保存过程中卡住,从而导致后面的任务都存储到了阻塞队列BlockingQueue中。
设备重启后,直接导致队列中的记录全部丢失。
解决方法是对线程池的运行状态进行监控,正常情况下阻塞队列BlockingQueue里应该是没有任务的,当阻塞队列BlockingQueue中的任务数量超过某个阈值后触发异常任务管理机制。
不同任务类型应该使用不同的线程池,不要把所有任务都用同一个线程池执行。主要从任务类型、业务场景、任务时间三个维度去考量。
任务类型
任务主要分为CPU密集和IO密集两种类型,现代计算机IO都是通过DMA直接存储访问器处理的,处理完成后在给CPU发一个中断,CPU在继续执行。
如果把这两种任务放到一个线程,读取IO文件的时候,线程就需要等待IO完成才能继续执行。
文件读取和数据处理不能并发处理,任务执行时间就增长了。
业务场景
app中比较常见的有前台任务和后台任务,面向用户操作的是前台任务,前台任务对响应速度要求比较高,例如用户点击按钮请求服务器。
后台任务是长时间在后台运行的任务,例如百度网盘批量下载文件。
如果前台任务用Okhttp异步请求,后台任务也使用Okhttp异步请求,相当于都使用Okhttp缓存线程池,可能会导致线程数量大量增加。
这种情况后台下载文件应该自定义线程池,使用Okhttp同步请求。
任务时间
时间长的任务和时间短的任务不要使用同一个线程池,会导致时间短的任务不能及时执行。
3.7 线程池线程数量确定
CPU密集型
线程数可以设置为N+1,N是CPU核心数。多出来一个是为了防止线程缺页中断或其他原因导致的任务暂停,这时候多出来的线程就可以充分使用CPU的空闲时间。
IO密集型
线程数可以设置为N*2,IO密集型任务不占用CPU,现代计算机都是通过DMA直接内存访问控制器处理的。在执行这类任务的时候,CPU会有许多空闲时间执行其他任务,因此可以多设置一些线程。
通用公式
IO耗时占比越多,线程数量越多。线程数通用公式参考如下:
线程数 = CPU 核心数 * (1+ IO 耗时/CPU 耗时)
3.8 线程池业务防劣化Lint工具
为什么不建议使用Executors创建线程?实际开发中,不建议使用Executors创建线程池,有如下三个原因:
- 单线程和固定数量线程线程池的阻塞队列BlockingQueue都没有设置大小,如果有一个任务阻塞,可能会导致队列中的任务无限增加,最终触发oom或者导致任务全部丢失。
- 缓存线程池线程数量无上限,如果任务过多,并且任务执行时间都很长,可能会导致线程数量无限增长,最终触发oom。
- 不能指定任务拒绝策略,默认的拒绝策略为AbortPolicy,如果不设置可能会导致程序崩溃。
那么该如何在编译期去发现上述问题呢? Android Lint是Google提供给Android开发者的静态代码检查工具。
使用Lint对Android工程代码进行扫描和检查,可以发现代码潜在的问题,提醒程序员及早修正。
通过lint工具防劣化, 提醒业务使用架构组独享的线程池。
下面就由小木箱带大家实现一下禁止使用Executors创建线程池的Lint工具吧~
public class ThreadPoolDetector extends Detector implements Detector.JavaScanner { public static final Issue ISSUE = Issue.create( "创建线程池", "避免自己创建ThreadPool", "请勿直接使用Executors创建线程,建议使用统一的线程池管理工具类", Category.PERFORMANCE, 6, Severity.WARNING, new Implementation(ThreadPoolDetector.class, Scope.JAVA_FILE_SCOPE) ); @Override public List<Class<? extends Node>> getApplicableNodeTypes() { return ImmutableList.of(MethodCall.class); } @Override public void visitMethodCall(@NonNull JavaContext context, @NonNull UCall node, @NonNull PsiMethodCallExpression call) { if (node.getMethodName().equals("newFixedThreadPool") || node.getMethodName().equals("newCachedThreadPool") || node.getMethodName().equals("newSingleThreadExecutor")) { context.report(ISSUE, node, context.getLocation(node), "不建议使用Executors创建线程, 改用ThreadPoolExecutor"); } } }
四、SCQA分析线程池
答案未来将上传B站, 请关注B站号: 小木箱成长营
- 日常工作中有用到线程池吗?什么是线程池?为什么要使用线程池?
- 工作线程Worker 继承 AQS 实现了锁机制,那 ThreadPoolExecutor 都用到了哪些锁?为什么要用锁?
- 项目中是怎样使用线程池的?Executors 了解吗?
- 线程池有哪些参数?
- 线程池的运行原理是什么?
- 线程池的执行流程?
- 如何合理配置线程池?
- 核心线程能否退出?
- 拒绝策略有哪些?适用场景是怎么样的?
- 使用线程池的过程中遇到过哪些坑或者需要注意的地方?
- 如何监控线程池?
- JDK自带的线程池种类有哪些?
- 为什么不推荐使用JDK自带的线程池?
- 如何合理设置核心线程数的大小?
- 说说submit和 execute两个方法有什么区别?
- shutdownNow() 和 shutdown() 两个方法有什么区别?
- 调用了shutdownNow或者shutdown,线程一定会退出么?
- 什么是阻塞队列?阻塞队列有哪些?为什么线程池要使用阻塞队列?
- 通过 ThreadPoolExecutor 来创建线程池,那核心参数设置多少合适呢?
五、结语
三大分析法分析线程池主要分为四部分,第一部分是4W2H分析线程池,第二部分是MECE分析线程池,第三部分是SCQA分析线程池,最后一部分是结语。
其中,4W2H分析线程池主要围绕线程池提出了6个高价值问题。
其中,MECE分析线程池主要分为线程池基本操作、线程池生命周期、线程池工作原理、线程池代码案例分析、线程池的性能优化、线程池注意事项、线程池线程数量确定和线程池业务防劣化8部分。
线程池学习的重要性是不可忽视的。在现代互联网时代,线程池是一种重要的多线程编程技术,能够提高程序的性能、稳定性和可靠性。因此,学习线程池成为了每一位Android开发工程师的必备技能。
希望通过通过本文线程池学习,能够让您更快的通过职场面试同时也能解决工作中的业务痛点。
如果你觉的小木箱的文章对你有所帮助,那么可以关注公众号小木箱成长营。让你的知识和视野得到更广阔的拓展吧,下一篇将介绍Java并发关键字那些事,同样是并发编程核心内容。 今天就到这里啦,我是小木箱,我们下一篇见~
参考资料: