1. 线程池有多重要🤔🤔🤔
线程是一个程序“媛”一定会涉及到的一个概念,但是线程的创建和切换代价都比较大的。所以,我们有没有一个好的方案能做到线程的复用呢?这就涉及到一个概念——线程池。线程池解决的核心问题就是资源管理的问题,合理的使用线程池能够带来3个很明显的好处:
- 降低资源消耗:通过重用已经创建的线程,来降低线程创建和销毁造成的资源消耗;
- 提高响应速度:在高并发场景下,任务到达时不需要等待线程创建就可以立即执行;
- 提高线程的可管理性:线程在系统中是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为不合理的分配线程导致资源调度失衡,降低系统的可用性,而使用线程池则可以统一管理、分配、调优和监控。
综上所述,了解线程池的原理和使用在生产中是非常重要的。本文主要探讨JDK提供的ThreadPoolExecutor类。
2. Java对多线程的支持⚙️——ThreadPoolExecutor
Java的线程池支持主要通过ThreadPoolExecutor来实现,我们使用的ExecutorService的各种线程池策略都是基于ThreadPoolExecutor实现的,所以要弄明白各种线程池策略,必须先弄明白ThreadPoolExecutor。
2.1 核心原理
首先了解ThreadPoolExecutor的继承关系:
- 顶层接口Executor:定义 execute 方法来执行任务,入参是 Runnable,无出参
publicinterfaceExecutor { /*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/voidexecute(Runnablecommand); }
将任务的提交和执行进行解耦,使用者无需关注如何创建线程和如何调度线程,仅需要提供Runnable,关注核心任务的逻辑,将Runnable提交到Executor,由Executor完成线程的调配和任务的执行。
- ExecutorService:Executor 的功能太弱,ExecutorService 丰富了对任务的执行和管理的功能,补充可以为一个或者一批异步任务生成Future的方法,并提供了管理线程池的方法,比如shutdown()和shutdownNow()
publicinterfaceExecutorServiceextendsExecutor { voidshutdown(); List<Runnable>shutdownNow(); booleanisShutdown(); booleanisTerminated(); booleanawaitTermination(longtimeout, TimeUnitunit) throwsInterruptedException; <T>Future<T>submit(Callable<T>task); <T>Future<T>submit(Runnabletask, Tresult); Future<?>submit(Runnabletask); <T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks) throwsInterruptedException; <T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks, longtimeout, TimeUnitunit) throwsInterruptedException; <T>TinvokeAny(Collection<?extendsCallable<T>>tasks) throwsInterruptedException, ExecutionException; <T>TinvokeAny(Collection<?extendsCallable<T>>tasks, longtimeout, TimeUnitunit) throwsInterruptedException, ExecutionException, TimeoutException; }
- AbstractExecutorService:封装了Executor很多通用功能的抽象类,实现了部分ExecutorService方法。可以串联任务的流程,保证下次的实现仅关注一个执行任务的方法即可。
- ThreadPoolExecutor:是实现ExecutorService接口的实现类之一(当然实现ExecutorService接口的常用类还有ScheduledThreadPoolExecutor、TimedExecutorService等),主要是维护线程的生命周期。
那么ThreadPoolExecutor是如何做到的呢?其运行原理如下:
step1:调用ThreadPoolExecutor的execute提交线程,首先检查CorePool,如果CorePool内的线程小于CorePoolSize,新创建线程执行任务。
step2:如果当前CorePool内的线程大于等于CorePoolSize,那么将线程加入到BlockingQueue。
step3:如果不能加入BlockingQueue,在小于MaxPoolSize的情况下创建线程执行任务。
step4:如果线程数大于等于MaxPoolSize,那么执行拒绝策略
实际上线程池的运行是分为两部分:任务管理、线程管理。任务管理相当于生产者角色,提交任务即生产消息;线程管理相当于消费者角色,线程被统一维护在线程池内,根据任务请求的情况进行线程的分配,当线程执行完任务后会继续获取新的任务执行,最终获取不到任务的时候,线程会被回收。
了解完整体的设计和核心原理,再具体看下线程池是如何管理任务和线程的?接下来从线程池的生命周期开始分析。
2.2 生命周期
线程池的运行状态,不是由用户显式设置的,而是伴随着线程池的运行,由内部来维 护。线程池内部使用一个变量维护两个值:运行状态 (runState) 和线程数量 (workerCount)。在具体实现中,线程池将运行状态 (runState)、线程数量 (workerCount)。两个关键参数的维护放在了一起,如下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 这个 AtomicInteger 类型,是对线程池的运行状态和线程池中有效线程的数量 进行控制的一个字段,它同时包含两部分的信息:运行状态 (runState) 、有效线程的数量 (workerCount),高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关 决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线 程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码 所示:
// 计算当前运行状态privatestaticintrunStateOf(intc) { returnc&~CAPACITY; } // 计算当前线程数据privatestaticintworkerCountOf(intc) { returnc&CAPACITY; } // 通过状态和线程数计算ctlprivatestaticintctlOf(intrs, intwc) { returnrs|wc; }
ThreadPoolExecutor 的运行状态有 5 种,分别为:
- RUNNING:运行状态,可以接受新提交的任务,并且能够处理阻塞队列中的任务;
- SHUTDOWN:关闭状态,不再接受新提交的任务,可以继续处理阻塞队列中的任务;
- STOP:停止状态,不再接受新的任务,也不处理阻塞队列中的任务,中断正在处理的任务线程;
- TIDYING:整理状态,所有的任务都已经终止,workCount(有效线程数为0);
- TERMINATED:终止状态,在terminated方法执行之后进入该状态。
分析完线程池的生命周期,那么在线程池中是如何实现调度呢?
2.3 线程调度
线程池的本质是对任务和线程的管理,而做到这一点的核心思想就是将任务和线程两者解耦,不让两者直接关联。前面小节有提到,线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。那么缓冲模块是线程池能够管理任务的核心部分。
从存储结构、是否有界、是否线程安全,逐个分析常用的几种类型阻塞队列:
- ArrayBlockingQueue:是一个基于数组的阻塞队列
- 底层存储结构:数组
- 是否有界:有(创建该阻塞队列实例需要指定队列容量)
- 是否线程安全:是(在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制,换言之生产者线程与消费者线程间无法同时操作)
- LinkedBlockingQueue:是一个基于链表的阻塞队列,
- 底层存储结构:链表
- 是否有界:可有可无(该阻塞队列容量默认为 Integer.MAX_VALUE,即如果未显式设置队列容量时可以视为是一个无界队列;反之构建实例过程中指定队列容量,则其就是一个有界队列。)
- 是否线程安全:是(在并发控制层面,其使用了两个ReentrantLock可重入锁来分别控制对入队、出队这两种类型的操作。使得生产者线程与消费者线程间可以同时操作提高效率。特别地对于链表这种结构而言,Java还提供了一个实现BlockingDeque接口的LinkedBlockingDeque类——其是一个基于链表的双向阻塞队列)
- PriorityBlockingQueue:优先级队列
- 底层存储结构:使用数组实现元素的存储、最小堆的表示(默认使用元素的自然排序,即要求元素实现Comparable接口;或者显式指定比较器Comparator。)
- 是否有界:无
- 是否线程安全:否(在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制。在创建该队列实例时虽然可以指定容量。但这并不是队列的最终容量,而只是该队列实例的初始容量。一旦后续过程队列容量不足,其会自动进行扩容。值得一提的是,为了保证同时只有一个线程进行扩容,其内部是通过CAS方式来实现的,而不是利用ReentrantLock可重入锁来控制。)
示例:
publicvoidtest1() { BlockingQueue<Integer>blockingQueue=newPriorityBlockingQueue<>(2); blockingQueue.offer(13); blockingQueue.offer(5); blockingQueue.offer(7); Integersize=blockingQueue.size(); System.out.println("blockingQueue: "+blockingQueue+", size: "+size); Integere1=blockingQueue.poll(); System.out.println("e1: "+e1); Integere2=blockingQueue.poll(); System.out.println("e2: "+e2); Integere3=blockingQueue.poll(); System.out.println("e3: "+e3); }
运行结果:
- DelayQueue:延迟队列
- 底层存储结构:使用PriorityQueue实现元素的存储
- 是否有界:无
- 是否线程安全:否(使用ReentrantLock实现线程同步)
示例:
publicclassBlockingQueueTest { privatestaticDateTimeFormatterformatter=DateTimeFormatter.ofPattern("HH:mm:ss"); publicvoidtest2() throwsException { BlockingQueue<Cache>blockingQueue=newDelayQueue<>(); newThread(() -> { while (true) { try { Cachecache=blockingQueue.take(); info("消费者: "+cache.toString()); } catch (Exceptione) { System.out.println("Happen Exception: "+e.getMessage()); } } }).start(); LongtimeStamp=System.currentTimeMillis(); Cachecache1=newCache("name", "Aaron", timeStamp+15*1000); blockingQueue.put(cache1); Cachecache2=newCache("age", "18", timeStamp+27*1000); blockingQueue.put(cache2); Cachecache3=newCache("country", "China", timeStamp+7*1000); blockingQueue.put(cache3); Thread.sleep(120*1000); } /*** 打印信息*/privatestaticvoidinfo(Stringmsg) { Stringtime=formatter.format(LocalTime.now()); Stringthread=Thread.currentThread().getName(); Stringlog="["+time+"] "+msg; System.out.println(log); } privatestaticclassCacheimplementsDelayed { // 缓存 KeyprivateStringkey; // 缓存 ValueprivateStringvalue; // 缓存到期时间privateLongexpire; /*** 计算当前延迟时间* @param unit* @return*/publiclonggetDelay(TimeUnitunit) { // 缓存有效的剩余毫秒数longdelta=expire-System.currentTimeMillis(); returnunit.convert(delta, TimeUnit.MILLISECONDS); } /*** 定义比较规则, 延迟时间按从小到大进行排序* @param o* @return*/publicintcompareTo(Delayedo) { Cacheother= (Cache) o; returnthis.getExpire().compareTo(other.getExpire()); } publicStringtoString() { Datetime=newDate(expire); SimpleDateFormatformatter=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); StringtimeStr=formatter.format(time); return"Cache, key: "+key+", expire: "+timeStr; } } }
运行结果:
- SynchronousQueue:同步队列(该队列没有容量无法存储元素,故生产者添加的数据会直接被消费者获取并且立刻消费。所以当生产者线程添加数据时,如果此时恰好有一个消费者已经准备好获取队头元素了,则会添加成功;否则要么添加失败返回false要么被阻塞。)Executors.newCachedThreadPool()创建的线程池实例,其内部任务队列使用的就是SynchronousQueue,故offer方法添加任务到队列失败后则会开启新的线程来进行处理。
示例:
publicvoidtest3() { BlockingQueue<Integer>blockingQueue=newSynchronousQueue<>(); Booleanb1=blockingQueue.offer(237); info("生产者 b1: "+b1); // 消费者线程newThread( ()->{ try{ Integere=blockingQueue.take(); info("消费者:"+e); } catch (Exceptione) { info("Happen Exception: "+e.getMessage()); } } ).start(); // 确保消费者线程已经准备完毕try { Thread.sleep(2000); } catch (Exceptione) {} Booleanb2=blockingQueue.offer(996); info("生产者 b2: "+b2); try { Thread.sleep(120*1000); } catch (Exceptione) {} }
测试结果如下,符合预期。生产者第一次添加元素结果失败,原因很简单。因为同步队列没有存储元素的能力,故如果没有消费者直接取走,则生产者即会添加失败;第二次添加时,消费者线程已经在阻塞等待了,故添加成功:
下面利用阻塞的put方法来添加元素,示例代码如下所示:
publicvoidtest4() { BlockingQueue<Integer>blockingQueue=newSynchronousQueue<>(); // 生产者线程newThread(() -> { try { info("生产者: Start"); while (true) { Integernum= (int) (Math.random() *100+1); info("生产者: put "+num); blockingQueue.put(num); } } catch (Exceptione) { info("Happen Exception: "+e.getMessage()); } }).start(); // 消费者线程newThread(() -> { try { info("消费者: Start"); while (true) { try { Thread.sleep(5000); } catch (Exceptione) { } Integere=blockingQueue.take(); info("消费者: "+e); } } catch (Exceptione) { info("Happen Exception: "+e.getMessage()); } }).start(); try { Thread.sleep(120*1000); } catch (Exceptione) {} }
3. 小结
本片仅从线程池的源码出发,分析其原理,具体使用过程中整理了几条使用TIPS如下:
使用TIPS:
1.shutdown()和shutdownNow()
shutdown做了几件事:
- 检查是否能操作目标线程
- 将线程池状态转为SHUTDOWN
- 中断所有空闲线程
publicvoidshutdown() { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
什么是空闲线程?interruptIdleWorkers的核心逻辑如下:
privatevoidinterruptIdleWorkers(booleanonlyOne) { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { for (Workerw : workers) { Threadt=w.thread; if (!t.isInterrupted() &&w.tryLock()) { try { t.interrupt(); } catch (SecurityExceptionignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
这里主要是为了中断worker,但是中断之前需要先获取锁,这就意味着正在运行的Worker不能中断。但是上面的代码有w.tryLock(),那么获取不到锁就不会中断,shutdown的Interrupt只是对所有的空闲Worker(正在从workQueue中取Task,此时Worker没有加锁)发送中断信号。
当调用ShutDown方法时,首先设置了线程池的状态为ShutDown,此时1阶段的worker进入到状态判断时会返回null,此时Worker退出。
因为getTask的时候是不加锁的,所以在shutdown时可以调用worker.Interrupt.此时会中断退出,Loop到状态判断时,同时workQueue为empty。那么抛出中断异常,导致重新Loop,在检测线程池状态时,Worker退出。如果workQueue不为null就不会退出,此处有些疑问,因为没有看见中断标志位清除的逻辑,那么这里就会不停的循环直到workQueue为Empty退出。
这里也能看出来shutdown()只是清除一些空闲Worker,并且拒绝新Task加入,对于workQueue中的线程还是继续处理的。
对于shutdown中获取mainLock而addWorker中也做了mainLock的获取,这么做主要是因为Works是HashSet类型的,是线程不安全的,可以看到在addWorker后面也是对线程池状态做了判断,将Worker添加和中断逻辑分离开。
再看shutdownNow:
publicList<Runnable>shutdownNow() { List<Runnable>tasks; finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks=drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); returntasks; }
shutdownNow和shutdown代码类似,但是实现却很不相同。首先是设置线程池状态为STOP,前面的代码可以看到,是对SHUTDOWN有一些额外的判断逻辑,但是对于>=STOP,基本都是reject,STOP也是比SHUTDOWN更加严格的一种状态。此时不会有新Worker加入,所有刚执行完一个线程后去GetTask的Worker都会退出。
之后调用interruptWorkers:
privatevoidinterruptWorkers() { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { for (Workerw : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
这里可以看出来,此方法目的是中断所有的Worker,而不是像shutdown中那样只中断空闲线程。这样体现了STOP的特点,中断所有线程,同时workQueue中的Task也不会执行了。
所以接下来drainQueue:
privateList<Runnable>drainQueue() { BlockingQueue<Runnable>q=workQueue; ArrayList<Runnable>taskList=newArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnabler : q.toArray(newRunnable[0])) { if (q.remove(r)) taskList.add(r); } } returntaskList; }
获取所有没有执行的Task,并且返回。
这也体现了STOP的特点:拒绝所有新Task的加入,同时中断所有线程,WorkerQueue中没有执行的线程全部抛弃。所以此时Pool是空的,WorkerQueue也是空的。
2.work和Task
Worker是当前线程池中的线程,而task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。
3.maximumPoolSize和corePoolSize
maximumPoolSize为线程池最大容量,也就是说线程池最多能起多少Worker。corePoolSize是核心线程池的大小,当corePoolSize满了时,同时workQueue full(ArrayBolckQueue是可能满的) 那么此时允许新建Worker去处理workQueue中的Task,但是不能超过maximumPoolSize。超过corePoolSize之外的线程会在空闲超时后终止。
4.Keep-alive times 参数:
作用: 如果当前线程池中有超过 coreSize 的线程,并且线程空闲的时间超过 keepAliveTime,当前线程就会被回收,这样可以避免线程没有被使用时的资源浪费;
通过 setKeepAliveTime 方法可以动态的设置 keepAliveTime 的值;
如果设置 allowCoreThreadTimeOut 为 ture 的话,core thread 空闲时间超过 keepAliveTime 的话,也会被回收
5.拒绝策略:
在 Executor 已经关闭或对最大线程和最大队列都使用饱和时,可以使用 RejectedExecutionHandler 类进行异常捕捉。有如下四种处理策略:
- AbortPolicy(默认):抛出异常
- CallerRunsPolicy:不使用线程池,主线程来执行
- DiscardPolicy:直接丢弃任务
- DiscardOldestPolicy:丢弃队列中最老任务
6.ThreadPoolExecutor初始化参数设置思路:
publicThreadPoolExecutor(intcorePoolSize, intmaximumPoolSize, longkeepAliveTime, TimeUnitunit, BlockingQueue<Runnable>workQueue, ThreadFactorythreadFactory, RejectedExecutionHandlerhandler) { if (corePoolSize<0||maximumPoolSize<=0||maximumPoolSize<corePoolSize||keepAliveTime<0) thrownewIllegalArgumentException(); if (workQueue==null||threadFactory==null||handler==null) thrownewNullPointerException(); this.acc=System.getSecurityManager() ==null?null : AccessController.getContext(); this.corePoolSize=corePoolSize; this.maximumPoolSize=maximumPoolSize; this.workQueue=workQueue; this.keepAliveTime=unit.toNanos(keepAliveTime); this.threadFactory=threadFactory; this.handler=handler; }
- 线程大小的设置,常见实现:
- coreSize == maxSize:
如上设置,随着请求的不断增加,会是这样的现象:
- 请求数 < coreSize 时,新增线程;
- 请求数 >= coreSize && 队列不满时,添加任务入队;
- 队列满时,此时因为 coreSize 和 maxSize 相等,任务会被直接拒绝。
这么写的最大目的:是让线程一下子增加到 maxSize,并且不要回收线程,防止线程回收,避免不断增加回收的损耗,一般来说业务流量都有波峰低谷,在流量低谷时,线程不会被回收;流量波峰时,maxSize 的线程可以应对波峰,不需要慢慢初始化到 maxSize 的过程。
如上设置有两个前提条件:
- allowCoreThreadTimeOut 采取默认 false,而不会主动设置成 true,allowCoreThreadTimeOut 是 false 的话,当线程空闲时,就不会回收核心线程;
- keepAliveTime 和 TimeUnit 都会设置很大,这样线程空闲的时间就很长,线程就不会轻易的被回收
大多数情况下机器的资源是比较充足,不需要担心线程空闲会浪费机器的资源,所以这种写法目前比较常见。
- maxSize 无界 + SynchronousQueue:
SynchronousQueue 其内部有堆栈和队列两种形式,默认是堆栈的形式,其内部是没有存储的容器的,放元素和拿元素是一一对应的,比如我使用 put 方法放元素,如果此时没有对应的 take 操作的话,put 操作就会阻塞,需要有线程过来执行 take 操作后,put 操作才会返回。
maxSize 无界 + SynchronousQueue 这样的组合方式优缺点都很明显:
- 优点:阻塞队列没有存储空间,只要请求到来,就必须找到一条空闲线程去处理这个请求,找不到则在线程池新开辟一条线程去执行。如果是其他的队列的话,只知道任务已经被提交成功了,但无法知道当前任务是在被消费中,还是正在队列中堆积。
- 缺点:
- 比较消耗资源,大量请求到来时,会新建大量的线程来处理请求
- 正是因为 SynchronousQueue 没有存储空间,若线程池中的线程数已经达到了 maxSize 且没有空闲线程,那么第 maxSize+1 个任务就会被reject。所以如果请求的量难以预估的话,maxSize 的大小也很难设置
- maxSize 有界 + Queue 无界:
适用场景:对实时性要求不大,但流量忽高忽低的场景。
比如设置 maxSize 为 20,Queue 选择默认构造器的 LinkedBlockingQueue,这样做的优缺点如下:
- 优点:
- 电脑 cpu 固定的情况下,每秒能同时工作的线程数是有限的,此时开很多的线程其实也是浪费,还不如把这些请求放到队列中去等待,这样可以减少线程之间的 CPU 的竞争;
- LinkedBlockingQueue 默认构造器构造出来的链表的最大容量是 Integer 的最大值,非常适合流量忽高忽低的场景,当流量高峰时,大量的请求被阻塞在队列中,让有限的线程可以慢慢消费。
- 缺点:流量高峰时,大量的请求被阻塞在队列中,对于请求的实时性难以保证,所以当对请求的实时性要求较高的场景,不能使用该组合。
- maxSize 有界 + Queue 有界:
这种组合是对「maxSize 有界 + Queue 无界」缺点的补充,把队列从无界修改成有界,只要排队的任务在要求的时间内,能够完成任务即可。需要把线程和队列的大小进行配合计算,保证大多数请求都可以在要求的时间内,有响应返回。
- 如何设置空闲线程不被回收?
把 keepAliveTime 设置成 0 时,线程使用 poll 方法在队列上进行超时阻塞时,会立马返回 null,也就是空闲线程会立马被回收。所以这种设置是错误🙅的。
可以设置 keepAliveTime 为无穷大值,并且设置 TimeUnit 为时间的大单位,比如设置 keepAliveTime 为 365,TimeUnit 为 TimeUnit.DAYS,意思是线程空闲 1 年内都不会被回收。
假设机器的内存都够大,合理设置 maxSize 后,即使线程空闲,也不希望线程被回收,所以也可以设置 keepAliveTime 为无穷大。
- 什么时候应该使用公用线程池?
是否使用公用线程池,一般有以下原则:
- 查询和写入不公用线程池:一般来说,对于互联网应用,查询量远远大于写入的量,如果查询和写入都要走线程池的话,一定不要公用线程池,也就是说查询走查询的线程池,写入走写入的线程池。如果公用的话,当查询量很大时,写入的请求可能会到队列中去排队,无法及时被处理;
- 多个写入业务场景看情况是否需要公用线程池:原则上来说,每个业务场景都独自使用自己的线程池,绝不共用,这样在业务治理、限流、熔断方面都比较容易,一旦多个业务场景公用线程池,可能就会造成业务场景之间的互相影响,每个写入业务场景独立使用自己的线程池是比较合理的;
- 多个查询业务场景是可以公用线程池的:查询的请求一般来说有几个特点:查询的场景多、rt 时间短、查询的量比较大,如果给每个查询场景都弄一个单独的线程池的话,第一个比较耗资源,第二个很难定义线程池中线程和队列的大小,比较复杂,所以多个相似的查询业务场景是可以公用线程池的。
- 如何算线程大小和队列大小?
主要从几个方面入手:
- 根据业务进行考虑,初始化线程池时,需要考虑所有业务的并发情况:
- 如果目前所有的业务同时都有很大流量,那么在对于当前业务设置线程池时,尽量把线程大小、队列大小都设置小;
- 如果所有业务基本上都不会同时有流量,那么就可以稍微设置大一点
- 根据业务的实时性要求:
- 如果实时性要求高的话,把队列设置小一点,coreSize == maxSize,并且设置 maxSize 大一点;
- 如果实时性要求低的话,就可以把队列设置大一点
举个🌰:假设现在机器上某一时间段只会运行一种业务,业务的实时性要求较高,每个请求的平均 rt(ResponseTime) 是 300ms,请求超时时间是 3000ms,机器是 4 核 CPU,内存 16G,一台机器的 qps 是 100,这时候可以模拟一下如何设置:
4 核 CPU,假设 CPU 能够跑满,每个请求的 rt 是 300ms,就是 300 ms 能执行 4 条请求,3000ms 内能执行 3000/300 * 4 = 40 条请求;
300 ms 能执行 4 条请求,实际上 4 核 CPU 的性能远远高于这个,可以拍脑袋假设增加 15条,也就是说 3000ms 内预估能够执行 55 条;
一台机器的 qps 是 100,此时计算出一台机器 3 秒内最多处理 55 条请求,所以此时如果不进行 rt 优化的话,需要加至少一台机器。
线程池可以大概这么设置:
ThreadPoolExecutorexecutor=newThreadPoolExecutor(20, 20, 365L, TimeUnit.DAYS, newLinkedBlockingQueue(35));
线程数最大为 20,队列最大为 35,这样机器差不多可以在 3000ms 内处理最大的请求 55 条,当然根据你机器的性能和实时性要求,你可以调整线程数和队列的大小占比,只要总和小于 55 即可。
以上只是很粗糙的设置,在实际项目中,还需要根据实际情况不断的观察和调整。
随着云巧资产市场的业务功能逐渐丰富和复杂,串行实现已无法有效支持业务,因此需要灵活的运用多线程进行优化,下一篇将结合资产市场核心业务——下行篇,总结多线程的有效实践和踩坑经验,敬请期待👻👻👻。
参考:
- 《“FY21技术人年度总结”手淘线程优化小结》:https://topic.atatech.org/articles/198030
- 《Java并发编程之美》 翟陆续、薛宾田著