深入理解java线程池(1)——ThreadPoolExecutor

简介: 简介:梳理线程池核心原理,探讨对ThreadPoolExecutor的理解和总结使用线程池的常用tips

1. 线程池有多重要🤔🤔🤔

线程是一个程序“媛”一定会涉及到的一个概念,但是线程的创建和切换代价都比较大的。所以,我们有没有一个好的方案能做到线程的复用呢?这就涉及到一个概念——线程池。线程池解决的核心问题就是资源管理的问题,合理的使用线程池能够带来3个很明显的好处:

  1. 降低资源消耗:通过重用已经创建的线程,来降低线程创建和销毁造成的资源消耗;
  2. 提高响应速度:在高并发场景下,任务到达时不需要等待线程创建就可以立即执行;
  3. 提高线程的可管理性:线程在系统中是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为不合理的分配线程导致资源调度失衡,降低系统的可用性,而使用线程池则可以统一管理、分配、调优和监控。

综上所述,了解线程池的原理和使用在生产中是非常重要的。本文主要探讨JDK提供的ThreadPoolExecutor类。

2. Java对多线程的支持⚙️——ThreadPoolExecutor

Java的线程池支持主要通过ThreadPoolExecutor来实现,我们使用的ExecutorService的各种线程池策略都是基于ThreadPoolExecutor实现的,所以要弄明白各种线程池策略,必须先弄明白ThreadPoolExecutor。

2.1 核心原理

首先了解ThreadPoolExecutor的继承关系:

  1. 顶层接口Executor:定义 execute 方法来执行任务,入参是 Runnable,无出参
