1. 背景
前两天应用线上机器突然罢工了,HSF调用程序的某个接口一直处于运行中状态,持续了20分钟(超时时间为60分钟),正常的响应时间在2分钟以内,但是奇怪的是业务逻辑也没有再运行,非常诡异,层层排查从怀疑ForkJoin池使用不当导致程序出现活锁,再到复现问题时结果与现象相悖直接头晕爆炸,最后终于从源码层面上面找到最终答案。
2. 分析过程
2.1 初步分析
首先查看日志,排查业务逻辑是否运行,从日志上面来看,打印出了获取到锁的日志,业务逻辑处于运行状态。
前一段时间其他项目发生过死锁问题,导致业务逻辑无法正常运行,这个时候第一想法是出现了死锁,自信的下载Arthas,输入thread -b,No most blocking thread found!
当场直接愣住,为什么没有死锁呢?没有死锁线程,我的程序在干什么呢?
2.2 深入排查
本着程序绝大多数时间比人可靠的观点,继续深入排查。
既然是HSF线程池在Wait,那么就看看HSF在Wait什么,一步一步排查,Arthas一顿操作,找到了wait的HSF线程(以下截图的当时线程的快照),在wait CompletableFuture的结果返回。
这个时候Arthas再去查看业务的线程在做什么,居然锁在了parallelStream.collect的操作,查看代码collect操作只是一个普通的并发操作序列化对象信息。
突然灵光一现,CompletableFuture 和 parallelStream 使用的是一个公共线程池ForkJoin池,是不是出现了此线程池出现了问题呢?
Arthas 查看ForkJoin池在做什么,发现所有的线程都在等待一个锁,而这个锁的持有者是正在wait collect的业务线程。
好了,大功告成,CompletableFuture 和 parallelStream使用一个线程池并发的问题,把其中一个并发去了就死锁就解除了。
2.3 问题复现
线程池线程设置为1,此时提交一个任务A,任务A的方法是给线程池提交一个任务B,然后获取任务B的返回值,程序运行后,会发现检测不到死锁,但是程序无法正常工作,此时便处于活锁状态。
package com.example.learn.thread; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class LockTest { public static ThreadPoolExecutor handleExecutor = new ThreadPoolExecutor(1, 1, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); public static void main(String[] args) throws InterruptedException, ExecutionException { Callable<String> taskA = new Callable<String>(){ @Override public String call() { try { System.out.println("taskA run"); Future<String> taskB = handleExecutor.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("taskB run"); return "taskB"; } }); return taskB.get(); } catch (Exception e) { e.printStackTrace(); } return "taskA"; } }; Future<?> submit = handleExecutor.submit(taskA); submit.get(); System.out.println("finish"); } }
3. 山重水复疑无路
3.1 复现问题,结果与现象相悖
解决完死锁问题后,长舒一口气,但是突然脑子里面蹦出来几个问题,始终让我觉得问题没这么简单:
- 代码版本很久没变更了,为什么这次出问题了?
- CompletableFuture 和 parallelStream 这种java自带的用法,如果并发有问题的话,所有的程序都会有这个隐藏问题。而且使用两个用法的地方穿插在无数类里面,尤其是在多人开发的情况下,如果我负责写入口方法,想要用parallelStream做并发操作,其他人提供的实现也正好用了parallelStream,那不就凉了吗?而且方法如果很复杂,涉及到几十个类的话,这种问题怎么避免呢?
不管了,直接把业务逻辑给简化一下,然后写一个程序运行看看是否会出现问题吧。
逻辑如下:
提交4个任务,Fork-join池改成2个(为什么是2个?因为1个的话CompletableFuture不会使用Fork-join的公共池),理论上来讲,Fork-join池都会被全局锁给锁住,此时获取到锁的线程用parallelStream应该获取不到Fork-join池的线程来做操作,从而导致活锁。
代码如下:
package com.example.learn.thread; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; public class LockThread { private static final List<String> nameList = new ArrayList<>(); public static ThreadPoolExecutor handleExecutor = new ThreadPoolExecutor(1, 1, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); static { System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2"); nameList.add("1"); nameList.add("2"); nameList.add("3"); nameList.add("4"); } private static final Lock LOCK = new ReentrantLock(); public static void main(String[] args) throws InterruptedException, ExecutionException { List<CompletableFuture<Void>> futures = nameList.stream() .map(name -> CompletableFuture.runAsync(() -> { processJob(name); })) .collect(Collectors.toList()); try { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } catch (CompletionException e) { System.out.println("catch Error"); } System.out.println("finish"); } /** * 做一些复杂操作 * @param name */ private static void processJob(String name) { try { //其他操作 System.out.println("submit job:" + name + "Thread:" + Thread.currentThread().getName() + ",count:" + handleExecutor.getQueue().size()); handleExecutor.submit(() -> publishMessage(name)); } catch (Exception e) { } } private static void publishMessage(String name) { try { boolean acquired = false; while (!acquired) { try { acquired = LOCK.tryLock(3, TimeUnit.SECONDS); if (!acquired) { Thread.sleep(1000); } } catch (Exception e) { } } // 复杂操作 Thread.sleep(2000); List<String> jobList = findByName(name); List<String> resultList = jobList.parallelStream() .map(s -> { try { Thread.sleep(10); } catch (Exception e) { } System.out.println("Thread:" + Thread.currentThread().getName() + ",job:" + s); return s + "complete"; }) .collect(Collectors.toList()); System.out.println("Thread:" + Thread.currentThread().getName()+ ",result:" + resultList); } catch (Exception e) { } finally { System.out.println("unlock"); LOCK.unlock(); } } private static List<String> findByName(String name) { List<String> result = new ArrayList<>(); for (int i = 0; i < 5; i++) { result.add(name + "-" + i); } return result; } }
最后的结果出乎意料,线程没有出现锁,任务都顺利完成了。
任务顺序完成的时候,我的头直接爆炸了,原来的分析都是错的吗?Arthas抓住的活锁难道是假的吗?只是正好看的瞬间在wait吗?
3.2 查看监控,简化问题,找出蛛丝马迹
感谢monitor监控会有机器采样,重新观察当时的栈快照的详情,发现与Arthas看到的现象一致,而且观察wait的对象,block的时间等等,最终确定还是CompletableFuture 和 parallelStream 出现冲突,导致程序活锁。
写一个简单的程序,先占满Fork-join池,在用parallelStream 看看能不能完成,最终发现可以完成,但是也发现一些蛛丝马迹,parallelStream 只有一个线程在做事情,而且是当前线程,并不是Fork-Join池线程。
package com.example.learn.thread; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; public class ParallelStreamTest { static { System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2"); } public static void main(String[] args) { List<String> jobList = findByName("1"); boolean always = true; new Thread(() -> { List<String> resultList = jobList.parallelStream() .map(s -> { while(always) { try { Thread.sleep(1000); } catch (Exception e) { } //System.out.println("Thread:" + Thread.currentThread().getName() + ", job:" + s); } return s + "complete"; }) .collect(Collectors.toList()); System.out.println(resultList); }).start(); List<String> jobList2 = findByNameTwo("1"); new Thread(() ->{ while (true) { try { jobList2.parallelStream() .map(s -> { try { Thread.sleep(20); } catch (Exception e) { } System.out.println("Thread:" + Thread.currentThread().getName() + ", 外部循环:" + s); return s + "complete"; }) .collect(Collectors.toList()); Thread.sleep(10000); } catch (Exception e) { } } }).start(); } private static List<String> findByName(String name) { List<String> result = new ArrayList<>(); for (int i = 0; i < 5; i++) { result.add(name + "-" + i); } return result; } private static List<String> findByNameTwo(String name) { List<String> result = new ArrayList<>(); for (int i = 0; i < 1000; i++) { result.add(name + "-" + i); } return result; } }
这个时候崩溃了,如果parallelStream的用法可以保证Fork-join池就算满了,也能用当前线程执行,为什么我的业务线程还会被锁住呢?
3.3 搜索文档,询问其他人有没有遇到过类似的问题
网上都搜索不到类似问题,询问其他人也没有遇到过类似的问题,难道真的要用出最后一招了吗?看源码,debug源码,看看parallelStream到底是怎么运行的?
4. 柳岸花明又一村
4.1 找源码解析的文档和模型抽象图
截取几张重要的图片
参考以下文档:
4.2 文档只是引路人,还需要安安心心debug代码
看了很多文章讲述Fork-join池的原理,但是都没有解开我心中的问题,到底什么时候会用Fork-join的线程池呢?什么时候用本线程呢?到底会不会出现活锁呢?
初步怀疑,当前线程提交任务的时候,如果发现Fork-join线程有问题就不提交了,自己去执行?
但是经不起推敲,怎么发现线程有问题的呢?
经过Debug发现,parallelStream执行后,确实调用了Fork-join中的fork操作,然后将任务放到frok-join的队列中。
向出现死锁的线程排队队列中提交任务,然后还能完成?难道当前线程可以获取到队列的任务吗?
顺着代码排查,发现wait任务完成之前有一个奇怪的方法(“帮忙”?)
最后发现,当前线程在“帮忙”的时候,能把队列中的任务都给处理完成,直到本任务结束。
但是如果是这样的话,那么为什么活锁出现的现场,当前线程没有“帮忙”把所有任务完成呢?
4.3 真相大白
头晕眼花的时候,突然发现“帮忙”的时候拿取的是一个队列,Fork-join池最少有两个队列,为啥只帮我处理一个队列呢?检查别的地方没发现有遍历全部队列的地方,难道说当时是因为有一个任务分配给其他的死锁的队列里面了吗?
向Fork-Join池中提交任务时源码再探究
发现用当前线程提交的任务都只会分配到一个队列里面,而且“帮忙”的时候也只会帮忙这一个队列。
哎,不对,那我这个parallelStream只能用两个线程吗?通过别的文档发现,Fork-join线程池的模型与线程池存在区别,而且有一个窃取算法,可以窃取任务到本队列。
按照结果和现象来推论当时为什么会出现活锁,线程池有2个情况下,Work线程提交了两个普通任务,Hsf线程提交了两个死锁任务,但是很不巧,负责Hsf线程处理的fork-1的线程stole了一个普通任务,而且过程中hsf线程提交了两个死锁任务,导致fork-1处于无法工作的状态,这个时候fork-1队列中的普通任务无法完成,fork-2拉到死锁任务,然后Fork-join线程全部死锁。
第一步:
第二步:
第三步:
第四步:
第五步
上述分析很有道理,但是需要需要代码证明,Stole的时候并不会提前上一个全局锁,不然fork-1线程Stole的时候,直接执行的话或者一定此任务优化的话,Stole来的3+4就能算完,不会出现活锁问题。
scan是Stole的核心代码
最后发现窃取的任务和普通的任务一样,都是向队列做一个push操作,并没有上全局锁,而且fork线程做stole操作的任务,一定会放在自己的队列中。
自此真相大白,如果是上面的这种情况,发生的概率非常低,而且复现的难度也比较高,所以线上运行了很久才出现了这一次问题。
5. 总结
对于ForkJoin池的理解不够,本次问题排查一波三折,期间无数次各种怀疑,最终终于真相大白,支撑排查下来的理念就是程序绝大多数时间比人可靠的观点,并且80%的问题都可以解决。
后续的建议和修改方案是对于用到ForkJoin池相关的操作如CompletableFuture 和 parallelStream等不要做任何复杂的操作,不要调用其他类的方法,只做一些无锁的基础操作,如果需要调用其他类的方法需要使用自定义线程池。
学习一个新知识的时候,搜索文档是必要的而且有用的,但是大部分的文档都是宏观层面,并不会深入探究细节,此时需要自己深入debug代码结合文档一起学习。
理论为实践提供指导和支持,实践则是理论得到验证和应用的手段。
来源 | 阿里云开发者公众号
作者 | 信徒