log异步线程间traceId 的传递

简介: 分布式项目中,存在多服务调用时,通常都会使用trace id 将多个RPC 调用进行串联起来,方便对整个调用链路进行串联,一旦出现问题方便排查是哪个环节的问题。

背景

分布式项目中,存在多服务调用时,通常都会使用trace id 将多个RPC 调用进行串联起来,方便对整个调用链路进行串联,一旦出现问题方便排查是哪个环节的问题。

新进入的公司发现已经存在这类做法儿,服务间调用时通过http 进行通信的,在实际排查问题时,回出现trace id 为空的情况,这导致在不熟悉业务的情况下,很难讲调用链捋顺,想要清楚的了解调用流程需要花费较多的时间研读代码,费时费力。仔细查看后,单个项目里存在异步servlet包装到线程池里执行一些耗时的异步调用的方法。

该项目是将trace id 存放在了header 里,这样就导致异步线程是无法继承存放在header 里的trace id 的。

traceid存储代码如下:

@WebFilter(filterName="logTraceFilter", urlPatterns="/*", asyncSupported=true)
publicclassLogTraceFilterimplementsFilter {
@Overridepublicvoidinit(FilterConfigfilterConfig) throwsServletException {
    }
@OverridepublicvoiddoFilter(ServletRequestservletRequest, ServletResponseservletResponse, FilterChainfilterChain) throwsIOException, ServletException {
HttpServletRequestrequest= (HttpServletRequest) servletRequest;
StringtraceId=RequestCommonUtils.getRequetHeader("trace_id");
if (StringUtils.isBlank(traceId)) {
StringnewTraceId=UUID.randomUUID().toString();
HeaderMapRequestWrapperrequestWrapper=newHeaderMapRequestWrapper(request);
requestWrapper.addHeader("trace_id", newTraceId);
LogTraceUtils.setTraceId(newTraceId);
LogBackUtils.info("logTraceFilter.doFilter 使用新traceId,newTraceId="+newTraceId);
filterChain.doFilter(requestWrapper, servletResponse);
        } else {
LogBackUtils.info("logTraceFilter.doFilter 使用已有traceId,traceId="+traceId);
LogTraceUtils.setTraceId(traceId);
filterChain.doFilter(request, servletResponse);
        }
    }
@Overridepublicvoiddestroy() {
    }
}

日志打印代码如下:

/*** 修改请求头工具*/publicclassHeaderMapRequestWrapperextendsHttpServletRequestWrapper {
privateMap<String, String>headerMap=newHashMap<>();
/*** construct a wrapper for this request** @param request*/publicHeaderMapRequestWrapper(HttpServletRequestrequest) {
super(request);
    }
/*** add a header with given name and value** @param name* @param value*/publicvoidaddHeader(Stringname, Stringvalue) {
headerMap.put(name, value);
    }
@OverridepublicStringgetHeader(Stringname) {
StringheaderValue=super.getHeader(name);
if (headerMap.containsKey(name)) {
headerValue=headerMap.get(name);
        }
returnheaderValue;
    }
/*** get the Header names*/@OverridepublicEnumeration<String>getHeaderNames() {
List<String>names=Collections.list(super.getHeaderNames());
for (Stringname : headerMap.keySet()) {
names.add(name);
        }
returnCollections.enumeration(names);
    }
@OverridepublicEnumeration<String>getHeaders(Stringname) {
List<String>values=Collections.list(super.getHeaders(name));
if (headerMap.containsKey(name)) {
values=Arrays.asList(headerMap.get(name));
        }
returnCollections.enumeration(values);
    }
}
@DatapublicabstractclassAbstractLogimplementsCloneable {
@JSONField(
name="log_version"    )
privateStringlogVersion;
@JSONField(
name="log_time"    )
privateStringlogTime;
@JSONField(
name="log_type"    )
privateStringlogType;
@JSONField(
name="app_name"    )
privateStringappName;
@JSONField(
name="trace_id"    )
privateStringtraceId;
@JSONField(
name="docker_name"    )
privateStringdockerName;
@JSONField(
name="server_ip"    )
privateStringserverIp;
@JSONField(
name="method_name"    )
privateStringmethodName;
privateStringenv;
@JSONField(
serialize=false    )
privateValueFilterfilter;
@JSONField(
name="client_ip"    )
privateStringclientIp;
 }