publicinterfaceExecutor {
/*** Executes the given command at some time in the future.  The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/voidexecute(Runnablecommand);
}

将任务的提交和执行进行解耦,使用者无需关注如何创建线程和如何调度线程,仅需要提供Runnable,关注核心任务的逻辑,将Runnable提交到Executor,由Executor完成线程的调配和任务的执行。

  1. ExecutorServiceExecutor 的功能太弱,ExecutorService 丰富了对任务的执行和管理的功能,补充可以为一个或者一批异步任务生成Future的方法,并提供了管理线程池的方法,比如shutdown()和shutdownNow()
publicinterfaceExecutorServiceextendsExecutor {
voidshutdown();
List<Runnable>shutdownNow();
booleanisShutdown();
booleanisTerminated();
booleanawaitTermination(longtimeout, TimeUnitunit)
throwsInterruptedException;
<T>Future<T>submit(Callable<T>task);
<T>Future<T>submit(Runnabletask, Tresult);
Future<?>submit(Runnabletask);
<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks)
throwsInterruptedException;
<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks,
longtimeout, TimeUnitunit)
throwsInterruptedException;
<T>TinvokeAny(Collection<?extendsCallable<T>>tasks)
throwsInterruptedException, ExecutionException;
<T>TinvokeAny(Collection<?extendsCallable<T>>tasks,
longtimeout, TimeUnitunit)
throwsInterruptedException, ExecutionException, TimeoutException;
}
  1. AbstractExecutorService:封装了Executor很多通用功能的抽象类,实现了部分ExecutorService方法。可以串联任务的流程,保证下次的实现仅关注一个执行任务的方法即可。
  2. ThreadPoolExecutor:是实现ExecutorService接口的实现类之一(当然实现ExecutorService接口的常用类还有ScheduledThreadPoolExecutor、TimedExecutorService等),主要是维护线程的生命周期。

那么ThreadPoolExecutor是如何做到的呢?其运行原理如下:


step1:调用ThreadPoolExecutor的execute提交线程,首先检查CorePool,如果CorePool内的线程小于CorePoolSize,新创建线程执行任务。
step2:如果当前CorePool内的线程大于等于CorePoolSize,那么将线程加入到BlockingQueue。
step3:如果不能加入BlockingQueue,在小于MaxPoolSize的情况下创建线程执行任务。
step4:如果线程数大于等于MaxPoolSize,那么执行拒绝策略

实际上线程池的运行是分为两部分:任务管理、线程管理。任务管理相当于生产者角色,提交任务即生产消息;线程管理相当于消费者角色,线程被统一维护在线程池内,根据任务请求的情况进行线程的分配,当线程执行完任务后会继续获取新的任务执行,最终获取不到任务的时候,线程会被回收。


了解完整体的设计和核心原理,再具体看下线程池是如何管理任务和线程的?接下来从线程池的生命周期开始分析。

2.2 生命周期

线程池的运行状态,不是由用户显式设置的,而是伴随着线程池的运行,由内部来维 护。线程池内部使用一个变量维护两个值:运行状态 (runState) 和线程数量 (workerCount)。在具体实现中,线程池将运行状态 (runState)、线程数量 (workerCount)。两个关键参数的维护放在了一起,如下代码所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 这个 AtomicInteger 类型,是对线程池的运行状态和线程池中有效线程的数量 进行控制的一个字段,它同时包含两部分的信息:运行状态 (runState) 、有效线程的数量 (workerCount),高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关 决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线 程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码 所示:

// 计算当前运行状态privatestaticintrunStateOf(intc)     { returnc&~CAPACITY; }
// 计算当前线程数据privatestaticintworkerCountOf(intc)  { returnc&CAPACITY; }
// 通过状态和线程数计算ctlprivatestaticintctlOf(intrs, intwc) { returnrs|wc; }


ThreadPoolExecutor 的运行状态有 5 种,分别为:

  • RUNNING:运行状态,可以接受新提交的任务,并且能够处理阻塞队列中的任务;
  • SHUTDOWN:关闭状态,不再接受新提交的任务,可以继续处理阻塞队列中的任务;
  • STOP:停止状态,不再接受新的任务,也不处理阻塞队列中的任务,中断正在处理的任务线程;
  • TIDYING:整理状态,所有的任务都已经终止,workCount(有效线程数为0);
  • TERMINATED:终止状态,在terminated方法执行之后进入该状态。


分析完线程池的生命周期,那么在线程池中是如何实现调度呢?

2.3 线程调度

线程池的本质是对任务和线程的管理,而做到这一点的核心思想就是将任务和线程两者解耦,不让两者直接关联。前面小节有提到,线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。那么缓冲模块是线程池能够管理任务的核心部分。

从存储结构、是否有界、是否线程安全,逐个分析常用的几种类型阻塞队列:

  • ArrayBlockingQueue:是一个基于数组的阻塞队列
  • 底层存储结构:数组
  • 是否有界:有(创建该阻塞队列实例需要指定队列容量)
  • 是否线程安全:是(在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制,换言之生产者线程与消费者线程间无法同时操作)
  • LinkedBlockingQueue:是一个基于链表的阻塞队列,
  • 底层存储结构:链表
  • 是否有界:可有可无(该阻塞队列容量默认为 Integer.MAX_VALUE,即如果未显式设置队列容量时可以视为是一个无界队列;反之构建实例过程中指定队列容量,则其就是一个有界队列。)
  • 是否线程安全:是(在并发控制层面,其使用了两个ReentrantLock可重入锁来分别控制对入队、出队这两种类型的操作。使得生产者线程与消费者线程间可以同时操作提高效率。特别地对于链表这种结构而言,Java还提供了一个实现BlockingDeque接口的LinkedBlockingDeque类——其是一个基于链表的双向阻塞队列)
  • PriorityBlockingQueue:优先级队列
  • 底层存储结构:使用数组实现元素的存储、最小堆的表示(默认使用元素的自然排序,即要求元素实现Comparable接口;或者显式指定比较器Comparator。)
  • 是否有界:无
  • 是否线程安全:否(在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制。在创建该队列实例时虽然可以指定容量。但这并不是队列的最终容量,而只是该队列实例的初始容量。一旦后续过程队列容量不足,其会自动进行扩容。值得一提的是,为了保证同时只有一个线程进行扩容,其内部是通过CAS方式来实现的,而不是利用ReentrantLock可重入锁来控制。)

示例:

@Testpublicvoidtest1() {
BlockingQueue<Integer>blockingQueue=newPriorityBlockingQueue<>(2);
blockingQueue.offer(13);
blockingQueue.offer(5);
blockingQueue.offer(7);
Integersize=blockingQueue.size();
System.out.println("blockingQueue: "+blockingQueue+", size: "+size);
Integere1=blockingQueue.poll();
System.out.println("e1: "+e1);
Integere2=blockingQueue.poll();
System.out.println("e2: "+e2);
Integere3=blockingQueue.poll();
System.out.println("e3: "+e3);
}

运行结果:

  • DelayQueue:延迟队列
  • 底层存储结构:使用PriorityQueue实现元素的存储
  • 是否有界:无
  • 是否线程安全:否(使用ReentrantLock实现线程同步)

示例:

publicclassBlockingQueueTest {
privatestaticDateTimeFormatterformatter=DateTimeFormatter.ofPattern("HH:mm:ss");
@Testpublicvoidtest2() throwsException {
BlockingQueue<Cache>blockingQueue=newDelayQueue<>();
newThread(() -> {
while (true) {
try {
Cachecache=blockingQueue.take();
info("消费者: "+cache.toString());
                } catch (Exceptione) {
System.out.println("Happen Exception: "+e.getMessage());
                }
            }
        }).start();
LongtimeStamp=System.currentTimeMillis();
Cachecache1=newCache("name", "Aaron", timeStamp+15*1000);
blockingQueue.put(cache1);
Cachecache2=newCache("age", "18", timeStamp+27*1000);
blockingQueue.put(cache2);
Cachecache3=newCache("country", "China", timeStamp+7*1000);
blockingQueue.put(cache3);
Thread.sleep(120*1000);
    }
/*** 打印信息*/privatestaticvoidinfo(Stringmsg) {
Stringtime=formatter.format(LocalTime.now());
Stringthread=Thread.currentThread().getName();
Stringlog="["+time+"] "+msg;
System.out.println(log);
    }
@AllArgsConstructor@DataprivatestaticclassCacheimplementsDelayed {
// 缓存 KeyprivateStringkey;
// 缓存 ValueprivateStringvalue;
// 缓存到期时间privateLongexpire;
/*** 计算当前延迟时间* @param unit* @return*/@OverridepubliclonggetDelay(TimeUnitunit) {
// 缓存有效的剩余毫秒数longdelta=expire-System.currentTimeMillis();
returnunit.convert(delta, TimeUnit.MILLISECONDS);
        }
/*** 定义比较规则, 延迟时间按从小到大进行排序* @param o* @return*/@OverridepublicintcompareTo(Delayedo) {
Cacheother= (Cache) o;
returnthis.getExpire().compareTo(other.getExpire());
        }
@OverridepublicStringtoString() {
Datetime=newDate(expire);
SimpleDateFormatformatter=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
StringtimeStr=formatter.format(time);
return"Cache, key: "+key+", expire: "+timeStr;
        }
    }
}

