【通用行业开发部】阿里开源TransmittableThreadLocal使用经验记录

简介: 本文章主要记录我在一次业务优化中,使用线程池带来的子父线程值传递问题,及使用TransmittableThreadLocal来解决该问题的经验,并对TransmittableThreadLocal原理做些梳理。

业务背景

  前段时间在工作项目中,被指派一项优化任务,由于原功能耗时较久,影响使用体感;梳理完该功能涉及到的业务后,发现具备复杂、量大等特点。

    将一个大体量数据循环分割成若干小循环,并行执行,结果汇总,是该优化方案中的一环。但原业务中使用ThreadLocal。

    ThreadLocal是解决线程间数据隔离性的,源于它将值存储在每个线程Thread的自己变量(ThreadLocal.ThreadLocalMap)中,但是ThreadLocal有个弊端,由于线程间隔离性,子线程无法捕获父线程ThreadLocalMap里的值。一旦使用多线程,需要解决值传递问题。


尝试使用InheritableThreadLocal(Itl)解决问题

   Thread线程有两个ThreadLocal.ThreadLocalMap 类型变量threadLocals 和 inheritableThreadLocals。

ThreadLocal.ThreadLocalMapthreadLocals=null;
ThreadLocal.ThreadLocalMapinheritableThreadLocals=null;

inheritableThreadLocals 是在Thread 初始化方法init中赋值。

if (inheritThreadLocals&&parent.inheritableThreadLocals!=null)
this.inheritableThreadLocals=ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); // 创建一个新ThreadLocalMap 并将父线程值传递过去

    InheritableThreadLocal是可以解决子父线程值传递问题。我使用Itl 代替了Tl。待优化工作完毕,本地调试及测试通过后。发布到测试环境验证中,在测试环境中发现功能出现了异常,重新回归到本地测试,本地功能多次反复点测却又是完好的。我一度认为环境问题导致,却毫无根据。开始输出日志定位排查,发现测试环境部分线程没有获取到主线程set的值。

    开始思考其中原因。想到了线程池问题,项目中使用了线程池,点测该功能前,线程池是有部分线程存于活跃状态,该部分线程没有经过创建时值传递过程,在执行该功能时,获取不到父线程set的值。

     本地环境却是好的,这也不奇怪了。在我本地程序重启后,直接点测了该功能,线程池中的线程都是在该任务中创建的,全部携带了父线程set的值。

使用TransmittableThreadLocal解决线程池引起的问题

    查阅了资源,决定使用Alibaba开源的TransmittableThreadLocal,根据官网描述:TransmittableThreadLocal(简称TTL),"在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题"。在使用TransmittableThreadLocal后,测试环境线程池引发的问题确实得到了解决。

官网地址:https://github.com/alibaba/transmittable-thread-local

知其然知其所以然

Ttl定义:

TransmittableThreadLocal<T>extendsInheritableThreadLocal<T>//集成了Itl,具备Itl的功能.子线程被创建后,继承了父线程的值

Ttl有个类变量hold:

/*** hold是一个InheritableThreadLocal类型变量,持有WeakHashMap,重写了initialValue()和childValue()方法,* 异步时父线程的属性是直接作为初始化值赋值给子线程的本地变量对象*/privatestaticfinalInheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>holder=newInheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@OverrideprotectedWeakHashMap<TransmittableThreadLocal<Object>, ?>initialValue() {
returnnewWeakHashMap<TransmittableThreadLocal<Object>, Object>();
                }
@OverrideprotectedWeakHashMap<TransmittableThreadLocal<Object>, ?>childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?>parentValue) {
returnnewWeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
                }
            };

该hold变量数据来源:

publicfinalTget() {
Tvalue=super.get();
if (disableIgnoreNullValueSemantics||null!=value) addThisToHolder();  //往hold变量添加值returnvalue;
    }
