从ThreadLocal谈到TransmittableThreadLocal,从使用到原理3

简介: 从ThreadLocal谈到TransmittableThreadLocal,从使用到原理

从ThreadLocal谈到TransmittableThreadLocal,从使用到原理2:https://developer.aliyun.com/article/1394837

TransmittableThreadLocal

TransmittableThreadLocal 是alibaba 开源的一个工具类,github地址,是用于解决 “在使用线程池等会缓存线程的组件情况下传递ThreadLocal” 问题的 InheritableThreadLocal 扩展工具类。

回到上面的问题,我们使用 TransmittableThreadLocal 来改造一下上面的问题吧

添加相关依赖:

<!--https://github.com/alibaba/transmittable-thread-local/releases/tag/v2.14.2-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.14.2</version>
</dependency>

使用方式

使用方式一:使用transmittable-thread-localjar中的相关包装类,或者相关api对使用的相关类进行包装(此处包装两字只是我个人的形容,严格说起来是用了设计模式中的装饰器模式,还有也用到了模板方法模式)

装饰器模式:指在不改变现有对象结构的情况下,动态地给该对象增加一些职责(即增加其额外功能)的模式,它属于对象结构型模式

public class TransmittableThreadLocalDemo1 {
    /**
     * 业务线程池,service 中执行异步任务的线程池
     * 使用 TtlExecutors.getTtlExecutorService() 包装一下我们自己的线程池,这样才可以 使用 TransmittableThreadLocal 解决在使用线程池等会缓存线程的组件情况下传递ThreadLocal的问题
     */
    private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
    /**
     * 线程上下文环境,改为使用 TransmittableThreadLocal 来保存
     * 然后在这里提交一个异步任务,模拟在子线程(执行异步任务的线程)中,是否可以访问到刚设置的环境变量值。
     */
    private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();
    public static void main(String[] args) {
        // 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称,
        for (int i = 0; i < 10; i++) {
            // 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量
            new Thread(new ServiceThread(i)).start();
        }
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //关闭线程池
        businessExecutors.shutdown();
    }
    /**
     * 模拟Service业务代码
     */
    static class ServiceThread implements Runnable {
        private int i;
        public ServiceThread(int i) {
            this.i = i;
        }
        @Override
        public void run() {
            requestIdThreadLocal.set(i);
            System.out.println("执行service方法==>在"+Thread.currentThread().getName() + "中存储变量副本==>" + i);
            // 异步编程 CompletableFuture.runAsync()创建无返回值的简单异步任务,businessExecutors 表示线程池~
            CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
                try {
                    // 模拟执行时间
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:"+requestIdThreadLocal.get());
            }, businessExecutors);
            requestIdThreadLocal.remove();
        }
    }
}

输出结果:

执行service方法==>在Thread-2中存储变量副本==>1
执行service方法==>在Thread-1中存储变量副本==>0
执行service方法==>在Thread-5中存储变量副本==>4
执行service方法==>在Thread-3中存储变量副本==>2
执行service方法==>在Thread-4中存储变量副本==>3
执行service方法==>在Thread-6中存储变量副本==>5
执行service方法==>在Thread-8中存储变量副本==>7
执行service方法==>在Thread-9中存储变量副本==>8
执行service方法==>在Thread-7中存储变量副本==>6
执行service方法==>在Thread-10中存储变量副本==>9
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:0
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:8
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:3
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:9
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:7
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:6
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:1
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:5
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:4
执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:2

都是唯一的,不再有重复的值

使用方式二:添加JVM的启动参数

idea 启动

image.png

-javaagent:path/transmittable-thread-local-xxxx.jar

使用-javaagent近似无侵入式的使用TransmittableThreadLocal.