运行结果:

  • SynchronousQueue:同步队列(该队列没有容量无法存储元素,故生产者添加的数据会直接被消费者获取并且立刻消费。所以当生产者线程添加数据时,如果此时恰好有一个消费者已经准备好获取队头元素了,则会添加成功;否则要么添加失败返回false要么被阻塞。)Executors.newCachedThreadPool()创建的线程池实例,其内部任务队列使用的就是SynchronousQueue,故offer方法添加任务到队列失败后则会开启新的线程来进行处理。

示例:

@Testpublicvoidtest3() {
BlockingQueue<Integer>blockingQueue=newSynchronousQueue<>();
Booleanb1=blockingQueue.offer(237);
info("生产者 b1: "+b1);
// 消费者线程newThread( ()->{
try{
Integere=blockingQueue.take();
info("消费者:"+e);
        } catch (Exceptione) {
info("Happen Exception: "+e.getMessage());
        }
    } ).start();
// 确保消费者线程已经准备完毕try { Thread.sleep(2000); } catch (Exceptione) {}
Booleanb2=blockingQueue.offer(996);
info("生产者 b2: "+b2);
try { Thread.sleep(120*1000); } catch (Exceptione) {}
}

测试结果如下,符合预期。生产者第一次添加元素结果失败,原因很简单。因为同步队列没有存储元素的能力,故如果没有消费者直接取走,则生产者即会添加失败;第二次添加时,消费者线程已经在阻塞等待了,故添加成功:

