我的程序突然罢工了|深入探究HSF调用异常,从死锁到活锁的全面分析与解决

简介: 本文详细记录了作者在处理HSF调用异常问题的过程中,从初步怀疑死锁到最终发现并解决活锁问题的全过程。

1. 背景


前两天应用线上机器突然罢工了,HSF调用程序的某个接口一直处于运行中状态,持续了20分钟(超时时间为60分钟),正常的响应时间在2分钟以内,但是奇怪的是业务逻辑也没有再运行,非常诡异,层层排查从怀疑ForkJoin池使用不当导致程序出现活锁,再到复现问题时结果与现象相悖直接头晕爆炸,最后终于从源码层面上面找到最终答案。



2. 分析过程


2.1 初步分析


首先查看日志,排查业务逻辑是否运行,从日志上面来看,打印出了获取到锁的日志,业务逻辑处于运行状态。


前一段时间其他项目发生过死锁问题,导致业务逻辑无法正常运行,这个时候第一想法是出现了死锁,自信的下载Arthas,输入thread -b,No most blocking thread found!

image.png

当场直接愣住,为什么没有死锁呢?没有死锁线程,我的程序在干什么呢?

image.png


2.2 深入排查


本着程序绝大多数时间比人可靠的观点,继续深入排查。


既然是HSF线程池在Wait,那么就看看HSF在Wait什么,一步一步排查,Arthas一顿操作,找到了wait的HSF线程(以下截图的当时线程的快照),在wait CompletableFuture的结果返回。

 image.png

这个时候Arthas再去查看业务的线程在做什么,居然锁在了parallelStream.collect的操作,查看代码collect操作只是一个普通的并发操作序列化对象信息。

image.png

突然灵光一现,CompletableFuture 和 parallelStream 使用的是一个公共线程池ForkJoin池,是不是出现了此线程池出现了问题呢?


Arthas 查看ForkJoin池在做什么,发现所有的线程都在等待一个锁,而这个锁的持有者是正在wait collect的业务线程。

image.png 

好了,大功告成,CompletableFuture 和 parallelStream使用一个线程池并发的问题,把其中一个并发去了就死锁就解除了。


2.3 问题复现


线程池线程设置为1,此时提交一个任务A,任务A的方法是给线程池提交一个任务B,然后获取任务B的返回值,程序运行后,会发现检测不到死锁,但是程序无法正常工作,此时便处于活锁状态。


package com.example.learn.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LockTest {
    public static ThreadPoolExecutor handleExecutor = new ThreadPoolExecutor(1, 1,
        5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<String> taskA = new Callable<String>(){
            @Override
            public String call() {
                try {
                    System.out.println("taskA run");

                    Future<String> taskB = handleExecutor.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            System.out.println("taskB run");
                            return "taskB";
                        }
                    });

                    return taskB.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return "taskA";
            }
        };

        Future<?> submit = handleExecutor.submit(taskA);
        submit.get();
        System.out.println("finish");
    }
}

3. 山重水复疑无路


3.1 复现问题,结果与现象相悖


解决完死锁问题后,长舒一口气,但是突然脑子里面蹦出来几个问题,始终让我觉得问题没这么简单:

  • 代码版本很久没变更了,为什么这次出问题了?
  • CompletableFuture 和 parallelStream 这种java自带的用法,如果并发有问题的话,所有的程序都会有这个隐藏问题。而且使用两个用法的地方穿插在无数类里面,尤其是在多人开发的情况下,如果我负责写入口方法,想要用parallelStream做并发操作,其他人提供的实现也正好用了parallelStream,那不就凉了吗?而且方法如果很复杂,涉及到几十个类的话,这种问题怎么避免呢?

不管了,直接把业务逻辑给简化一下,然后写一个程序运行看看是否会出现问题吧。

逻辑如下:


image.png


提交4个任务,Fork-join池改成2个(为什么是2个?因为1个的话CompletableFuture不会使用Fork-join的公共池),理论上来讲,Fork-join池都会被全局锁给锁住,此时获取到锁的线程用parallelStream应该获取不到Fork-join池的线程来做操作,从而导致活锁。

代码如下:


package com.example.learn.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

