Background
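In a distributed system where a request involves calls across several services, a trace id is usually used to string the individual RPC calls together into one call chain. When something goes wrong, this makes it easy to find which hop is at fault.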
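At the company I had just joined, this practice was already in place, and services called each other over HTTP. During troubleshooting, however, the trace id was sometimes empty. If you did not know the business well, that made it very hard to piece the call chain back together. Understanding the flow meant spending a lot of time reading code, which was slow and tedious. On closer inspection, I found that one project wraps async servlet handling into a thread pool to run some time-consuming asynchronous calls.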
The project keeps the trace id in the request header, so the threads that run the asynchronous work have no way to inherit it.
The code that stores the trace id:
import java.io.IOException;
import java.util.UUID;
import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;

@WebFilter(filterName = "logTraceFilter", urlPatterns = "/*", asyncSupported = true)
public class LogTraceFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) servletRequest;
        // RequestCommonUtils, LogTraceUtils and LogBackUtils are project-internal helpers
        String traceId = RequestCommonUtils.getRequetHeader("trace_id");
        if (StringUtils.isBlank(traceId)) {
            // No incoming trace id: generate one and add it as a header for downstream code
            String newTraceId = UUID.randomUUID().toString();
            HeaderMapRequestWrapper requestWrapper = new HeaderMapRequestWrapper(request);
            requestWrapper.addHeader("trace_id", newTraceId);
            LogTraceUtils.setTraceId(newTraceId);
            LogBackUtils.info("logTraceFilter.doFilter using new traceId, newTraceId=" + newTraceId);
            filterChain.doFilter(requestWrapper, servletResponse);
        } else {
            LogBackUtils.info("logTraceFilter.doFilter using existing traceId, traceId=" + traceId);
            LogTraceUtils.setTraceId(traceId);
            filterChain.doFilter(request, servletResponse);
        }
    }

    @Override
    public void destroy() {
    }
}
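The filter delegates to LogTraceUtils, whose source the post does not show. The sketch below is a rough illustration only. It assumes that helper is a static holder around a `ThreadLocal<String>` and reproduces the filter's "reuse the header or mint a UUID" decision. `TraceIdHolder` and `resolve` are hypothetical names, not the project's API.

```java
import java.util.UUID;

// Hypothetical stand-in for the project's LogTraceUtils (not shown in the post):
// a static holder keeping the current request's trace id in a ThreadLocal.
final class TraceIdHolder {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private TraceIdHolder() {
    }

    static void set(String traceId) {
        TRACE_ID.set(traceId);
    }

    static String get() {
        return TRACE_ID.get();
    }

    static void clear() {
        TRACE_ID.remove();
    }

    // Mirrors the filter: reuse a non-blank incoming header value, otherwise generate a UUID.
    static String resolve(String incomingHeader) {
        String traceId = (incomingHeader == null || incomingHeader.trim().isEmpty())
                ? UUID.randomUUID().toString()
                : incomingHeader;
        set(traceId);
        return traceId;
    }
}
```

A holder like this is bound to the thread that served the request, so whatever it stores is invisible to any other thread. That is exactly the gap this post is about.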
The filter relies on the following request wrapper to add the header. The logging code comes after it:
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;

/**
 * Utility for adding or overriding request headers.
 */
public class HeaderMapRequestWrapper extends HttpServletRequestWrapper {

    private final Map<String, String> headerMap = new HashMap<>();

    /**
     * Construct a wrapper for this request.
     */
    public HeaderMapRequestWrapper(HttpServletRequest request) {
        super(request);
    }

    /**
     * Add a header with the given name and value.
     */
    public void addHeader(String name, String value) {
        headerMap.put(name, value);
    }

    @Override
    public String getHeader(String name) {
        // Headers added via addHeader take precedence over the original ones
        if (headerMap.containsKey(name)) {
            return headerMap.get(name);
        }
        return super.getHeader(name);
    }

    /**
     * Get the header names, including the added ones (without duplicates).
     */
    @Override
    public Enumeration<String> getHeaderNames() {
        List<String> names = Collections.list(super.getHeaderNames());
        for (String name : headerMap.keySet()) {
            if (!names.contains(name)) {
                names.add(name);
            }
        }
        return Collections.enumeration(names);
    }

    @Override
    public Enumeration<String> getHeaders(String name) {
        if (headerMap.containsKey(name)) {
            return Collections.enumeration(Collections.singletonList(headerMap.get(name)));
        }
        return super.getHeaders(name);
    }
}
// Imports shared by the classes below (each public class lives in its own file).
// LogLevel, FinalEnvConfig, LogApplicationContext, RequestCommonUtils and LogTraceUtils are project-internal.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.serializer.ValueFilter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Accessors assumed to be generated by Lombok @Data; toJsonString() (used by LogBackUtils) is not shown
@Data
public abstract class AbstractLog implements Cloneable {
    @JSONField(name = "log_version")
    private String logVersion;
    @JSONField(name = "log_time")
    private String logTime;
    @JSONField(name = "log_type")
    private String logType;
    @JSONField(name = "app_name")
    private String appName;
    @JSONField(name = "trace_id")
    private String traceId;
    @JSONField(name = "docker_name")
    private String dockerName;
    @JSONField(name = "server_ip")
    private String serverIp;
    @JSONField(name = "method_name")
    private String methodName;
    private String env;
    @JSONField(serialize = false)
    private ValueFilter filter;
    @JSONField(name = "client_ip")
    private String clientIp;
}

@Data
public class ApplicationLog extends AbstractLog {
    private String level;
    @JSONField(name = "thread_id")
    private long threadId;
    private String context;
    private String exception;
    @JSONField(name = "log_message")
    private String logMessage;
    @JSONField(name = "stack_message")
    private String stackMessage;
    @JSONField(name = "class_name")
    private String className;
}

public class LogMsgFactory {
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Build a regular application log entry.
     */
    public static ApplicationLog getApplicationLog(LogLevel logLevel, String logMessage) {
        ApplicationLog log = new ApplicationLog();
        log.setEnv(FinalEnvConfig.getEnv());
        log.setLogVersion("1.0.0");
        log.setLogTime(LocalDateTime.now().format(dateTimeFormatter));
        log.setTraceId(LogTraceUtils.getTraceId()); // trace id set by LogTraceFilter
        LogApplicationContext context = new LogApplicationContext();
        context.setUrl(RequestCommonUtils.getUrl());
        context.setMethod(RequestCommonUtils.getMethod());
        context.setParams(JSON.toJSONString(RequestCommonUtils.getParams()));
        log.setContext(JSON.toJSONString(context));
        log.setThreadId(Thread.currentThread().getId());
        log.setAppName(FinalEnvConfig.getAppName());
        log.setServerIp(RequestCommonUtils.getServerIp());
        log.setClientIp(RequestCommonUtils.getClientIp());
        log.setMethodName(getMethodName());
        log.setLevel(logLevel.toString());
        log.setLogMessage(logMessage);
        return log;
    }

    /**
     * Return "Class.method" of the first caller outside the logging code.
     */
    private static String getMethodName() {
        try {
            StackTraceElement[] stes = Thread.currentThread().getStackTrace();
            for (int i = 1; i < stes.length; ++i) {
                StackTraceElement ste = stes[i];
                if (!ste.getClassName().equals(LogMsgFactory.class.getName())
                        && !ste.getClassName().contains("common.log")) {
                    return ste.getClassName() + "." + ste.getMethodName();
                }
            }
            return null;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

/**
 * Logging utility.
 */
public class LogBackUtils {
    /** Application log. */
    private static final Logger applicationLog = LoggerFactory.getLogger("application");

    /** Error log. */
    public static void error(String msg) {
        try {
            ApplicationLog log = LogMsgFactory.getApplicationLog(LogLevel.ERROR, msg);
            log.setClassName(getClassName());
            applicationLog.error(log.toJsonString());
        } catch (Exception ignored) {
            // logging must never break business code
        }
    }

    /** Error log with a stack trace. */
    public static void error(String msg, Throwable e) {
        try {
            ApplicationLog log = LogMsgFactory.getApplicationLog(LogLevel.ERROR, msg);
            log.setClassName(getClassName());
            log.setStackMessage(getStackMessage(e));
            applicationLog.error(log.toJsonString());
        } catch (Exception ignored) {
        }
    }

    /** Error log with String.format-style arguments. */
    public static void error(String msg, Object... obj) {
        error(defaultFormat(msg, obj));
    }

    /** Warning log. */
    public static void warn(String msg) {
        try {
            ApplicationLog log = LogMsgFactory.getApplicationLog(LogLevel.WARN, msg);
            log.setClassName(getClassName());
            applicationLog.warn(log.toJsonString());
        } catch (Exception ignored) {
        }
    }

    /** Warning log with String.format-style arguments. */
    public static void warn(String msg, Object... obj) {
        warn(defaultFormat(msg, obj));
    }

    /** Info log. */
    public static void info(String msg) {
        try {
            ApplicationLog log = LogMsgFactory.getApplicationLog(LogLevel.INFO, msg);
            log.setClassName(getClassName());
            applicationLog.info(log.toJsonString());
        } catch (Exception ignored) {
        }
    }

    /** Info log with String.format-style arguments. */
    public static void info(String msg, Object... obj) {
        info(defaultFormat(msg, obj));
    }

    /** Debug log; the entry is only built when debug is enabled. */
    public static void debug(String msg) {
        try {
            if (applicationLog.isDebugEnabled()) {
                ApplicationLog log = LogMsgFactory.getApplicationLog(LogLevel.DEBUG, msg);
                log.setClassName(getClassName());
                applicationLog.debug(log.toJsonString());
            }
        } catch (Exception ignored) {
        }
    }

    /** Debug log with String.format-style arguments. */
    public static void debug(String msg, Object... obj) {
        debug(defaultFormat(msg, obj));
    }

    /**
     * Name of the class that called the static error/warn/info/debug methods.
     */
    private static String getClassName() {
        return new SecurityManager() {
            public String getClassName() {
                return getClassContext()[3].getName();
            }
        }.getClassName();
    }

    /**
     * Format a log message.
     */
    public static String defaultFormat(String format, Object... argArray) {
        String msg = "";
        try {
            if (StringUtils.isEmpty(format)) {
                // No format string: join the non-null arguments as ":arg;"
                StringBuilder sb = new StringBuilder();
                if (argArray != null) {
                    for (Object arg : argArray) {
                        if (arg != null) {
                            sb.append(String.format(":%s;", arg));
                        }
                    }
                }
                msg = sb.toString();
            } else {
                msg = String.format(format, argArray);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return msg;
    }

    /**
     * Return the stack trace as a string.
     */
    public static String getStackMessage(Throwable throwable) {
        StringWriter sw = new StringWriter();
        try (PrintWriter pw = new PrintWriter(sw)) {
            throwable.printStackTrace(pw);
            return sw.toString();
        }
    }
}
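With the implementation above, log output on the request's own thread carries the trace id. As soon as work moves to a new thread or a thread pool, though, the trace id is no longer printed. The trace for this service therefore has gaps. Let's improve how the trace id is passed along.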
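Here is a minimal, self-contained reproduction of the gap. It uses a plain `ThreadLocal` in place of the project's LogTraceUtils, which is an assumption since that source is not shown. A value set on one thread is simply not visible in a thread it starts. `PlainThreadLocalDemo` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: a plain ThreadLocal value does not cross into a new thread.
final class PlainThreadLocalDemo {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Sets the value on the calling thread and returns what a newly started thread reads.
    static String readFromNewThread(String traceId) {
        TRACE_ID.set(traceId);
        AtomicReference<String> seen = new AtomicReference<>("not-run");
        Thread child = new Thread(() -> seen.set(TRACE_ID.get()));
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
        }
        return seen.get(); // null: the child has its own, empty slot
    }
}
```

Changing the field to `new InheritableThreadLocal<>()` makes the child read the parent's value instead, which is where the improvement below starts.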
Improvement
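For passing data between threads, ThreadLocal is the first thing that comes to mind. Digging into ThreadLocal in the JDK led me to InheritableThreadLocal, which lets a parent and child thread share the contents of the thread-local map. Roughly, when the current thread creates a new thread, the current thread's inheritable values are copied into the new thread. Test code: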
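import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LearnInheritableThreadLocal {
    private static final ThreadLocal<Object> threadLocal = new InheritableThreadLocal<>();
    private static final ExecutorService es =
            new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

    public static void main(String[] args) {
        threadLocal.set("weiyi");
        // Plain child threads: each copies the parent's value when it is created
        new Thread(() -> System.out.println(Thread.currentThread().getName() + threadLocal.get())).start();
        new Thread(() -> System.out.println(Thread.currentThread().getName() + threadLocal.get())).start();
        for (int i = 0; i < 100; i++) {
            threadLocal.set("weiyi" + i);
            es.submit(() -> System.out.println(Thread.currentThread().getName() + threadLocal.get()));
        }
        System.out.println(Thread.currentThread().getName() + threadLocal.get());
        // Let the queued tasks finish, then allow the JVM to exit
        es.shutdown();
    }
}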
Result:
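Judging by the output, it seemed to work. I shipped it full of hope and shared the good news with my colleagues. My sharper colleagues immediately sensed trouble, though, and asked: when there is no parent-child relationship between the threads, is the map still copied? But I had verified the thread-pool case as well, and it was fine, wasn't it?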
Puzzled, I changed the code to add threadLocal.remove(); inside the pool task, right after the print.
The result was shocking:
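From then on, threads in the pool could no longer read anything from the threadLocal. I was toast: the feature was already in production, and this would throw everything into disorder. Luckily nothing had gone wrong recently, and since it only affects log output, the business itself was not impacted.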
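This comes down to a thread-pool detail most of you probably know: a pool does not create its threads up front, only when tasks actually arrive. In the test, the pool threads were created by the main thread while it was submitting tasks. A parent-child relationship therefore did exist, and that is why the value came through. The copy also happens only once, when the thread is created. A reused pool thread keeps whatever value it inherited (or nothing, after remove()), regardless of what the submitting thread holds at submit time.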
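Both effects can be reproduced deterministically with a single-thread pool, a simplification of the 5-core pool in the test above. The worker inherits the value once, when it is created. Later tasks see that stale copy, and after a task calls `remove()` they see nothing at all. `StalePoolInheritanceDemo` is a hypothetical name used only for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: what a reused pool thread sees through InheritableThreadLocal.
final class StalePoolInheritanceDemo {
    private static final ThreadLocal<String> TRACE_ID = new InheritableThreadLocal<>();

    static String[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // one worker, reused by every task
        try {
            TRACE_ID.set("request-1");
            // The worker thread is created during this submit and copies "request-1" from the caller.
            String first = pool.submit(() -> TRACE_ID.get()).get();

            TRACE_ID.set("request-2");
            // Same worker, no new copy: it still sees "request-1", then clears its own copy.
            String second = pool.submit(() -> {
                String seen = TRACE_ID.get();
                TRACE_ID.remove();
                return seen;
            }).get();

            // The caller still holds "request-2", but the worker's copy is gone.
            String third = pool.submit(() -> TRACE_ID.get()).get();
            return new String[] {first, second, third};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }
}
```

Calling `run()` yields `request-1`, `request-1`, and then `null`. The submitter's current value never reaches the pool after the worker exists.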
With that question in mind, I headed to GitHub, hoping to find something new there.
Going further
After persistent searching I finally found TransmittableThreadLocal (TTL), open-sourced by Alibaba.
Using TransmittableThreadLocal
1. Decorate Runnable and Callable
Runnable task = new RunnableTask();
// Extra step: wrap the task to get the decorated ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);

Callable call = new CallableTask();
// Extra step: wrap the task to get the decorated ttlCallable
Callable ttlCallable = TtlCallable.get(call);
executorService.submit(ttlCallable);
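The idea behind decorating a task is capture and replay. When the task is wrapped, read the value on the submitting thread. Install that value on the worker for the duration of `run()`, then restore whatever the worker had before. The sketch below illustrates the idea for a single `ThreadLocal<String>` using only the JDK. It is not TTL's actual implementation (TTL handles all registered TransmittableThreadLocal instances), and `CapturingRunnable` is a hypothetical name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified capture/replay wrapper (not TTL's code).
final class CapturingRunnable implements Runnable {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>(); // plain, non-inheritable

    private final Runnable task;
    private final String captured; // read on the submitting thread, at wrap time

    private CapturingRunnable(Runnable task) {
        this.task = task;
        this.captured = TRACE_ID.get();
    }

    static Runnable wrap(Runnable task) {
        return new CapturingRunnable(task);
    }

    @Override
    public void run() {
        String backup = TRACE_ID.get(); // the worker's own value, if any
        TRACE_ID.set(captured);         // replay the submitter's value
        try {
            task.run();
        } finally {
            // Restore, so the value does not leak into the next task on this worker
            if (backup == null) {
                TRACE_ID.remove();
            } else {
                TRACE_ID.set(backup);
            }
        }
    }

    // Runs tasks on one reused worker and returns what each task saw.
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            TRACE_ID.set("request-1");
            pool.submit(wrap(() -> seen.set(TRACE_ID.get()))).get();
            String first = seen.get();
            TRACE_ID.set("request-2");
            pool.submit(wrap(() -> seen.set(TRACE_ID.get()))).get();
            String second = seen.get();
            pool.submit(() -> seen.set(TRACE_ID.get())).get(); // unwrapped task on the same worker
            String third = seen.get();
            return new String[] {first, second, third};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }
}
```

Unlike InheritableThreadLocal, the value follows each submission rather than thread creation. Restoring in `finally` keeps the worker clean for unwrapped tasks.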
2. Decorate the thread pool
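This saves you from decorating every Runnable and Callable before passing it to the pool, because the pool does it for you.
This is done with the utility class TtlExecutors, which provides the following methods: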
- getTtlExecutor: decorates the Executor interface
- getTtlExecutorService: decorates the ExecutorService interface
- getTtlScheduledExecutorService: decorates the ScheduledExecutorService interface
ExecutorService executorService = ...
// Extra step: wrap the pool to get the decorated executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
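Decorating the pool moves the same capture/replay step into `execute()`, so callers keep submitting plain tasks. The following is a JDK-only sketch of that idea for one `ThreadLocal<String>`. It is not TTL's real code, `TraceExecutor` is a hypothetical name, and a single-thread executor stands in for the shared pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical Executor decorator: every task runs with the submitter's trace id.
final class TraceExecutor implements Executor {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final Executor delegate;

    TraceExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        String captured = TRACE_ID.get(); // read on the submitting thread
        delegate.execute(() -> {
            String backup = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                if (backup == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(backup);
                }
            }
        });
    }

    // Submits two tasks through the decorator and returns the trace ids they saw.
    static String[] demo() {
        ExecutorService shared = Executors.newSingleThreadExecutor(); // stands in for a shared pool
        Executor traced = new TraceExecutor(shared);
        try {
            TRACE_ID.set("request-1");
            String first = CompletableFuture.supplyAsync(TRACE_ID::get, traced).join();
            TRACE_ID.set("request-2");
            String second = CompletableFuture.supplyAsync(TRACE_ID::get, traced).join();
            return new String[] {first, second};
        } finally {
            TRACE_ID.remove();
            shared.shutdown();
        }
    }
}
```

The pool is wrapped once, where it is built, and no call site changes. That is what makes this option easy to maintain when the business code shares a common pool.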
3. Use a Java Agent to decorate the JDK thread-pool implementation classes
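With this approach, propagation through thread pools is transparent. The business code contains no code to decorate Runnables or thread pools, so the application code stays completely non-intrusive.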
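Add the following to the Java startup parameters: -javaagent:path/to/transmittable-thread-local-2.x.y.jar
That is all it takes to use TTL without touching the code. I can't help tipping my hat to Alibaba's Java engineers: impressive work.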
Summary
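In the end I chose to wrap the thread pool. The business code uses a shared thread pool, so this is relatively easy to maintain. The agent approach would also require changing the startup command, which needs support from the ops team, and handling it myself is more comfortable.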
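One more lesson: my testing was not thorough, and I did not look at the problem critically enough. As soon as I got the result I wanted, I lost sight of the bigger picture and got carried away. From now on I will play it safe and double-check before shipping.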