下面利用阻塞的put方法来添加元素,示例代码如下所示:

@Testpublicvoidtest4() {
BlockingQueue<Integer>blockingQueue=newSynchronousQueue<>();
// 生产者线程newThread(() -> {
try {
info("生产者: Start");
while (true) {
Integernum= (int) (Math.random() *100+1);
info("生产者: put "+num);
blockingQueue.put(num);
                }
            } catch (Exceptione) {
info("Happen Exception: "+e.getMessage());
            }
        }).start();
// 消费者线程newThread(() -> {
try {
info("消费者: Start");
while (true) {
try {
Thread.sleep(5000);
                    } catch (Exceptione) {
                    }
Integere=blockingQueue.take();
info("消费者: "+e);
                }
            } catch (Exceptione) {
info("Happen Exception: "+e.getMessage());
            }
        }).start();
try { Thread.sleep(120*1000); } catch (Exceptione) {}
    }

3. 小结

本片仅从线程池的源码出发,分析其原理,具体使用过程中整理了几条使用TIPS如下:

使用TIPS:

1.shutdown()和shutdownNow()

shutdown做了几件事:

  • 检查是否能操作目标线程
  • 将线程池状态转为SHUTDOWN
  • 中断所有空闲线程

 

publicvoidshutdown() {
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {
mainLock.unlock();
        }
tryTerminate();
 }

什么是空闲线程?interruptIdleWorkers的核心逻辑如下:

privatevoidinterruptIdleWorkers(booleanonlyOne) {
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try {
for (Workerw : workers) {
Threadt=w.thread;
if (!t.isInterrupted() &&w.tryLock()) {
try {
t.interrupt();
                    } catch (SecurityExceptionignore) {
                    } finally {
w.unlock();
                    }
                }
if (onlyOne)
break;
            }
        } finally {
mainLock.unlock();
        }
}

这里主要是为了中断worker,但是中断之前需要先获取锁,这就意味着正在运行的Worker不能中断。但是上面的代码有w.tryLock(),那么获取不到锁就不会中断,shutdown的Interrupt只是对所有的空闲Worker(正在从workQueue中取Task,此时Worker没有加锁)发送中断信号。

当调用ShutDown方法时,首先设置了线程池的状态为ShutDown,此时1阶段的worker进入到状态判断时会返回null,此时Worker退出。

因为getTask的时候是不加锁的,所以在shutdown时可以调用worker.Interrupt.此时会中断退出,Loop到状态判断时,同时workQueue为empty。那么抛出中断异常,导致重新Loop,在检测线程池状态时,Worker退出。如果workQueue不为null就不会退出,此处有些疑问,因为没有看见中断标志位清除的逻辑,那么这里就会不停的循环直到workQueue为Empty退出。

这里也能看出来shutdown()只是清除一些空闲Worker,并且拒绝新Task加入,对于workQueue中的线程还是继续处理的。
对于shutdown中获取mainLock而addWorker中也做了mainLock的获取,这么做主要是因为Works是HashSet类型的,是线程不安全的,可以看到在addWorker后面也是对线程池状态做了判断,将Worker添加和中断逻辑分离开。

再看shutdownNow:

publicList<Runnable>shutdownNow() {
List<Runnable>tasks;
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks=drainQueue();
        } finally {
mainLock.unlock();
        }
tryTerminate();
returntasks;
}