public class TransmittableThreadLocalDemo2 {
    /**
     * 业务线程池,service 中执行异步任务的线程池
     */
    private static ExecutorService businessExecutors = Executors.newFixedThreadPool(5);
    /**
     * 线程上下文环境,在service中设置环境变量,
     * 然后在这里提交一个异步任务,模拟在子线程(执行异步任务的线程)中,是否可以访问到刚设置的环境变量值。
     */
    private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();
    public static void main(String[] args) {
        // 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称,
        for (int i = 0; i < 10; i++) {
            // 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量
            new Thread(new ServiceThread(i)).start();
        }
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //关闭线程池
        businessExecutors.shutdown();
    }
    /**
     * 模拟Service业务代码
     */
    static class ServiceThread implements Runnable {
        private int i;
        public ServiceThread(int i) {
            this.i = i;
        }
        @Override
        public void run() {
            requestIdThreadLocal.set(i);
            System.out.println("执行service方法==>在"+Thread.currentThread().getName() + "中存储变量副本==>" + i);
            // 异步编程 CompletableFuture.runAsync()创建无返回值的简单异步任务,businessExecutors 表示线程池~
            CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
                try {
                    // 模拟执行时间
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("执行异步任务,在执行异步任务的线程中,获取父线程(service)中存储的值:"+requestIdThreadLocal.get());
            }, businessExecutors);
            requestIdThreadLocal.remove();
        }
    }
}

你可以试着添加和不添加都试一下,就可以看出效果来啦。

jar包方式启动的话,也是一样,在启动参数上加上-javaagent:path/transmittable-thread-local-xxxx.jar即可。

注意:-javaagent:path/transmittable-thread-local-xxxx.jar要放在第一个启动参数中。

相关issue:TTL agent 与 其他agent的兼容性问题

javaagent不太熟悉,不过后面参考文档中有贴出来我好奇时看了的相关优质文章,大家感兴趣的话也可以看看,值得了解的。

浅浅的分析了点内容

在第一个案例中,使用 TransmittableThreadLocal 时,我们也使用了 TtlExecutors.getTtlExecutorService()对我们的线程池做了增强(这也是必须的搭配,否则没法使用 TransmittableThreadLocal 特性)

    private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));

那到底做了一些什么事情呢?然后就又可以让TransmittableThreadLocal解决在使用线程池等会缓存线程的组件情况下传递ThreadLocal的问题呢?

浅浅的看一下哈~

我们先看看它针对我们使用的线程做了什么增强

public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) {
    if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) {
        return executorService;
    }
    return new ExecutorServiceTtlWrapper(executorService, true);
}

在这里又看到new ExecutorServiceTtlWrapper(executorService, true)返回~ 还得继续往下看

class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced {
    private final ExecutorService executorService;
    ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService, boolean idempotent) {
        super(executorService, idempotent);
        this.executorService = executorService;
    }
    
    //.....
}

这就是一个构造函数,那么最后我们使用的线程池也就是这个增强后的ExecutorServiceTtlWrapper了。它在这里也实现了ExecutorService接口,那么肯定是实现了里面的所有方法。

我们直接跳到submit方法,看看它做了什么操作,让它得以增强吧。

@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
    return executorService.submit(TtlCallable.get(task, false, idempotent));
}
@NonNull
@Override
public <T> Future<T> submit(@NonNull Runnable task, T result) {
    return executorService.submit(TtlRunnable.get(task, false, idempotent), result);
}
@NonNull
@Override
public Future<?> submit(@NonNull Runnable task) {
    return executorService.submit(TtlRunnable.get(task, false, idempotent));
}

在这里能看到transmittable-thread-local是对我们用到的Runnable、Callable都进行包装增强。

这里我们只去看看TtlRunnable,不对Callable继续深入的谈啦。

虽然还没看TtlRunnable的代码,但是到这里我们也大致能猜到,transmittable-thread-local对我们使用到的相关类都进行新的实现,并且兼容原本的方式。

TtlRunnable 代码
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
    
    private final AtomicReference<Object> capturedRef;
    private final Runnable runnable;
    // 运行后是否 释放 Ttl 值的引用
    private final boolean releaseTtlValueReferenceAfterRun;
    private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        // capture() 这里具体调用的是 TransmittableThreadLocal下内部类Transmitter的capture()方法
        // 捕获当前线程中的所有TransmittableThreadLocal和注册的ThreadLocal值。
        // 之后会详细看到滴
        this.capturedRef = new AtomicReference<>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }
    @Override
    public void run() {
        //获取所有的ttl及tl快照内容 即获取到主线程传递下来的ThreadLocal的值。
        final Object captured = capturedRef.get();
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
        //重放从capture()捕获的TransmittableThreadLocal和注册的ThreadLocal值,并在重放之前返回当前线程中的备份TransmittableThreadLocal值。
        /**
         * 1.  backup(备份)是子线程已经存在的ThreadLocal变量;
         * 2. 将captured的ThreadLocal值在子线程中set进去;
         */
        final Object backup = replay(captured); 
        try {
            //执行线程的任务
            runnable.run();
        } finally {
            //从replay(Object) / clear()恢复备份的TransmittableThreadLocal和注册的ThreadLocal值。
            // 恢复线程执行replay方法之前的TTL值
            restore(backup);
        }
    }
    //.... 
}

