从ThreadLocal谈到TransmittableThreadLocal,从使用到原理2:https://developer.aliyun.com/article/1394837
TransmittableThreadLocal
TransmittableThreadLocal
是alibaba 开源的一个工具类,github地址,是用于解决 “在使用线程池等会缓存线程的组件情况下传递ThreadLocal” 问题的 InheritableThreadLocal 扩展工具类。
回到上面的问题,我们使用 TransmittableThreadLocal
来改造一下上面的问题吧
添加相关依赖:
<!--https://github.com/alibaba/transmittable-thread-local/releases/tag/v2.14.2--> <dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.14.2</version> </dependency>
使用方式
使用方式一:使用transmittable-thread-local
jar中的相关包装类,或者相关api对使用的相关类进行包装(此处包装
两字只是我个人的形容,严格说起来是用了设计模式中的装饰器模式,还有也用到了模板方法模式)
装饰器模式:指在不改变现有对象结构的情况下,动态地给该对象增加一些职责(即增加其额外功能)的模式,它属于对象结构型模式。
public class TransmittableThreadLocalDemo1 { /** * 业务线程池,service 中执行异步任务的线程池 * 使用 TtlExecutors.getTtlExecutorService() 包装一下我们自己的线程池,这样才可以 使用 TransmittableThreadLocal 解决在使用线程池等会缓存线程的组件情况下传递ThreadLocal的问题 */ private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5)); /** * 线程上下文环境,改为使用 TransmittableThreadLocal 来保存 * 然后在这里提交一个异步任务,模拟在子线程(执行异步任务的线程)中,是否可以访问到刚设置的环境变量值。 */ private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>(); public static void main(String[] args) { // 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称, for (int i = 0; i < 10; i++) { // 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量 new Thread(new ServiceThread(i)).start(); } try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } //关闭线程池 businessExecutors.shutdown(); } /** * 模拟Service业务代码 */ static class ServiceThread implements Runnable { private int i; public ServiceThread(int i) { this.i = i; } @Override public void run() { requestIdThreadLocal.set(i); System.out.println("执行service方法==>在"+Thread.currentThread().getName() + "中存储变量副本==>" + i); // 异步编程 CompletableFuture.runAsync()创建无返回值的简单异步任务,businessExecutors 表示线程池~ CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { try { // 模拟执行时间 Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:"+requestIdThreadLocal.get()); }, businessExecutors); requestIdThreadLocal.remove(); } } }
输出结果:
执行service方法==>在Thread-2中存储变量副本==>1 执行service方法==>在Thread-1中存储变量副本==>0 执行service方法==>在Thread-5中存储变量副本==>4 执行service方法==>在Thread-3中存储变量副本==>2 执行service方法==>在Thread-4中存储变量副本==>3 执行service方法==>在Thread-6中存储变量副本==>5 执行service方法==>在Thread-8中存储变量副本==>7 执行service方法==>在Thread-9中存储变量副本==>8 执行service方法==>在Thread-7中存储变量副本==>6 执行service方法==>在Thread-10中存储变量副本==>9 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:0 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:8 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:3 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:9 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:7 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:6 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:1 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:5 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:4 执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:2
都是唯一的,不再有重复的值
使用方式二:添加JVM的启动参数
idea 启动
-javaagent:path/transmittable-thread-local-xxxx.jar
使用-javaagent
近似无侵入式的使用TransmittableThreadLocal
.
public class TransmittableThreadLocalDemo2 { /** * 业务线程池,service 中执行异步任务的线程池 */ private static ExecutorService businessExecutors = Executors.newFixedThreadPool(5); /** * 线程上下文环境,在service中设置环境变量, * 然后在这里提交一个异步任务,模拟在子线程(执行异步任务的线程)中,是否可以访问到刚设置的环境变量值。 */ private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>(); public static void main(String[] args) { // 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称, for (int i = 0; i < 10; i++) { // 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量 new Thread(new ServiceThread(i)).start(); } try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } //关闭线程池 businessExecutors.shutdown(); } /** * 模拟Service业务代码 */ static class ServiceThread implements Runnable { private int i; public ServiceThread(int i) { this.i = i; } @Override public void run() { requestIdThreadLocal.set(i); System.out.println("执行service方法==>在"+Thread.currentThread().getName() + "中存储变量副本==>" + i); // 异步编程 CompletableFuture.runAsync()创建无返回值的简单异步任务,businessExecutors 表示线程池~ CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { try { // 模拟执行时间 Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:"+requestIdThreadLocal.get()); }, businessExecutors); requestIdThreadLocal.remove(); } } }
你可以试着添加和不添加都试一下,就可以看出效果来啦。
jar包方式启动的话,也是一样,在启动参数上加上-javaagent:path/transmittable-thread-local-xxxx.jar
即可。
注意:-javaagent:path/transmittable-thread-local-xxxx.jar
要放在第一个启动参数中。
相关issue:TTL agent 与 其他agent的兼容性问题
javaagent不太熟悉,不过后面参考文档中有贴出来我好奇时看了的相关优质文章,大家感兴趣的话也可以看看,值得了解的。
浅浅的分析了点内容
在第一个案例中,使用 TransmittableThreadLocal 时,我们也使用了 TtlExecutors.getTtlExecutorService()
对我们的线程池做了增强(这也是必须的搭配,否则没法使用 TransmittableThreadLocal 特性)
private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
那到底做了一些什么事情呢?然后就又可以让TransmittableThreadLocal
解决在使用线程池等会缓存线程的组件情况下传递ThreadLocal
的问题呢?
浅浅的看一下哈~
我们先看看它针对我们使用的线程做了什么增强
public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) { if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) { return executorService; } return new ExecutorServiceTtlWrapper(executorService, true); }
在这里又看到new ExecutorServiceTtlWrapper(executorService, true)
返回~ 还得继续往下看
class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced { private final ExecutorService executorService; ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService, boolean idempotent) { super(executorService, idempotent); this.executorService = executorService; } //..... }
这就是一个构造函数,那么最后我们使用的线程池也就是这个增强后的ExecutorServiceTtlWrapper
了。它在这里也实现了ExecutorService
接口,那么肯定是实现了里面的所有方法。
我们直接跳到submit
方法,看看它做了什么操作,让它得以增强吧。
@NonNull @Override public <T> Future<T> submit(@NonNull Callable<T> task) { return executorService.submit(TtlCallable.get(task, false, idempotent)); } @NonNull @Override public <T> Future<T> submit(@NonNull Runnable task, T result) { return executorService.submit(TtlRunnable.get(task, false, idempotent), result); } @NonNull @Override public Future<?> submit(@NonNull Runnable task) { return executorService.submit(TtlRunnable.get(task, false, idempotent)); }
在这里能看到transmittable-thread-local
是对我们用到的Runnable、Callable
都进行包装增强。
这里我们只去看看TtlRunnable
,不对Callable
继续深入的谈啦。
虽然还没看
TtlRunnable
的代码,但是到这里我们也大致能猜到,transmittable-thread-local
对我们使用到的相关类都进行新的实现,并且兼容原本的方式。
TtlRunnable 代码
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments { private final AtomicReference<Object> capturedRef; private final Runnable runnable; // 运行后是否 释放 Ttl 值的引用 private final boolean releaseTtlValueReferenceAfterRun; private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { // capture() 这里具体调用的是 TransmittableThreadLocal下内部类Transmitter的capture()方法 // 捕获当前线程中的所有TransmittableThreadLocal和注册的ThreadLocal值。 // 之后会详细看到滴 this.capturedRef = new AtomicReference<>(capture()); this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; } @Override public void run() { //获取所有的ttl及tl快照内容 即获取到主线程传递下来的ThreadLocal的值。 final Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } //重放从capture()捕获的TransmittableThreadLocal和注册的ThreadLocal值,并在重放之前返回当前线程中的备份TransmittableThreadLocal值。 /** * 1. backup(备份)是子线程已经存在的ThreadLocal变量; * 2. 将captured的ThreadLocal值在子线程中set进去; */ final Object backup = replay(captured); try { //执行线程的任务 runnable.run(); } finally { //从replay(Object) / clear()恢复备份的TransmittableThreadLocal和注册的ThreadLocal值。 // 恢复线程执行replay方法之前的TTL值 restore(backup); } } //.... }
小结:
- 获取所有的ttl及tl快照内容
- 获取到捕获的TransmittableThreadLocal和注册的ThreadLocal值,并返回当前子线程所有存在的变量
- 执行线程任务
- 恢复线程到执行replay方法之前的TTL值
关于captured/replay/restore
这三个方法,我推荐你看作者回复的issue和文档:
没有特别理解 capture replay restore 这样的方式的好处? #145
经典的设计~ 值得搬个小板凳过去观摩
有想贴出capture replay restore
这三个的方法的,但是功底太浅,还是浅浅的写点文字吧~
为什么线程要恢复到执行replay方法之前的TTL值?
// 恢复线程执行replay方法之前的TTL值 restore(backup);
因为在子线程中可能会修改ThreadLocal的值,另外restore里面会主动调用remove()回收,避免内存泄露(会删除子线程新增的TTL)
有下列两种情况:
- 一种情况是:主线程启动了一个异步任务,此时主线程和子线程会并行,由于父子线程的数据是隔离开的,子线程此时对TTL中的内容进行修改并不会影响到原线程的逻辑
- 另一种情况是:线程池的拒绝策略为
CallerRunsPolicy
时,那么在主线程内启动这个异步任务可能会有当前的主线程来执行,那么线程之间的数据并不会隔离,那么如果对ThreadLocal中的数据进行了修改,那么将会影响到程序的正常运行。
想浅浅的摆个烂啦,后面的内容还有很多,大家感兴趣继续去肝
看相关博文的时候,都谈到了下面这段经典代码
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() { @Override protected WeakHashMap<TransmittableThreadLocalCode<Object>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocalCode<Object>, Object>(); } @Override protected WeakHashMap<TransmittableThreadLocalCode<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocalCode<Object>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocalCode<Object>, Object>(parentValue); } };
这里就是保存TTL的地方,它结合TTL中静态内部类 Transmitter,实现了线程池级别的缓存。
小小的小结一下:
- TTL通过增强
Runnable
,将原本位于new Thread()#init()
的变量副本的传递,推迟到线程任务执行的时候,即在run()
中,这样即使是使用线程池的线程,也能够在使用的时候将线程的变量副本继续传递下去。 - 第二点,通过
captured/replay/restor
捕获、重放和回放机制,避免了在高并发情况下,线程池在CallerRunsPolicy
拒绝策略下,启动的异步线程和主线程在同一线程内执行,因为子线程修改线程的变量副本从而导致业务数据混乱的问题。
还没完全弄懂的我,就不继续摆弄那点浅薄的知识啦~
不过在这里我倒是学到了WeakHashMap
,有让我感觉到学习JVM的用处,想去看JVM那本书啦,哈哈,学习Java真的是任重而道远啊(卷不动啦,想摆烂)
让人有收获的 issue
之前就谈到了,我是因为看到文章谈到了TTL,然后在TTL上又碰巧看到了感兴趣的 issue,看那些issue,我是越看越着迷,哈哈,也因此有了不少收获。
- 分布式追踪系统场景下,如何使用TTL #53
作者大大回复的特别认真,提出问题的大佬也非常强,讨论了很长的篇幅,推荐观摩
还有关于『分布式跟踪系统』可以了解一下Google
的Dapper
(介绍的论文:中文| 英文),推荐阅读,写的很好。 - TTL agent 与 其他agent的兼容性问题 #226
也很有意思,无论是作者还是提问者,还有其他参与的大佬,执行力都非常强 - Issue: 能否提供与LOG4J(2)中的MDC集成或增强@bwzhang2011
- Issue: slf4j MDCAdapter with multi-thread-context 支持@bwzhang2011
这两处可能需要了解一下Log4j或slf4j
中的MDC,不过也很实用,也很有讨论氛围,最后作者也给出了相关建议和相关集成的包,真的很赞
安利一波作者-李鼎,每个issue回复的都好认真,给出来的回复和建议都很有帮助。
注意项
TTL是存在线程安全问题的,因为默认都是引用类型拷贝,如果子线程修改了数据,主线程是可以感知到的。
总结图
也是看到了这里,那么最后再通过这张思维图作为这篇博客正文的的结束吧。
也试着回顾一下,看完这篇博客是否有收获吧,有的话也请给俺给个赞吧
后记
这个周末就跟它三好好杠完了,我觉得还是挺有意义的。
从ThreadLocal
到InheritableThreadLocal
再到TransmittableThreadLocal
,之前懂的不多,写了,测试了,动手了,然后再记录下这个过程后,对于它们也终于不再是之前那般懵懂啦。
参考文档
在了解和学习TransmittableThreadLocal
所拜读过的文章
讲透 ThreadLocal 和 InheritableThreadLocal
InheritableThreadLocal在全链路中的作用
TransmittableThreadLocal解决线程池变量传递以及原理解析
多线程篇-TransmittableThreadLocal解决池化复用线程的传值问题
通过transmittable-thread-local源码理解线程池线程本地变量传递的原理
Java字节码技术(二)字节码增强之ASM、JavaAssist、Agent、Instrumentation
从TransmittableThreadLocal使用前调研(源码分析)
以及 github上的 TransmittableThreadLocal 的 issue