@Overridepublicfinalvoidset(Tvalue) {
if (!disableIgnoreNullValueSemantics&&null==value) {
// 如果值为空 就移除掉remove();
        } else {
super.set(value);
addThisToHolder();//往hold变量添加值        }
    }
@SuppressWarnings("unchecked")
privatevoidaddThisToHolder() {
if (!holder.get().containsKey(this)) {
//开始往hold 变量中存值holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap 支撑null值//这里打印一句话 方面梳理整个工作流程System.out.println(String.format("%s线程已成功在hold变量中存储值。key:%s,value=%s",Thread.currentThread().getName(),this.hashCode(),this.get()));
        }
    }

以上代码做个总结Ttl有一个InheritableThreadLocal类型hold变量,通过线程的set,get操作触发addThisToHolder()方法进行添加值。

另外需要注意的是:Ttl不是直接执行Runnable的run方法,而是通过TtlRunnable对run方法做了一层封装。

先看下Runnable 构造方法:

privateTtlRunnable(@NonNullRunnablerunnable, booleanreleaseTtlValueReferenceAfterRun) {
this.capturedRef=newAtomicReference<Object>(capture()); //会触发 capture()方法this.runnable=runnable;
this.releaseTtlValueReferenceAfterRun=releaseTtlValueReferenceAfterRun;
    }

TtlRunnable构造方法中,会触发capture(),将数据(主线程set的值)储存在capturedRef变量中。

接着看下capture() 方法:

publicstaticObjectcapture() {
//从父线程捕获的值形成快照对象returnnewSnapshot(captureTtlValues(), captureThreadLocalValues());
        }
privatestaticHashMap<TransmittableThreadLocal<Object>, Object>captureTtlValues() {
HashMap<TransmittableThreadLocal<Object>, Object>ttl2Value=newHashMap<TransmittableThreadLocal<Object>, Object>();
//这里打印一下语句 方便掌握整个流程System.out.println(String.format("----------------------------------------------------------------------------"));
System.out.println(String.format("capture()开始从hold变量捕获数据"));
inti=0;
/*** 这里遍历hold,取出父线程set进去的值,存储到HashMap类型的ttl2Value变量中*/for (TransmittableThreadLocal<Object>threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
i++;
System.out.println(String.format("从hold变量中拿到的数据%s:key=%s, value=%S",i,threadLocal.hashCode(),threadLocal.get()));
            }
returnttl2Value;
        }

以上代码做个总结:TtlRunnable在构建时,触发capture()方法,将父线程存储到hold变量中的值形成快照(Snapshot)存档下来。

接着看TtlRunnable的run方法:

publicvoidrun() {
finalObjectcaptured=capturedRef.get();  //capture()方法返回的值if (captured==null||releaseTtlValueReferenceAfterRun&&!capturedRef.compareAndSet(captured, null)) {
thrownewIllegalStateException("TTL value reference is released after run!");
        }
/*** replay方法比较关键, 做了几件事* 1、遍历hold变量,然后将数据备份到map类型的backup变量中* 2、判断hold变量中是否存在captured变量中不存在的元素,如有,将从hold移除* 3、为当前子线程赋值* backup用于恢复数据,任务执行完毕,需要回归线程池,需要恢复其原本变量值*/finalObjectbackup=replay(captured);
try {
runnable.run(); //此时子线程已有值,可以执行业务        } finally {
/*** 1、根据backup备份数据,将hold数据复原* 2、根据backup备份数据,将子线程重新恢复原有数值*/restore(backup);
        }
    }

TtlRunnable的run方法中, replay()和restore()方法比较关键。

先看 replay()方法:

@NonNullpublicstaticObjectreplay(@NonNullObjectcaptured) {
finalSnapshotcapturedSnapshot= (Snapshot) captured;
returnnewSnapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
        }
