一种检测线程阻塞的实现思路

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
可观测监控 Prometheus 版,每月50GB免费额度
性能测试 PTS,5000VUM额度
简介: 一种检测线程阻塞的实现思路

背景介绍

在过去处理过的服务故障中,有一类比较典型的场景是业务线程被阻塞(造成阻塞的原因也是多种多样),慢慢导致业务线程池中的全部线程被阻塞,最终造成无法对外提供服务(现象则是CPU、Load、内存等指标都比较低,请求接口后响应超时或者没有响应)。

典型的案例:

【全网首发】记一次socket read导致业务线程阻塞的案例分析

【全网首发】MQ-消息堆积-业务线程阻塞案例分析

问题分析

响应时间是接口监控的黄金指标之一:假设接口接收请求的时间是t1,接口处理完请求,响应的时间是t2,则接口响应时间是:t2-t1,将响应时间指标接入监控报警系统,当响应时间大于阈值的时候则进行报警;但是在线程被阻塞的情况下,由于接口一直没有返回,响应时间也就无法监控到。

阻塞的线程往往是业务线程,这些业务线程可能是:

  • 基于tomcat提供http服务的tomcat线程,线程名类似:http-nio-8080-exec-1
  • 基于RocketMQ的消息消费者线程,线程名类似:ConsumeMessageThread_1
  • 基于HSF Provider的线程,线程名类似:HSFBizProcessor-DEFAULT-12-thread-3
  • … …

如果我们能够在这些业务线程执行的必经路径上进行拦截,那么就能记录下线程开始执行的时间,同时启动定时器不断检查线程已执行的时间,当已执行时间大于设定的阈值则打印出线程栈进行报警;当线程正常返回则删除该线程记录,所以需要解决的主要是两个问题:

  • 如何拦截线程
  • 定时检查线程执行时间是否超过阈值

解决思路

通过问题分析,可以确定主要需要解决以下两个问题

检测阻塞线程

该模块主要做三件事:

  • 业务线程开始执行的时候,进行线程注册
  • 业务线程结束执行或抛异常的时候,删除线程注册信息
  • 定时检测注册的线程是否发生阻塞,如果发生阻塞则打印线程栈
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockedThreadChecker {
    protected final static Log logger = LogFactory.getLog(BlockedThreadChecker.class);
    private static volatile BlockedThreadChecker instance;
    private final static int DELAY = 10;
    private final static int PERIOD = 1000;
    private ScheduledThreadPoolExecutor executor;
    private final Map<Thread, Task> threads = new ConcurrentHashMap<>();
    private BlockedThreadChecker(){
        logger.info("init BlockedThreadChecker... ...classloader:" + this.getClass().getClassLoader() + ",parent classloader:" + this.getClass().getClassLoader().getParent());
        int coreSize = Runtime.getRuntime().availableProcessors();
        ThreadFactory threadFactory = new ThreadFactory() {
            final AtomicInteger counter = new AtomicInteger();
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "BlockThreadCheckerTimer-" + counter.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
        };
        executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                long now = System.currentTimeMillis();
                for(Map.Entry<Thread,Task> entry : threads.entrySet()){
                    long execStart = entry.getValue().startTime;
                    long dur = now - execStart;
                    if(dur >= entry.getValue().maxExecTime){
                        BlockedThreadException e = new BlockedThreadException(entry.getKey().getName() + " has been blocked " + dur + " ms");
                        e.setStackTrace(entry.getKey().getStackTrace());
                        logger.error(e.getMessage(),e);
                    }
                }
            }
        },DELAY,PERIOD, TimeUnit.MILLISECONDS);
    }
    public static BlockedThreadChecker getInstance(){
        if(instance != null){
            return instance;
        }
        synchronized (BlockedThreadChecker.class){
            if(instance != null){
                return instance;
            }
            instance = new BlockedThreadChecker();
        }
        return instance;
    }
    public void registerThread(Thread thread) {
        registerThread(thread, new Task());
    }
    public void registerThread(Thread thread,Task task) {
        threads.put(thread, task);
        logger.info("registerThread " + thread.getName());
    }
    public void unregisterThread(Thread thread) {
        threads.remove(thread);
        logger.info("unregisterThread " + thread.getName());
    }
    class Task {
        long startTime = System.currentTimeMillis();
        long maxExecTime = 10000L;
    }
}

拦截线程

方案一