@DatapublicclassApplicationLogextendsAbstractLog {
privateStringlevel;
@JSONField(
name="thread_id"    )
privatelongthreadId;
privateStringcontext;
privateStringexception;
@JSONField(
name="log_message"    )
privateStringlogMessage;
@JSONField(
name="stack_message"    )
privateStringstackMessage;
@JSONField(
name="class_name"    )
privateStringclassName;
 }
publicclassLogMsgFactory {
privatestaticDateTimeFormatterdateTimeFormatter=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/*** 获取普通日志对象** @param logLevel* @param logMessage* @return*/publicstaticApplicationLoggetApplicationLog(LogLevellogLevel, StringlogMessage) {
ApplicationLoglog=newApplicationLog();
log.setEnv(FinalEnvConfig.getEnv());
log.setLogVersion("1.0.0");
log.setLogTime(LocalDateTime.now().format(dateTimeFormatter));
log.setTraceId(LogTraceUtils.getTraceId());
LogApplicationContextcontext=newLogApplicationContext();
context.setUrl(RequestCommonUtils.getUrl());
context.setMethod(RequestCommonUtils.getMethod());
context.setParams(JSON.toJSONString(RequestCommonUtils.getParams()));
log.setContext(JSON.toJSONString(context));
log.setThreadId(Thread.currentThread().getId());
log.setAppName(FinalEnvConfig.getAppName());
log.setServerIp(RequestCommonUtils.getServerIp());
log.setClientIp(RequestCommonUtils.getClientIp());
log.setMethodName(getMethodName());
log.setLevel(logLevel.toString());
log.setLogMessage(logMessage);
returnlog;
    }
/*** 获取方法名** @return*/privatestaticStringgetMethodName() {
try {
StackTraceElement[] stes=Thread.currentThread().getStackTrace();
for (inti=1; i<stes.length; ++i) {
StackTraceElementste=stes[i];
if (!ste.getClassName().equals(LogMsgFactory.class.getName()) &&!ste.getClassName().contains("common.log")) {
returnste.getClassName() +"."+ste.getMethodName();
                }
            }
returnnull;
        } catch (Exceptionvar4) {
var4.printStackTrace();
returnnull;
        }
    }
}
/*** 日志工具类*/publicclassLogBackUtils {
/*** 普通日志*/privatestaticfinalLoggerapplicationLog=LoggerFactory.getLogger("application");
/*** 错误日志** @param msg*/publicstaticvoiderror(Stringmsg) {
try {
ApplicationLoglog=LogMsgFactory.getApplicationLog(LogLevel.ERROR, msg);
log.setClassName(getClassName());
applicationLog.error(log.toJsonString());
        } catch (Exceptionvar2) {
        }
    }
/*** 错误日志** @param msg* @param e*/publicstaticvoiderror(Stringmsg, Throwablee) {
try {
ApplicationLoglog=LogMsgFactory.getApplicationLog(LogLevel.ERROR, msg);
log.setClassName(getClassName());
log.setStackMessage(LogBackUtils.getStackMessage(e));
applicationLog.error(log.toJsonString());
        } catch (Exceptionvar2) {
        }
    }
/*** 错误日志** @param msg* @param obj*/publicstaticvoiderror(Stringmsg, Object... obj) {
StringlogMsg=LogBackUtils.defaultFormat(msg, obj);
LogBackUtils.error(logMsg);
    }
/*** 警告日志** @param msg*/publicstaticvoidwarn(Stringmsg) {
try {
ApplicationLoglog=LogMsgFactory.getApplicationLog(LogLevel.WARN, msg);
log.setClassName(getClassName());
applicationLog.warn(log.toJsonString());
        } catch (Exceptionvar2) {
        }
    }
/*** 警告日志** @param msg* @param obj*/publicstaticvoidwarn(Stringmsg, Object... obj) {
StringlogMsg=LogBackUtils.defaultFormat(msg, obj);
LogBackUtils.warn(logMsg);
    }
/*** info日志** @param msg*/publicstaticvoidinfo(Stringmsg) {
try {
ApplicationLoglog=LogMsgFactory.getApplicationLog(LogLevel.INFO, msg);
log.setClassName(getClassName());
applicationLog.info(log.toJsonString());
        } catch (Exceptionvar2) {
        }
    }
/*** info日志** @param msg* @param obj*/publicstaticvoidinfo(Stringmsg, Object... obj) {
StringlogMsg=LogBackUtils.defaultFormat(msg, obj);
LogBackUtils.info(logMsg);
    }
/*** debug日志** @param msg*/publicstaticvoiddebug(Stringmsg) {
try {
if (applicationLog.isDebugEnabled()) {
ApplicationLoglog=LogMsgFactory.getApplicationLog(LogLevel.DEBUG, msg);
log.setClassName(getClassName());
applicationLog.debug(log.toJsonString());
            }
        } catch (Exceptionvar2) {
        }
    }
/*** debug日志** @param msg* @param obj*/publicstaticvoiddebug(Stringmsg, Object... obj) {
StringlogMsg=LogBackUtils.defaultFormat(msg, obj);
LogBackUtils.debug(logMsg);
    }
/*** 获取调用 error,info,debug静态类的类名*/privatestaticStringgetClassName() {
returnnewSecurityManager() {
publicStringgetClassName() {
returngetClassContext()[3].getName();
            }
        }.getClassName();
    }
/*** 格式化日志** @param format* @param argArray* @return*/publicstaticStringdefaultFormat(Stringformat, Object... argArray) {
Stringmsg="";
try {
if (StringUtils.isEmpty(format)) {
StringBuildersb=newStringBuilder();
if (argArray!=null) {
Object[] var4=argArray;
intvar5=argArray.length;
for (intvar6=0; var6<var5; ++var6) {
Objectarg=var4[var6];
if (arg!=null) {
sb.append(String.format(":%s;", arg.toString()));
                        }
                    }
                }
msg=sb.toString();
            } else {
msg=String.format(format, argArray);
            }
        } catch (Exceptionvar8) {
var8.printStackTrace();
        }
returnmsg;
    }
/*** 获取堆栈信息** @param throwable* @return*/publicstaticStringgetStackMessage(Throwablethrowable) {
StringWritersw=newStringWriter();
try (PrintWriterpw=newPrintWriter(sw)) {
throwable.printStackTrace(pw);
returnsw.toString();
        }
    }
}