@NonNullprivatestaticHashMap<TransmittableThreadLocal<Object>, Object>replayTtlValues(@NonNullHashMap<TransmittableThreadLocal<Object>, Object>captured) {
HashMap<TransmittableThreadLocal<Object>, Object>backup=newHashMap<TransmittableThreadLocal<Object>, Object>();
//以下打印语句 方面梳理整个工作流程System.out.println("--------------------------------------------------------------");
System.out.println(String.format("replayTtlValues开始,%s线程从holder变量里的获取数据列表",Thread.currentThread().getName()));
//此循环是我手动写的 源码没有inti=0;
for (Iterator<TransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator(); iterator.hasNext();){
TransmittableThreadLocal<Object>threadLocal=iterator.next();
i++;
System.out.println(String.format("%s线程从hold拿到的数据%s:key=%s, value=%s",Thread.currentThread().getName(), i,threadLocal.hashCode(), threadLocal.get()));
            }
for (finalIterator<TransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object>threadLocal=iterator.next();
backup.put(threadLocal, threadLocal.get());
/***从hold变量清理点captured变量中不存在的值,避免在执行业务(run方法)时存在数据污染** 注意:captured存储的变量和此时hold存储的变量有可能不一致* 原因:captured 变量存储的是父线程放进去的值。在TtlRunnable构建方法中触发capture() 存入到captured的值  * 而此时从hold.get()获取的值,是子线程被创建时从父线获取的。触发了Thread init初始化方法。** 为什么要执行该清理操作?* 这里需要举例子说明: 如果子线程在被创建是已经从父现在那里获取到了值 value=3,该子线程在线程池中如果没有消灭,在某个主线程中再次* 执行任务时,假如该主线程 没有设置value=3这个值,此时captured变量中就不会有value=3 那么该子线程执行run方法时 就会取到vlue=3这个值,污染了业务数据**/if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
                }
            }
/*** 这里将captured变量中的值 set到本线程中,进而触发了addThisToHolder()方法,* 该方法执行后,hold变量有可能会发生变化*/setTtlValuesTo(captured);
// 用于功能拓展doExecuteCallback(true);
//以下打印语句 方面梳理整个工作流程System.out.println("--------------------------------------------------------------");
System.out.println(String.format("replayTtlValues结束,%s线程从holder变量里的获取数据列表",Thread.currentThread().getName()));
i=0;
for (Iterator<TransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator(); iterator.hasNext();){
TransmittableThreadLocal<Object>threadLocal=iterator.next();
i++;
System.out.println(String.format("%s线程从hold拿到的数据%s:key=%s, value=%s",Thread.currentThread().getName(), i,threadLocal.hashCode(), threadLocal.get()));
            }
returnbackup;
        }

接着看restore()方法:

publicstaticvoidrestore(@NonNullObjectbackup) {
finalSnapshotbackupSnapshot= (Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
        }
privatestaticvoidrestoreTtlValues(@NonNullHashMap<TransmittableThreadLocal<Object>, Object>backup) {
// call afterExecute callbackdoExecuteCallback(false);
//以下打印语句 方面梳理整个工作流程 在源代码中不存在System.out.println("--------------------------------------------------------------");
System.out.println("restoreTtlValues开始,从holder变量里的获取数据列表");
inti=0;
for (Iterator<TransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator(); iterator.hasNext();){
TransmittableThreadLocal<Object>threadLocal=iterator.next();
i++;
System.out.println(String.format("从hold拿到的数据%s:key=%s, value=%s", i,threadLocal.hashCode(), threadLocal.get()));
            }
/*** 对holder再次遍历,然后做数据清洗工作* backup 也就是replay方法中对子线程原本持有的数据备份,而此时的hold变量,经过replay方法后,* 已经包括了某些主线程set进去的值* 所有需要借助backup 清理hold变量,使之恢复到子线程最之初,也就是被创建时携带的数据*/for (finalIterator<TransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object>threadLocal=iterator.next();
if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
                }
            }
//以下打印语句 方面梳理整个工作流程 在源代码中不存在System.out.println("--------------------------------------------------------------");
System.out.println("restoreTtlValues结束,从holder变量里的获取数据列表");
i=0;
for (Iterator<TransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator(); iterator.hasNext();){
TransmittableThreadLocal<Object>threadLocal=iterator.next();
i++;
System.out.println(String.format("从hold拿到的数据%s:key=%s, value=%s", i,threadLocal.hashCode(), threadLocal.get()));
            }
/*** 1、根据backup备份数据,将hold数据复原* 2、根据backup备份数据,将子线程重新恢复原有数值*/setTtlValuesTo(backup);
        }