服务中几种常见业务线程:

  • 基于tomcat提供http服务的tomcat线程,实现自定义Filter,在Filter中完成线程的注册和取消注册操作;
  • 基于RocketMQ的消息消费者线程,根据业务需求统一实现MessageListenerConcurrently、MessageListenerOrderly等,在统一实现类中完成线程的注册和取消注册;
  • 基于HSF Provider的线程,实现自定义Filter,在Filter中完成线程的注册和取消注册操作。

该方案实现简单,但是对于业务侵入性比较强,侵入性强意味着业务在意识不到问题的时候,没有改变的动力。

方案二

基于jvm-sandbox实现自定义module,实现思路如下:

import com.alibaba.jvm.sandbox.api.Information;
import com.alibaba.jvm.sandbox.api.LoadCompleted;
import com.alibaba.jvm.sandbox.api.Module;
import com.alibaba.jvm.sandbox.api.listener.ext.Advice;
import com.alibaba.jvm.sandbox.api.listener.ext.AdviceListener;
import com.alibaba.jvm.sandbox.api.listener.ext.EventWatchBuilder;
import com.alibaba.jvm.sandbox.api.resource.ModuleEventWatcher;
import org.kohsuke.MetaInfServices;
import sun.misc.Unsafe;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.Properties;
@MetaInfServices(Module.class)
@Information(id = "blocked-thread-module", version = "0.0.1", author = "yuji")
public class BlockedThreadModule implements Module, LoadCompleted {
    @Resource
    private ModuleEventWatcher moduleEventWatcher;
    private AdviceListener adviceListener = new AdviceListener() {
        @Override
        protected void before(Advice advice) throws Throwable {
            if (!advice.isProcessTop()) {
                return;
            }
            BlockedThreadChecker.getInstance().registerThread(Thread.currentThread());
        }
        @Override
        protected void afterReturning(Advice advice) {
            if (!advice.isProcessTop()) {
                return;
            }
            BlockedThreadChecker.getInstance().unregisterThread(Thread.currentThread());
        }
        @Override
        protected void afterThrowing(Advice advice) {
            if (!advice.isProcessTop()) {
                return;
            }
            BlockedThreadChecker.getInstance().unregisterThread(Thread.currentThread());
        }
    };
    @Override
    public void loadCompleted() {
        new EventWatchBuilder(moduleEventWatcher)
                .onClass("javax.servlet.http.HttpServlet")
                .onBehavior("service")
                .onWatch(adviceListener);
        new EventWatchBuilder(moduleEventWatcher)
                .onClass("com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently")
                .includeSubClasses()
                .onBehavior("consumeMessage")
                .onWatch(adviceListener);
        new EventWatchBuilder(moduleEventWatcher)
                .onClass("com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly")
                .includeSubClasses()
                .onBehavior("consumeMessage")
                .onWatch(adviceListener);
        new EventWatchBuilder(moduleEventWatcher)
                .onClass("com.taobao.hsf.remoting.provider.ReflectInvocationHandler")
                .includeSubClasses()
                .onBehavior("invoke")
                .onWatch(adviceListener);
    }
}

通过在应用启动参数中增加javaagent=jvm-sandbox agent的方式来使用,相比较方案一业务应用不需要改动任何代码,也不需要对已有封装的框架进行修改,缺点是jvm-sandbox需要提前部署到每个应用的机器上,会给运维带来工作量,个人认为这种方案是最稳定的。

方案三

为了避免方案二中运维工作,一种思路是以jar包的形式提供给业务方使用,业务方引入jar包就可以了

,主要面临两个问题需要解决。

如何触发jar包执行初始化逻辑

一种方式是通过spring boot starter的方式,比如arthas-spring-boot-starter;

一种是根据spring容器初始化流程,选择某个切入点,比如通过实现ApplicationListener接口,监听spring初始化完成的ApplicationEvent来实现。

如何初始化jvm-sandbox

初始化的核心逻辑如下:

//通过ByteBuddyAgent获取Instrumentation
Instrumentation inst = ByteBuddyAgent.install();
//将相应版本的sandbox-spy.jar添加到BootstrapClassLoader搜索路径中
//这一步的操作是由于sandbox-spy中包名是以java开头的,所以只能通过BootstrapClassLoader进行加载
JarFile spyJarFile = new JarFile("/目录/sandbox-spy-version.jar");
inst.appendToBootstrapClassLoaderSearch(spyJarFile);
//构造jvm-sandbox CoreFeatureString
String sandboxCoreFeatureString = String.format(";system_module=%s;mode=%s;sandbox_home=%s;provider=%s;namespace=%s;unsafe.enable=true;",systemModule, "agent", sandboxHome, provider, NAMESPACE );
CoreConfigure coreConfigure = CoreConfigure.toConfigure(sandboxCoreFeatureString,null);
CoreLoadedClassDataSource classDataSource = new DefaultCoreLoadedClassDataSource(inst,true);
ProviderManager providerManager = new DefaultProviderManager(coreConfigure);
//核心类,用户自定义的module是在这个类中完成加载和初始化的
CoreModuleManager coreModuleManager = new DefaultCoreModuleManager(coreConfigure,inst,classDataSource,providerManager);
//初始化命名空间与SpyHandler对于关系
SpyUtils.init(NAMESPACE);
//加载各种module
coreModuleManager.reset();

上面代码总体逻辑是没有问题的,需要考虑的细节是上面代码在不同类加载器体系下的兼容问题。

Tomcat

pandora可运行jar包

idea

经验总结

从目前的三种方案来说,个人比较倾向方案二。

参考资料

bytebuddy

jvm-sandbox

arthas

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
Java 数据库 Android开发
【专栏】Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理
【4月更文挑战第27天】本文探讨了Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理。通过案例分析展示了网络请求、图像处理和数据库操作的优化实践。同时,文章指出并发编程的挑战,如性能评估、调试及兼容性问题,并强调了多线程优化对提升应用性能的重要性。开发者应持续学习和探索新的优化策略,以适应移动应用市场的竞争需求。
209 5
|
8天前
|
Java API 调度
【JavaEE】——多线程(join阻塞,计算,引用,状态)
【JavaEE】——多线程,join,sleep引起的线程阻塞,多线程提升计算效率,如何获取线程的引用和状态
|
8月前
|
消息中间件 Linux 调度
【Linux 进程/线程状态 】深入理解Linux C++中的进程/线程状态:阻塞,休眠,僵死
【Linux 进程/线程状态 】深入理解Linux C++中的进程/线程状态:阻塞,休眠,僵死
591 0
【多线程面试题十二】、阻塞线程的方式有哪些?
线程阻塞的方式包括调用sleep()方法、阻塞式IO操作、等待同步监视器的获取、等待通知(notify),以及慎用的suspend()方法。
|
7月前
|
前端开发 JavaScript
JavaScript异步处理避免了单线程阻塞,如回调函数、Promise和async/await。
【6月更文挑战第22天】JavaScript异步处理避免了单线程阻塞,如回调函数、Promise和async/await。回调是基础,用于在操作完成后执行函数;Promise管理异步状态,支持链式调用;async/await提供同步代码外观,简化错误处理。每种技术在处理耗时任务时都起着关键作用。
62 3
|
7月前
|
Java
【技术解码】Java线程的五味人生:新建、就绪、运行、阻塞与死亡的哲学解读!
【6月更文挑战第19天】Java线程生命周期如同人生旅程,经历新建、就绪、运行、阻塞至死亡五阶段。从`new Thread()`的诞生到`start()`的蓄势待发,再到`run()`的全力以赴,线程在代码中奔跑。阻塞时面临挑战,等待资源释放,最终通过`join()`或中断结束生命。线程的每个状态转变,都是编程世界与哲思的交汇点。
51 1
|
8月前
|
设计模式 安全 Java
Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)
Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)
|
8月前
|
前端开发 JavaScript UED
由于JavaScript是单线程的,因此在处理大量异步操作时,需要确保不会阻塞UI线程
【5月更文挑战第13天】JavaScript中的Promise和async/await常用于处理游戏开发中的异步操作,如加载资源、网络请求和动画帧更新。Promise表示异步操作的结果,通过.then()和.catch()处理回调。async/await作为Promise的语法糖,使异步代码更简洁,类似同步代码。在游戏循环中,使用async/await可清晰管理资源加载和更新,但需注意避免阻塞UI线程,并妥善处理加载顺序、错误和资源管理,以保证游戏性能和稳定性。
75 3
|
8月前
|
存储 网络协议 iOS开发
connect永远阻塞线程及解决方案
connect永远阻塞线程及解决方案
117 0
|
8月前
|
监控
写一个线程来监控各线程是否发生阻塞
写一个线程来监控各线程是否发生阻塞
74 0