以上实现单个线程中日志输出会携带trace id 但是异步使用新的线程,或者线程池则不能继续支持trace id 的输出,这样会导致在本业务中,trace id 的链路并不完整,接下来随着我一起改进trace id 的传递。

改进方法

线程间的资源传递,脑子里第一个想到的就是ThreadLocal,通过对jdk中有关ThreadLocal 的探索,定位到了一个InheritableThreadLocal  工具,可以形如父子间共享threadMap 的内容,大概逻辑就是,在当前线程中,启动的新的线程,会将当前线程的threadmap copy 到新的线程中,测试代码如下:

publicclassLearnInheritableThreadLocal {
privatestaticThreadLocal<Object>threadLocal=newInheritableThreadLocal<>();
privatestaticExecutorServicees=newThreadPoolExecutor(5,
10,10, TimeUnit.SECONDS,newArrayBlockingQueue<>(100));
publicstaticvoidmain(String[] args) {
threadLocal.set("weiyi");
newThread(()->{
System.out.println(Thread.currentThread().getName() +threadLocal.get());
        }).start();
newThread(()->{
System.out.println(Thread.currentThread().getName() +threadLocal.get());
        }).start();
for (inti=0; i<100;  i++){
threadLocal.set("weiyi"+i);
es.submit(()->{
System.out.println(Thread.currentThread().getName() +threadLocal.get());
            });
        }
System.out.println(Thread.currentThread().getName() +threadLocal.get());
    }
}

结果:

image.png

从结果来看已经有那味儿了,实现了,满怀期待的上线了,并且跟同事们分享了这个喜悦,但是聪明的同事们顿感不妙,并提出了,当不存在父子线程的关系的时候,这个threadmap 还会进行复制吗?可是线程池的情况我也验证了呀,没问题啊。

带着疑惑,修改了下代码,在线程池打印后增加一行threadLocal.remove();

结果让人震惊:

image.png

后续线程池中的线程,无法取到threadLocal 中的内容了,完了,芭比Q了,完了,这功能都上线了,这不乱了套了嘛,好在最近没有问题,且只是日志打印,并不影响业务。