shutdownNow和shutdown代码类似,但是实现却很不相同。首先是设置线程池状态为STOP,前面的代码可以看到,是对SHUTDOWN有一些额外的判断逻辑,但是对于>=STOP,基本都是reject,STOP也是比SHUTDOWN更加严格的一种状态。此时不会有新Worker加入,所有刚执行完一个线程后去GetTask的Worker都会退出。
之后调用interruptWorkers

 

privatevoidinterruptWorkers() {
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try {
for (Workerw : workers)
w.interruptIfStarted();
        } finally {
mainLock.unlock();
        }
 }

这里可以看出来,此方法目的是中断所有的Worker,而不是像shutdown中那样只中断空闲线程。这样体现了STOP的特点,中断所有线程,同时workQueue中的Task也不会执行了。

所以接下来drainQueue:

 

privateList<Runnable>drainQueue() {
BlockingQueue<Runnable>q=workQueue;
ArrayList<Runnable>taskList=newArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnabler : q.toArray(newRunnable[0])) {
if (q.remove(r))
taskList.add(r);
            }
        }
returntaskList;
 }

获取所有没有执行的Task,并且返回。
这也体现了STOP的特点:拒绝所有新Task的加入,同时中断所有线程,WorkerQueue中没有执行的线程全部抛弃。所以此时Pool是空的,WorkerQueue也是空的。

2.work和Task

Worker是当前线程池中的线程,而task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。

3.maximumPoolSize和corePoolSize

maximumPoolSize为线程池最大容量,也就是说线程池最多能起多少Worker。corePoolSize是核心线程池的大小,当corePoolSize满了时,同时workQueue full(ArrayBolckQueue是可能满的) 那么此时允许新建Worker去处理workQueue中的Task,但是不能超过maximumPoolSize。超过corePoolSize之外的线程会在空闲超时后终止。

4.Keep-alive times 参数:

作用: 如果当前线程池中有超过 coreSize 的线程,并且线程空闲的时间超过 keepAliveTime,当前线程就会被回收,这样可以避免线程没有被使用时的资源浪费;

通过 setKeepAliveTime 方法可以动态的设置 keepAliveTime 的值;

如果设置 allowCoreThreadTimeOut 为 ture 的话,core thread 空闲时间超过 keepAliveTime 的话,也会被回收

5.拒绝策略:

在 Executor 已经关闭或对最大线程和最大队列都使用饱和时,可以使用 RejectedExecutionHandler 类进行异常捕捉。有如下四种处理策略:

  • AbortPolicy(默认):抛出异常
  • CallerRunsPolicy:不使用线程池,主线程来执行
  • DiscardPolicy:直接丢弃任务
  • DiscardOldestPolicy:丢弃队列中最老任务

6.ThreadPoolExecutor初始化参数设置思路:

publicThreadPoolExecutor(intcorePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnitunit,
BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,
RejectedExecutionHandlerhandler) {
if (corePoolSize<0||maximumPoolSize<=0||maximumPoolSize<corePoolSize||keepAliveTime<0)
thrownewIllegalArgumentException();
if (workQueue==null||threadFactory==null||handler==null)
thrownewNullPointerException();
this.acc=System.getSecurityManager() ==null?null :
AccessController.getContext();
this.corePoolSize=corePoolSize;
this.maximumPoolSize=maximumPoolSize;
this.workQueue=workQueue;
this.keepAliveTime=unit.toNanos(keepAliveTime);
this.threadFactory=threadFactory;
this.handler=handler;
    }
  • 线程大小的设置,常见实现:
  • coreSize == maxSize:

如上设置,随着请求的不断增加,会是这样的现象:

  • 请求数 < coreSize 时,新增线程;
  • 请求数 >= coreSize && 队列不满时,添加任务入队;
  • 队列满时,此时因为 coreSize 和 maxSize 相等,任务会被直接拒绝。

这么写的最大目的:是让线程一下子增加到 maxSize,并且不要回收线程,防止线程回收,避免不断增加回收的损耗,一般来说业务流量都有波峰低谷,在流量低谷时,线程不会被回收;流量波峰时,maxSize 的线程可以应对波峰,不需要慢慢初始化到 maxSize 的过程。