public class LockThread {
    private static final List<String> nameList =  new ArrayList<>();

    public static ThreadPoolExecutor handleExecutor = new ThreadPoolExecutor(1, 1,
        5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());

    static {
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
        nameList.add("1");
        nameList.add("2");
        nameList.add("3");
        nameList.add("4");
    }

    private static final Lock LOCK = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<CompletableFuture<Void>> futures = nameList.stream()
            .map(name -> CompletableFuture.runAsync(() -> {
                processJob(name);
            }))
            .collect(Collectors.toList());

        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } catch (CompletionException e) {
            System.out.println("catch Error");
        }

        System.out.println("finish");
    }

    /**
     * 做一些复杂操作
     * @param name
     */
    private static void processJob(String name) {
        try {
            //其他操作
            System.out.println("submit job:" + name + "Thread:" + Thread.currentThread().getName() + ",count:" +
                handleExecutor.getQueue().size());
            handleExecutor.submit(() -> publishMessage(name));
        } catch (Exception e) {

        }

    }

    private static void publishMessage(String name) {
        try {
            boolean acquired = false;
            while (!acquired) {
                try {
                    acquired = LOCK.tryLock(3, TimeUnit.SECONDS);
                    if (!acquired) {
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {

                }
            }

            // 复杂操作
            Thread.sleep(2000);

            List<String> jobList = findByName(name);
            List<String> resultList = jobList.parallelStream()
                .map(s -> {
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {

                    }
                    System.out.println("Thread:" + Thread.currentThread().getName() + ",job:" + s);
                    return s + "complete";
                })
                .collect(Collectors.toList());
            System.out.println("Thread:" + Thread.currentThread().getName()+ ",result:" + resultList);
        } catch (Exception e) {

        } finally {
            System.out.println("unlock");
            LOCK.unlock();
        }
    }

    private static List<String> findByName(String name) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            result.add(name + "-" + i);
        }
        return result;
    }
}

最后的结果出乎意料,线程没有出现锁,任务都顺利完成了。


任务顺序完成的时候,我的头直接爆炸了,原来的分析都是错的吗?Arthas抓住的活锁难道是假的吗?只是正好看的瞬间在wait吗?


3.2 查看监控,简化问题,找出蛛丝马迹


感谢monitor监控会有机器采样,重新观察当时的栈快照的详情,发现与Arthas看到的现象一致,而且观察wait的对象,block的时间等等,最终确定还是CompletableFuture 和 parallelStream 出现冲突,导致程序活锁。



写一个简单的程序,先占满Fork-join池,在用parallelStream 看看能不能完成,最终发现可以完成,但是也发现一些蛛丝马迹,parallelStream 只有一个线程在做事情,而且是当前线程,并不是Fork-Join池线程。


package com.example.learn.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamTest {
    static {
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
    }
    public static void main(String[] args) {
        List<String> jobList = findByName("1");
        boolean always = true;

        new Thread(() -> {
            List<String> resultList = jobList.parallelStream()
                .map(s -> {
                    while(always) {
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {

                        }
                        //System.out.println("Thread:" + Thread.currentThread().getName() + ", job:" + s);
                    }

                    return s + "complete";
                })
                .collect(Collectors.toList());
            System.out.println(resultList);
        }).start();


        List<String> jobList2 = findByNameTwo("1");
        new Thread(() ->{
            while (true) {
                try {
                    jobList2.parallelStream()
                        .map(s -> {
                            try {
                                Thread.sleep(20);
                            } catch (Exception e) {

                            }
                            System.out.println("Thread:" + Thread.currentThread().getName() + ", 外部循环:" + s);
                            return s + "complete";
                        })
                        .collect(Collectors.toList());
                    Thread.sleep(10000);
                } catch (Exception e) {

                }
            }
        }).start();
    }

    private static List<String> findByName(String name) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            result.add(name + "-" + i);
        }
        return result;
    }

    private static List<String> findByNameTwo(String name) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            result.add(name + "-" + i);
        }
        return result;
    }
}


这个时候崩溃了,如果parallelStream的用法可以保证Fork-join池就算满了,也能用当前线程执行,为什么我的业务线程还会被锁住呢?


3.3 搜索文档,询问其他人有没有遇到过类似的问题