小结:

  1. 获取所有的ttl及tl快照内容
  2. 获取到捕获的TransmittableThreadLocal和注册的ThreadLocal值,并返回当前子线程所有存在的变量
  3. 执行线程任务
  4. 恢复线程到执行replay方法之前的TTL值

关于captured/replay/restore 这三个方法,我推荐你看作者回复的issue和文档:

没有特别理解 capture replay restore 这样的方式的好处? #145

所有TTL值的抓取、回放和恢复方法(即CRR操作)

经典的设计~ 值得搬个小板凳过去观摩

有想贴出capture replay restore这三个的方法的,但是功底太浅,还是浅浅的写点文字吧~

为什么线程要恢复到执行replay方法之前的TTL值?

// 恢复线程执行replay方法之前的TTL值
restore(backup);

因为在子线程中可能会修改ThreadLocal的值,另外restore里面会主动调用remove()回收,避免内存泄露(会删除子线程新增的TTL)

有下列两种情况:

  1. 一种情况是:主线程启动了一个异步任务,此时主线程和子线程会并行,由于父子线程的数据是隔离开的,子线程此时对TTL中的内容进行修改并不会影响到原线程的逻辑
  2. 另一种情况是:线程池的拒绝策略为CallerRunsPolicy时,那么在主线程内启动这个异步任务可能会有当前的主线程来执行,那么线程之间的数据并不会隔离,那么如果对ThreadLocal中的数据进行了修改,那么将会影响到程序的正常运行。

想浅浅的摆个烂啦,后面的内容还有很多,大家感兴趣继续去肝

看相关博文的时候,都谈到了下面这段经典代码

private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
    new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
    @Override
    protected WeakHashMap<TransmittableThreadLocalCode<Object>, ?> initialValue() {
        return new WeakHashMap<TransmittableThreadLocalCode<Object>, Object>();
    }
    @Override
    protected WeakHashMap<TransmittableThreadLocalCode<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocalCode<Object>, ?> parentValue) {
        return new WeakHashMap<TransmittableThreadLocalCode<Object>, Object>(parentValue);
    }
};

这里就是保存TTL的地方,它结合TTL中静态内部类 Transmitter,实现了线程池级别的缓存。

小小的小结一下:

  1. TTL通过增强Runnable,将原本位于new Thread()#init()的变量副本的传递,推迟到线程任务执行的时候,即在run()中,这样即使是使用线程池的线程,也能够在使用的时候将线程的变量副本继续传递下去。
  2. 第二点,通过captured/replay/restor捕获、重放和回放机制,避免了在高并发情况下,线程池在CallerRunsPolicy拒绝策略下,启动的异步线程和主线程在同一线程内执行,因为子线程修改线程的变量副本从而导致业务数据混乱的问题。

还没完全弄懂的我,就不继续摆弄那点浅薄的知识啦~

不过在这里我倒是学到了WeakHashMap,有让我感觉到学习JVM的用处,想去看JVM那本书啦,哈哈,学习Java真的是任重而道远啊(卷不动啦,想摆烂)

让人有收获的 issue

之前就谈到了,我是因为看到文章谈到了TTL,然后在TTL上又碰巧看到了感兴趣的 issue,看那些issue,我是越看越着迷,哈哈,也因此有了不少收获。

  1. 分布式追踪系统场景下,如何使用TTL #53
    作者大大回复的特别认真,提出问题的大佬也非常强,讨论了很长的篇幅,推荐观摩
    还有关于『分布式跟踪系统』可以了解一下GoogleDapper(介绍的论文:中文| 英文),推荐阅读,写的很好。
  2. TTL agent 与 其他agent的兼容性问题 #226
    也很有意思,无论是作者还是提问者,还有其他参与的大佬,执行力都非常强
  3. Issue: 能否提供与LOG4J(2)中的MDC集成或增强@bwzhang2011
  4. Issue: slf4j MDCAdapter with multi-thread-context 支持@bwzhang2011
    这两处可能需要了解一下Log4j或slf4j 中的MDC,不过也很实用,也很有讨论氛围,最后作者也给出了相关建议和相关集成的包,真的很赞