以上代码做个总结:在TtlRunnable的run方法中,先拿到captured快照数据,调用replay()方法,通过对hold变量遍历,对子线程原本数据对备份(backup),然后对比快照数据,清洗到hold变量中快照数据不存在的值。执行Runnable中的run方法,最后调用restore()方法,进行对hold变量和子线程持有数据进行恢复。

测试工作流程

核心代码已经结束了。为了更直观查看流程,从官网clone下来源代码,进行测试。

测试方法:

publicclassTtlControllerTest {
privatestaticExecutorServiceexecutorService=Executors.newFixedThreadPool(1);
publicstaticvoidmain(String[] args) throwsInterruptedException {
finalThreadLocaltl1=newTransmittableThreadLocal();
finalThreadLocaltl2=newTransmittableThreadLocal();
tl1.set(1);
executorService.execute(TtlRunnable.get(newRunnable() {
@Overridepublicvoidrun() {
try {
Thread.sleep(100L);
                } catch (InterruptedExceptione) {
e.printStackTrace();
                }
            }
        }));
newThread()
tl2.set(3);
Thread.sleep(1000L); //这块停顿一下executorService.execute(TtlRunnable.get(newRunnable() {
@Overridepublicvoidrun() {
try {
Thread.sleep(100L);
                } catch (InterruptedExceptione) {
e.printStackTrace();
                }
            }
        }));
    }
}

查看流程结果

