背景介绍
在过去处理过的服务故障中,有一类比较典型的场景是业务线程被阻塞(造成阻塞的原因也是多种多样),慢慢导致业务线程池中的全部线程被阻塞,最终造成无法对外提供服务(现象则是CPU、Load、内存等指标都比较低,请求接口后响应超时或者没有响应)。
典型的案例:
【全网首发】记一次socket read导致业务线程阻塞的案例分析
问题分析
响应时间是接口监控的黄金指标之一:假设接口接收请求的时间是t1,接口处理完请求,响应的时间是t2,则接口响应时间是:t2-t1,将响应时间指标接入监控报警系统,当响应时间大于阈值的时候则进行报警;但是在线程被阻塞的情况下,由于接口一直没有返回,响应时间也就无法监控到。
阻塞的线程往往是业务线程,这些业务线程可能是:
- 基于tomcat提供http服务的tomcat线程,线程名类似:http-nio-8080-exec-1
- 基于RocketMQ的消息消费者线程,线程名类似:ConsumeMessageThread_1
- 基于HSF Provider的线程,线程名类似:HSFBizProcessor-DEFAULT-12-thread-3
- … …
如果我们能够在这些业务线程执行的必经路径上进行拦截,那么就能记录下线程开始执行的时间,同时启动定时器不断检查线程已执行的时间,当已执行时间大于设定的阈值则打印出线程栈进行报警;当线程正常返回则删除该线程记录,所以需要解决的主要是两个问题:
- 如何拦截线程
- 定时检查线程执行时间是否超过阈值
解决思路
通过问题分析,可以确定主要需要解决以下两个问题
检测阻塞线程
该模块主要做三件事:
- 业务线程开始执行的时候,进行线程注册
- 业务线程结束执行或抛异常的时候,删除线程注册信息
- 定时检测注册的线程是否发生阻塞,如果发生阻塞则打印线程栈
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class BlockedThreadChecker { protected final static Log logger = LogFactory.getLog(BlockedThreadChecker.class); private static volatile BlockedThreadChecker instance; private final static int DELAY = 10; private final static int PERIOD = 1000; private ScheduledThreadPoolExecutor executor; private final Map<Thread, Task> threads = new ConcurrentHashMap<>(); private BlockedThreadChecker(){ logger.info("init BlockedThreadChecker... ...classloader:" + this.getClass().getClassLoader() + ",parent classloader:" + this.getClass().getClassLoader().getParent()); int coreSize = Runtime.getRuntime().availableProcessors(); ThreadFactory threadFactory = new ThreadFactory() { final AtomicInteger counter = new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "BlockThreadCheckerTimer-" + counter.incrementAndGet()); thread.setDaemon(true); return thread; } }; executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory); executor.scheduleAtFixedRate(new Runnable() { @Override public void run() { long now = System.currentTimeMillis(); for(Map.Entry<Thread,Task> entry : threads.entrySet()){ long execStart = entry.getValue().startTime; long dur = now - execStart; if(dur >= entry.getValue().maxExecTime){ BlockedThreadException e = new BlockedThreadException(entry.getKey().getName() + " has been blocked " + dur + " ms"); e.setStackTrace(entry.getKey().getStackTrace()); logger.error(e.getMessage(),e); } } } },DELAY,PERIOD, TimeUnit.MILLISECONDS); } public static BlockedThreadChecker getInstance(){ if(instance != null){ return instance; } synchronized (BlockedThreadChecker.class){ if(instance != null){ return instance; } instance = new BlockedThreadChecker(); } return instance; } public void registerThread(Thread thread) { registerThread(thread, new Task()); } public void registerThread(Thread thread,Task task) { threads.put(thread, task); logger.info("registerThread " + thread.getName()); } public void unregisterThread(Thread thread) { threads.remove(thread); logger.info("unregisterThread " + thread.getName()); } class Task { long startTime = System.currentTimeMillis(); long maxExecTime = 10000L; } }
拦截线程
方案一
服务中几种常见业务线程:
- 基于tomcat提供http服务的tomcat线程,实现自定义Filter,在Filter中完成线程的注册和取消注册操作;
- 基于RocketMQ的消息消费者线程,根据业务需求统一实现MessageListenerConcurrently、MessageListenerOrderly等,在统一实现类中完成线程的注册和取消注册;
- 基于HSF Provider的线程,实现自定义Filter,在Filter中完成线程的注册和取消注册操作。
该方案实现简单,但是对于业务侵入性比较强,侵入性强意味着业务在意识不到问题的时候,没有改变的动力。
方案二
基于jvm-sandbox实现自定义module,实现思路如下:
import com.alibaba.jvm.sandbox.api.Information; import com.alibaba.jvm.sandbox.api.LoadCompleted; import com.alibaba.jvm.sandbox.api.Module; import com.alibaba.jvm.sandbox.api.listener.ext.Advice; import com.alibaba.jvm.sandbox.api.listener.ext.AdviceListener; import com.alibaba.jvm.sandbox.api.listener.ext.EventWatchBuilder; import com.alibaba.jvm.sandbox.api.resource.ModuleEventWatcher; import org.kohsuke.MetaInfServices; import sun.misc.Unsafe; import javax.annotation.Resource; import java.lang.reflect.Field; import java.util.Properties; @MetaInfServices(Module.class) @Information(id = "blocked-thread-module", version = "0.0.1", author = "yuji") public class BlockedThreadModule implements Module, LoadCompleted { @Resource private ModuleEventWatcher moduleEventWatcher; private AdviceListener adviceListener = new AdviceListener() { @Override protected void before(Advice advice) throws Throwable { if (!advice.isProcessTop()) { return; } BlockedThreadChecker.getInstance().registerThread(Thread.currentThread()); } @Override protected void afterReturning(Advice advice) { if (!advice.isProcessTop()) { return; } BlockedThreadChecker.getInstance().unregisterThread(Thread.currentThread()); } @Override protected void afterThrowing(Advice advice) { if (!advice.isProcessTop()) { return; } BlockedThreadChecker.getInstance().unregisterThread(Thread.currentThread()); } }; @Override public void loadCompleted() { new EventWatchBuilder(moduleEventWatcher) .onClass("javax.servlet.http.HttpServlet") .onBehavior("service") .onWatch(adviceListener); new EventWatchBuilder(moduleEventWatcher) .onClass("com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently") .includeSubClasses() .onBehavior("consumeMessage") .onWatch(adviceListener); new EventWatchBuilder(moduleEventWatcher) .onClass("com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly") .includeSubClasses() .onBehavior("consumeMessage") .onWatch(adviceListener); new EventWatchBuilder(moduleEventWatcher) .onClass("com.taobao.hsf.remoting.provider.ReflectInvocationHandler") .includeSubClasses() .onBehavior("invoke") .onWatch(adviceListener); } }
通过在应用启动参数中增加javaagent=jvm-sandbox agent的方式来使用,相比较方案一业务应用不需要改动任何代码,也不需要对已有封装的框架进行修改,缺点是jvm-sandbox需要提前部署到每个应用的机器上,会给运维带来工作量,个人认为这种方案是最稳定的。
方案三
为了避免方案二中运维工作,一种思路是以jar包的形式提供给业务方使用,业务方引入jar包就可以了
,主要面临两个问题需要解决。
如何触发jar包执行初始化逻辑
一种方式是通过spring boot starter的方式,比如arthas-spring-boot-starter;
一种是根据spring容器初始化流程,选择某个切入点,比如通过实现ApplicationListener接口,监听spring初始化完成的ApplicationEvent来实现。
如何初始化jvm-sandbox
初始化的核心逻辑如下:
//通过ByteBuddyAgent获取Instrumentation Instrumentation inst = ByteBuddyAgent.install(); //将相应版本的sandbox-spy.jar添加到BootstrapClassLoader搜索路径中 //这一步的操作是由于sandbox-spy中包名是以java开头的,所以只能通过BootstrapClassLoader进行加载 JarFile spyJarFile = new JarFile("/目录/sandbox-spy-version.jar"); inst.appendToBootstrapClassLoaderSearch(spyJarFile); //构造jvm-sandbox CoreFeatureString String sandboxCoreFeatureString = String.format(";system_module=%s;mode=%s;sandbox_home=%s;provider=%s;namespace=%s;unsafe.enable=true;",systemModule, "agent", sandboxHome, provider, NAMESPACE ); CoreConfigure coreConfigure = CoreConfigure.toConfigure(sandboxCoreFeatureString,null); CoreLoadedClassDataSource classDataSource = new DefaultCoreLoadedClassDataSource(inst,true); ProviderManager providerManager = new DefaultProviderManager(coreConfigure); //核心类,用户自定义的module是在这个类中完成加载和初始化的 CoreModuleManager coreModuleManager = new DefaultCoreModuleManager(coreConfigure,inst,classDataSource,providerManager); //初始化命名空间与SpyHandler对于关系 SpyUtils.init(NAMESPACE); //加载各种module coreModuleManager.reset();
上面代码总体逻辑是没有问题的,需要考虑的细节是上面代码在不同类加载器体系下的兼容问题。
Tomcat
pandora可运行jar包
idea
经验总结
从目前的三种方案来说,个人比较倾向方案二。