安利一波作者-李鼎,每个issue回复的都好认真,给出来的回复和建议都很有帮助。

注意项

TTL是存在线程安全问题的,因为默认都是引用类型拷贝,如果子线程修改了数据,主线程是可以感知到的。

总结图

也是看到了这里,那么最后再通过这张思维图作为这篇博客正文的的结束吧。

image.png

也试着回顾一下,看完这篇博客是否有收获吧,有的话也请给俺给个赞吧

后记

这个周末就跟它三好好杠完了,我觉得还是挺有意义的。

ThreadLocalInheritableThreadLocal再到TransmittableThreadLocal,之前懂的不多,写了,测试了,动手了,然后再记录下这个过程后,对于它们也终于不再是之前那般懵懂啦。

参考文档

在了解和学习TransmittableThreadLocal所拜读过的文章

讲透 ThreadLocal 和 InheritableThreadLocal

InheritableThreadLocal在全链路中的作用

TransmittableThreadLocal原理解析

TransmittableThreadLocal解决线程池变量传递以及原理解析

多线程篇-TransmittableThreadLocal解决池化复用线程的传值问题

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

Java字节码技术(二)字节码增强之ASM、JavaAssist、Agent、Instrumentation

字节码增强技术探索-美团技术团队

从TransmittableThreadLocal使用前调研(源码分析)

搞定 WeakHashMap 的工作原理一篇文章就够了

以及 github上的 TransmittableThreadLocal 的 issue


目录
相关文章
|
19天前
|
存储 安全 Java
面试题:用过ThreadLocal吗?ThreadLocal是在哪个包下的?看过ThreadLocal源码吗?讲一下ThreadLocal的get和put是怎么实现的?
字节面试题:用过ThreadLocal吗?ThreadLocal是在哪个包下的?看过ThreadLocal源码吗?讲一下ThreadLocal的get和put是怎么实现的?
30 0
|
3月前
|
存储 安全 Java
ThreadLocal原理讲解
ThreadLocal原理讲解
21 0
|
4月前
|
存储 Java
从ThreadLocal谈到TransmittableThreadLocal,从使用到原理2
从ThreadLocal谈到TransmittableThreadLocal,从使用到原理
83 0
|
4月前
|
存储 前端开发 Java
从ThreadLocal谈到TransmittableThreadLocal,从使用到原理1
从ThreadLocal谈到TransmittableThreadLocal,从使用到原理
169 0
|
8月前
|
存储 Java
大厂是怎么用ThreadLocal?ThreadLocal核心原理分析
ThreadLocal**是Java中的一个线程本地变量类。它可以让每个线程都有自己独立的变量副本,而不会相互影响。
84 1
|
8月前
|
Java 数据库连接
ThreadLocal原理和实践
ThreadLocal是线程本地变量,解决多线程环境下成员变量共享存在的问题。ThreadLocal为每个线程创建独立的的变量副本,他的特性是该变量的引用对全局可见,但是其值只对当前线程可用,每个线程都将自己的值保存到这个变量中而各线程不受影响。
129 0
ThreadLocal原理和实践
|
9月前
|
存储 SpringCloudAlibaba Java
浅析ThreadLocal使用及实现原理
提供了线程局部 (thread-local) 变量。这些变量不同于它们的普通对应物,因为访问某个变量(通过其`get` 或 `set`方法)的每个线程都有自己的局部变量,它独立于变量的初始化副本。`ThreadLocal`实例通常是类中的 private static 字段,它们希望将状态与某一个线程(例如,用户 ID 或事务 ID)相关联 。所以ThreadLocal与线程同步机制不同,线程同步机制是多个线程共享同一个变量,而ThreadLocal是为每一个线程创建一个单独的变量副本,故而每个线程都可以独立地改变自己所拥有的变量副本,而不会影响其他线程所对应的副本。可以这么说Th
77 0
浅析ThreadLocal使用及实现原理
|
存储 设计模式 Java
ThreadLocal的短板,我 TransmittableThreadLocal 来补上!(上)
ThreadLocal的短板,我 TransmittableThreadLocal 来补上!(上)
ThreadLocal的短板,我 TransmittableThreadLocal 来补上!(上)
|
存储 安全 Java
|
存储 算法 安全
ThreadLocal原理剖析
ThreadLocal原理剖析
163 0