网上都搜索不到类似问题,询问其他人也没有遇到过类似的问题,难道真的要用出最后一招了吗?看源码,debug源码,看看parallelStream到底是怎么运行的?



4. 柳岸花明又一村

4.1 找源码解析的文档和模型抽象图


截取几张重要的图片



image.png


image.png


参考以下文档:

参考一

参考二

4.2 文档只是引路人,还需要安安心心debug代码


看了很多文章讲述Fork-join池的原理,但是都没有解开我心中的问题,到底什么时候会用Fork-join的线程池呢?什么时候用本线程呢?到底会不会出现活锁呢?


初步怀疑,当前线程提交任务的时候,如果发现Fork-join线程有问题就不提交了,自己去执行?


但是经不起推敲,怎么发现线程有问题的呢?



经过Debug发现,parallelStream执行后,确实调用了Fork-join中的fork操作,然后将任务放到frok-join的队列中。


image.png


image.png

向出现死锁的线程排队队列中提交任务,然后还能完成?难道当前线程可以获取到队列的任务吗?

顺着代码排查,发现wait任务完成之前有一个奇怪的方法(“帮忙”?)


image.png

最后发现,当前线程在“帮忙”的时候,能把队列中的任务都给处理完成,直到本任务结束。


image.png

但是如果是这样的话,那么为什么活锁出现的现场,当前线程没有“帮忙”把所有任务完成呢?


4.3 真相大白


头晕眼花的时候,突然发现“帮忙”的时候拿取的是一个队列,Fork-join池最少有两个队列,为啥只帮我处理一个队列呢?检查别的地方没发现有遍历全部队列的地方,难道说当时是因为有一个任务分配给其他的死锁的队列里面了吗?


向Fork-Join池中提交任务时源码再探究


发现用当前线程提交的任务都只会分配到一个队列里面,而且“帮忙”的时候也只会帮忙这一个队列。


image.png


image.png


哎,不对,那我这个parallelStream只能用两个线程吗?通过别的文档发现,Fork-join线程池的模型与线程池存在区别,而且有一个窃取算法,可以窃取任务到本队列。


image.png

按照结果和现象来推论当时为什么会出现活锁,线程池有2个情况下,Work线程提交了两个普通任务,Hsf线程提交了两个死锁任务,但是很不巧,负责Hsf线程处理的fork-1的线程stole了一个普通任务,而且过程中hsf线程提交了两个死锁任务,导致fork-1处于无法工作的状态,这个时候fork-1队列中的普通任务无法完成,fork-2拉到死锁任务,然后Fork-join线程全部死锁。


第一步:

image.png


第二步:

image.png

第三步:

image.png

第四步:

image.png

第五步

image.png

上述分析很有道理,但是需要需要代码证明,Stole的时候并不会提前上一个全局锁,不然fork-1线程Stole的时候,直接执行的话或者一定此任务优化的话,Stole来的3+4就能算完,不会出现活锁问题。


scan是Stole的核心代码


image.png


最后发现窃取的任务和普通的任务一样,都是向队列做一个push操作,并没有上全局锁,而且fork线程做stole操作的任务,一定会放在自己的队列中。



image.png


自此真相大白,如果是上面的这种情况,发生的概率非常低,而且复现的难度也比较高,所以线上运行了很久才出现了这一次问题。



5. 总结


对于ForkJoin池的理解不够,本次问题排查一波三折,期间无数次各种怀疑,最终终于真相大白,支撑排查下来的理念就是程序绝大多数时间比人可靠的观点,并且80%的问题都可以解决。


后续的建议和修改方案是对于用到ForkJoin池相关的操作如CompletableFuture 和 parallelStream等不要做任何复杂的操作,不要调用其他类的方法,只做一些无锁的基础操作,如果需要调用其他类的方法需要使用自定义线程池。


学习一个新知识的时候,搜索文档是必要的而且有用的,但是大部分的文档都是宏观层面,并不会深入探究细节,此时需要自己深入debug代码结合文档一起学习。

理论为实践提供指导和支持,实践则是理论得到验证和应用的手段。






来源  |  阿里云开发者公众号
作者  
|  信徒


作者介绍
目录

相关实验场景