如上设置有两个前提条件:

  1. allowCoreThreadTimeOut 采取默认 false,而不会主动设置成 true,allowCoreThreadTimeOut 是 false 的话,当线程空闲时,就不会回收核心线程;
  2. keepAliveTime 和 TimeUnit 都会设置很大,这样线程空闲的时间就很长,线程就不会轻易的被回收

大多数情况下机器的资源是比较充足,不需要担心线程空闲会浪费机器的资源,所以这种写法目前比较常见。

  • maxSize 无界 + SynchronousQueue:

SynchronousQueue 其内部有堆栈和队列两种形式,默认是堆栈的形式,其内部是没有存储的容器的,放元素和拿元素是一一对应的,比如我使用 put 方法放元素,如果此时没有对应的 take 操作的话,put 操作就会阻塞,需要有线程过来执行 take 操作后,put 操作才会返回。

maxSize 无界 + SynchronousQueue 这样的组合方式优缺点都很明显:

  • 优点:阻塞队列没有存储空间,只要请求到来,就必须找到一条空闲线程去处理这个请求,找不到则在线程池新开辟一条线程去执行。如果是其他的队列的话,只知道任务已经被提交成功了,但无法知道当前任务是在被消费中,还是正在队列中堆积。
  • 缺点:
  1. 比较消耗资源,大量请求到来时,会新建大量的线程来处理请求
  2. 正是因为 SynchronousQueue 没有存储空间,若线程池中的线程数已经达到了 maxSize 且没有空闲线程,那么第 maxSize+1 个任务就会被reject。所以如果请求的量难以预估的话,maxSize 的大小也很难设置
  • maxSize 有界 + Queue 无界:

适用场景:对实时性要求不大,但流量忽高忽低的场景。

比如设置 maxSize 为 20,Queue 选择默认构造器的 LinkedBlockingQueue,这样做的优缺点如下:

  • 优点:
  1. 电脑 cpu 固定的情况下,每秒能同时工作的线程数是有限的,此时开很多的线程其实也是浪费,还不如把这些请求放到队列中去等待,这样可以减少线程之间的 CPU 的竞争;
  2. LinkedBlockingQueue 默认构造器构造出来的链表的最大容量是 Integer 的最大值,非常适合流量忽高忽低的场景,当流量高峰时,大量的请求被阻塞在队列中,让有限的线程可以慢慢消费。
  • 缺点:流量高峰时,大量的请求被阻塞在队列中,对于请求的实时性难以保证,所以当对请求的实时性要求较高的场景,不能使用该组合。
  • maxSize 有界 + Queue 有界:

这种组合是对「maxSize 有界 + Queue 无界」缺点的补充,把队列从无界修改成有界,只要排队的任务在要求的时间内,能够完成任务即可。需要把线程和队列的大小进行配合计算,保证大多数请求都可以在要求的时间内,有响应返回。


  • 如何设置空闲线程不被回收?

keepAliveTime 设置成 0 时,线程使用 poll 方法在队列上进行超时阻塞时,会立马返回 null,也就是空闲线程会立马被回收。所以这种设置是错误🙅的。

可以设置 keepAliveTime 为无穷大值,并且设置 TimeUnit 为时间的大单位,比如设置 keepAliveTime 为 365,TimeUnit 为 TimeUnit.DAYS,意思是线程空闲 1 年内都不会被回收。

假设机器的内存都够大,合理设置 maxSize 后,即使线程空闲,也不希望线程被回收,所以也可以设置 keepAliveTime 为无穷大。


  • 什么时候应该使用公用线程池?