这里涉及到线程池的一个知识点,大家应该都知道,就是线程池在没有接收到任务的时候并不会初始化线程,而是等到真正来任务的时候才会创建线程,测试代码中就存在了这种父子关系,导致能取到内容了。

带着上面的问题,开始进军git hub,从中或许有新的发现。

继续演进

终于在不懈努力下,找到了阿里云开源的TransmittableThreadLocal。

TransmittableThreadLocal的使用

1.修饰Runnable和Callable

Runnabletask=newRunnableTask(); // 额外的处理,生成修饰了的对象ttlRunnable RunnablettlRunnable=TtlRunnable.get(task); 
executorService.submit(ttlRunnable); 
Callablecall=newCallableTask(); // 额外的处理,生成修饰了的对象ttlCallable CallablettlCallable=TtlCallable.get(call); 
executorService.submit(ttlCallable);

2.修饰线程池

省去每次RunnableCallable传入线程池时的修饰,这个逻辑可以在线程池中完成。

通过工具类TtlExecutors完成,有下面的方法:

  • getTtlExecutor:修饰接口Executor
  • getTtlExecutorService:修饰接口ExecutorService
  • getTtlScheduledExecutorService:修饰接口ScheduledExecutorService
ExecutorServiceexecutorService= ... // 额外的处理,生成修饰了的对象executorService executorService=TtlExecutors.getTtlExecutorService(executorService);

3 使用Java Agent来修饰JDK线程池实现类

这种方式,实现线程池的传递是透明的,业务代码中没有修饰Runnable或是线程池的代码。即可以做到应用代码 无侵入

Java的启动参数加上:-javaagent:path/to/transmittable-thread-local-2.x.y.jar

即可实现无侵入的使用TTL 了,不由的大赞一下阿里的Java 程序员们,nb。

总结

最终我选择了包装线程池的方式进行处理,因为业务中使用了公共的线程池,相对来说好维护,另外在启动命令中增加条件,需要运维伙伴支持,嗯,还是靠自己来的舒服。

另外总结一下,自己测试范围并不是很全面,不够辩证的来看待问题,一旦满足了自己想要的结果,就开始一叶障目,直接上头,以后一定要稳中取胜,一稳再稳。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
4月前
|
Python
【Python30天速成计划】10.异步以及多进程和多线程
【Python30天速成计划】10.异步以及多进程和多线程
|
8月前
|
数据采集 Java Python
多线程与多任务异步协程高效爬虫
多线程与多任务异步协程高效爬虫
|
8月前
|
数据采集 Python
使用多线程或异步技术提高图片抓取效率
图片抓取是爬虫技术中常见的需求,但是图片抓取的效率受到很多因素的影响,比如网速、网站反爬机制、图片数量和大小等。本文将介绍如何使用多线程或异步技术来提高图片抓取的效率,以及如何使用爬虫代理IP来避免被网站拒绝服务
使用多线程或异步技术提高图片抓取效率
|
2月前
|
Python
Python学习之路 02 之分支结构
Python学习之路 02 之分支结构
457 0
Python学习之路 02 之分支结构
|
2月前
|
Java Python 开发者
Python 学习之路 01基础入门---【Python安装,Python程序基本组成】
线程池详解与异步任务编排使用案例-xian-cheng-chi-xiang-jie-yu-yi-bu-ren-wu-bian-pai-shi-yong-an-li
471 2
Python 学习之路 01基础入门---【Python安装,Python程序基本组成】
|
1月前
|
JavaScript 前端开发
JS 单线程还是多线程,如何显示异步操作
JS 单线程还是多线程,如何显示异步操作
22 2
|
2月前
|
JavaScript Java API
spring boot使用异步多线程
一文讲清楚spring boot如何结合异步多线程实现文件的导出这类耗时间的操作优化以及常用的场景,了解异步思想
39 0
spring boot使用异步多线程
|
2月前
|
Java
多线程------Future异步任务
多线程------Future异步任务
|
4月前
|
iOS开发
多线程和异步编程:解释 iOS 中的同步和异步任务的概念。
多线程和异步编程:解释 iOS 中的同步和异步任务的概念。
38 1
|
9月前
|
安全 Java Android开发
Android 中AsyncTask后台线程,异步任务的理解
Android 中AsyncTask后台线程,异步任务的理解
102 0