main线程已成功在hold变量中存储值。key:1282788025,value=1----------------------------------------------------------------------------capture()开始从hold变量捕获数据从hold变量中拿到的数据1:key=1282788025, value=1main线程已成功在hold变量中存储值。key:2101440631,value=3--------------------------------------------------------------replayTtlValues开始,pool-1-thread-1线程从holder变量里的获取数据列表pool-1-thread-1线程从hold拿到的数据1:key=1282788025, value=1--------------------------------------------------------------replayTtlValues结束,pool-1-thread-1线程从holder变量里的获取数据列表pool-1-thread-1线程从hold拿到的数据1:key=1282788025, value=1--------------------------------------------------------------restoreTtlValues开始,从holder变量里的获取数据列表从hold拿到的数据1:key=1282788025, value=1--------------------------------------------------------------restoreTtlValues结束,从holder变量里的获取数据列表从hold拿到的数据1:key=1282788025, value=1----------------------------------------------------------------------------capture()开始从hold变量捕获数据从hold变量中拿到的数据1:key=2101440631, value=3从hold变量中拿到的数据2:key=1282788025, value=1--------------------------------------------------------------replayTtlValues开始,pool-1-thread-1线程从holder变量里的获取数据列表pool-1-thread-1线程从hold拿到的数据1:key=1282788025, value=1pool-1-thread-1线程已成功在hold变量中存储值。key:2101440631,value=3--------------------------------------------------------------replayTtlValues结束,pool-1-thread-1线程从holder变量里的获取数据列表pool-1-thread-1线程从hold拿到的数据1:key=2101440631, value=3pool-1-thread-1线程从hold拿到的数据2:key=1282788025, value=1--------------------------------------------------------------restoreTtlValues开始,从holder变量里的获取数据列表从hold拿到的数据1:key=2101440631, value=3从hold拿到的数据2:key=1282788025, value=1--------------------------------------------------------------restoreTtlValues结束,从holder变量里的获取数据列表从hold拿到的数据1:key=1282788025, value=1
相关文章
|
云计算
阿里云蔡英华:伙伴优先是阿里云的核心态度,没有之一
2022年7月13日,在2022阿里云合作伙伴大会上,阿里云智能全球销售总裁蔡英华发表了“坚持伙伴优先”主题演讲。他表示:“ 阿里云要恪守边界,与伙伴形成分工明确的合作模式、货真价实的权益体系和长期稳定的发展政策,并对伙伴收入占比提出明确数字要求。”
|
10月前
|
搜索推荐 开发者
开源打败商业 —— 敲敲云 apaas 平台成为了零代码领域的黑马,开启零代码新时代
随着数字化转型的浪潮席卷全球,企业对于高效、灵活的业务系统需求日益增长。在这个背景下,零代码开发平台应运而生,成为了市场上备受瞩目的产品。而在众多零代码产品中,敲敲云 以其开源的身份和高效的研发速度,彰显了其与众不同的竞争力。
670 0
|
机器学习/深度学习 人工智能 供应链
OpenAI再建顶级团队,重金招聘核心岗成员,阻止超级AI的威胁!
OpenAI再建顶级团队,重金招聘核心岗成员,阻止超级AI的威胁!
149 0
《Elastic(中国)产品应用实战》——二、首席信息官如何帮助增强业务弹性:以员工体验为 核心
《Elastic(中国)产品应用实战》——二、首席信息官如何帮助增强业务弹性:以员工体验为 核心
|
存储 人工智能 达摩院
再获殊荣!达摩院数据库与存储实验室荣登「InfoQ年度十大高价值技术团队」榜单
日前,由全球知名中文技术媒体InfoQ 发起,以“深入数字经济·洞见技术价值”为主题的“InfoQ 2022 中国技术力量年终榜单”结果正式揭晓。凭借突破性的技术创新、优秀的技术实践和落地案例,达摩院数据库与存储实验室荣获【2022 年度十大高价值技术团队】称号。
再获殊荣!达摩院数据库与存储实验室荣登「InfoQ年度十大高价值技术团队」榜单
|
人工智能 运维 Kubernetes
获5项大奖,发布《云计算开放应用架构标准》,阿里云持续领航云原生
阿里云致力于为企业打造数字创新的最短路径,从实战中来,为价值而生。
2720 21
获5项大奖,发布《云计算开放应用架构标准》,阿里云持续领航云原生
|
开发者
“软件定义世界,开源共筑未来”|2022 开放原子全球开源峰会报名火热开启!
“软件定义世界,开源共筑未来”|2022 开放原子全球开源峰会报名火热开启!
191 0
“软件定义世界,开源共筑未来”|2022 开放原子全球开源峰会报名火热开启!
|
开发框架 人工智能 Cloud Native
阿里云开发者两活动获评2021中国最受开发者欢迎的技术活动
阿里云开发者两活动获评2021中国最受开发者欢迎的技术活动
阿里云开发者两活动获评2021中国最受开发者欢迎的技术活动
|
人工智能 供应链 Cloud Native
极狐 (GitLab) 公司获数亿元 A 轮融资,将为研发团队扩充、开源生态建设补充“弹药”
4 月 11 日,极狐(GitLab)(以下简称“极狐公司”)正式宣布 A 轮融资签约完成,融资金额达数亿元人民币。
176 0
极狐 (GitLab) 公司获数亿元 A 轮融资,将为研发团队扩充、开源生态建设补充“弹药”
|
架构师 前端开发 Cloud Native
国内首个开源架构治理平台 ArchGuard,专治分布式场景下各种不服
过去的 10 年间,软件的架构发生了巨大的变化,从早先流行的单体 MVC 架构,变成了所谓的 5:5 开,即分布式 vs 单体。只是呢,有大量的软件开发人员,无法看到系统的全貌,又或者是从单体的思维转变过来。于是,哪怕是在使用了微服务的情况下,但是实现的却又是一个一个的单体,只是它们变成了“分布式的单体”。
637 0
国内首个开源架构治理平台 ArchGuard,专治分布式场景下各种不服

热门文章

最新文章