ForkJoinPool的commonPool相关参数配置
commonPool是ForkJoinPool内置的一个线程池对象,JDK8里有些都是使用它的。他怎么来的呢?具体源码为ForkJoinPool的静态方法:makeCommonPool
private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
参数解释 以及自定义commonPool的参数
通过代码指定,必须得在commonPool初始化之前(parallel的stream被调用之前,一般可在系统启动后设置)注入进去,否则无法生效。
通过启动参数指定无此限制,较为安全
parallelism(即配置线程池个数)
可以通过java.util.concurrent.ForkJoinPool.common.parallelism进行配置,最大值不能超过MAX_CAP,即32767.
static final int MAX_CAP = 0x7fff; //32767
如果没有指定,则默认为Runtime.getRuntime().availableProcessors() - 1.
自定义:代码指定(必须得在commonPool初始化之前注入进去,否则无法生效)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8"); // 或者启动参数指定 -Djava.util.concurrent.ForkJoinPool.common.parallelism=8
threadFactory:默认为defaultForkJoinWorkerThreadFactory,没有securityManager的话。
exceptionHandler:如果没有设置,默认为null
WorkQueue:控制是FIFO还是LIFO
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
queue capacity:队列容量
继续介绍
创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定任务了。
其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。
其中RecusiveTask代表有返回值的任务,
而RecusiveAction代表没有返回值的任务。
它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。
这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。
比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。
那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?
使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。
这就是工作窃取模式的优点
总结
在了解了 Fork/Join Framework 的工作原理之后,相信很多使用上的注意事项就可以从原理中找到原因。例如:为什么在 ForkJoinTask 里最好不要存在 I/O 等会阻塞线程的行为?,这个各位读者可以思考思考了
还有一些延伸阅读的内容,在此仅提及一下:
1.ForkJoinPool 有一个 Async Mode ,效果是工作线程在处理本地任务时也使用 FIFO 顺序。这种模式下的 ForkJoinPool 更接近于是一个消息队列,而不是用来处理递归式的任务。
2.在需要阻塞工作线程时,可以使用 ManagedBlocker。
3.Java 1.8 新增加的 CompletableFuture 类可以实现类似于 Javascript 的 promise-chain,内部就是使用 ForkJoinPool 来实现的。
彩蛋
人们之所以总是忘记使用标准的 Java 对象是因为缺少一个足够装逼的名字(译注:类似于 Java Bean 这样的名字)。因此,在准备2000年的演讲时,Rebecca Parsons,Josh Mackenzie 和我给他们起了一个名字叫做 POJO (平淡无奇的 Java 对象)。
有些东西,有些技术如果没有一个够装逼的名字,也是很难被人们记住的。。。吼吼吼