是否使用公用线程池,一般有以下原则:

  1. 查询和写入不公用线程池:一般来说,对于互联网应用,查询量远远大于写入的量,如果查询和写入都要走线程池的话,一定不要公用线程池,也就是说查询走查询的线程池,写入走写入的线程池。如果公用的话,当查询量很大时,写入的请求可能会到队列中去排队,无法及时被处理;
  2. 多个写入业务场景看情况是否需要公用线程池:原则上来说,每个业务场景都独自使用自己的线程池,绝不共用,这样在业务治理、限流、熔断方面都比较容易,一旦多个业务场景公用线程池,可能就会造成业务场景之间的互相影响,每个写入业务场景独立使用自己的线程池是比较合理的;
  3. 多个查询业务场景是可以公用线程池的:查询的请求一般来说有几个特点:查询的场景多、rt 时间短、查询的量比较大,如果给每个查询场景都弄一个单独的线程池的话,第一个比较耗资源,第二个很难定义线程池中线程和队列的大小,比较复杂,所以多个相似的查询业务场景是可以公用线程池的。

  • 如何算线程大小和队列大小?

主要从几个方面入手:

  • 根据业务进行考虑,初始化线程池时,需要考虑所有业务的并发情况:
  • 如果目前所有的业务同时都有很大流量,那么在对于当前业务设置线程池时,尽量把线程大小、队列大小都设置小;
  • 如果所有业务基本上都不会同时有流量,那么就可以稍微设置大一点
  • 根据业务的实时性要求:
  • 如果实时性要求高的话,把队列设置小一点,coreSize == maxSize,并且设置 maxSize 大一点;
  • 如果实时性要求低的话,就可以把队列设置大一点


举个🌰:假设现在机器上某一时间段只会运行一种业务,业务的实时性要求较高,每个请求的平均 rt(ResponseTime) 是 300ms,请求超时时间是 3000ms,机器是 4 核 CPU,内存 16G,一台机器的 qps 是 100,这时候可以模拟一下如何设置:

4 核 CPU,假设 CPU 能够跑满,每个请求的 rt 是 300ms,就是 300 ms 能执行 4 条请求,3000ms 内能执行 3000/300 * 4 = 40 条请求;

300 ms 能执行 4 条请求,实际上 4 核 CPU 的性能远远高于这个,可以拍脑袋假设增加 15条,也就是说 3000ms 内预估能够执行 55 条;

一台机器的 qps 是 100,此时计算出一台机器 3 秒内最多处理 55 条请求,所以此时如果不进行 rt 优化的话,需要加至少一台机器。

线程池可以大概这么设置:

ThreadPoolExecutorexecutor=newThreadPoolExecutor(20, 20, 365L, TimeUnit.DAYS,
newLinkedBlockingQueue(35));

线程数最大为 20,队列最大为 35,这样机器差不多可以在 3000ms 内处理最大的请求 55 条,当然根据你机器的性能和实时性要求,你可以调整线程数和队列的大小占比,只要总和小于 55 即可。

以上只是很粗糙的设置,在实际项目中,还需要根据实际情况不断的观察和调整。

随着云巧资产市场的业务功能逐渐丰富和复杂,串行实现已无法有效支持业务,因此需要灵活的运用多线程进行优化,下一篇将结合资产市场核心业务——下行篇,总结多线程的有效实践和踩坑经验,敬请期待👻👻👻。

参考:

相关文章
|
11天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
3月前
|
Java 调度 数据库
Java并发编程:深入理解线程池
在Java并发编程的海洋中,线程池是一艘强大的船,它不仅提高了性能,还简化了代码结构。本文将带你潜入线程池的深海,探索其核心组件、工作原理及如何高效利用线程池来优化你的并发应用。
|
3月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
115 1
|
3月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
3月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
22天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
100 38
|
22天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
60 4
|
22天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
93 2
|
25天前
|
缓存 监控 Java
java中线程池的使用
java中线程池的使用
|
2月前
|
Java 调度 开发者
Java并发编程:深入理解线程池
在Java的世界中,线程池是提升应用性能、实现高效并发处理的关键工具。本文将深入浅出地介绍线程池的核心概念、工作原理以及如何在实际应用中有效利用线程池来优化资源管理和任务调度。通过本文的学习,读者能够掌握线程池的基本使用技巧,并理解其背后的设计哲学。