通过transmittable-thread-local源码理解线程池线程本地变量传递的原理(上)

简介: 想起很早之前写过ThreadLocal的源码分析相关文章,里面提到了ThreadLocal存在一个不能向预先创建的线程中进行变量传递的局限性,刚好有一位HSBC的技术大牛前同事提到了团队引入了transmittable-thread-local解决了此问题。借着这个契机,顺便clone了transmittable-thread-local源码进行分析,这篇文章会把ThreadLocal和InheritableThreadLocal的局限性分析完毕,并且从一些基本原理以及设计模式的运用分析transmittable-thread-local(下文简称为TTL)整套框架的实现。

前提



最近一两个月花了很大的功夫做UCloud服务和中间件迁移到阿里云的工作,没什么空闲时间撸文。想起很早之前写过ThreadLocal的源码分析相关文章,里面提到了ThreadLocal存在一个不能向预先创建的线程中进行变量传递的局限性,刚好有一位HSBC的技术大牛前同事提到了团队引入了transmittable-thread-local解决了此问题。借着这个契机,顺便clonetransmittable-thread-local源码进行分析,这篇文章会把ThreadLocalInheritableThreadLocal的局限性分析完毕,并且从一些基本原理以及设计模式的运用分析transmittable-thread-local(下文简称为TTL)整套框架的实现。


这篇文章前后花了两周时间编写,行文比价干硬,文字比较多(接近5W字),希望带着耐心阅读。


父子线程的变量传递



Java中没有明确给出一个API可以基于子线程实例获取其父线程实例,有一个相对可行的方案就是在创建子线程Thread实例的时候获取当前线程的实例,用到的APIThread#currentThread()


public class Thread implements Runnable {
    // 省略其他代码
    @HotSpotIntrinsicCandidate
    public static native Thread currentThread();
    // 省略其他代码
}
复制代码


Thread#currentThread()方法是一个静态本地方法,它是由JVM实现,这是在JDK中唯一可以获取父线程实例的API。一般而言,如果想在子线程实例中得到它的父线程实例,那么需要像如下这样操作:


public class InheritableThread {
    public static void main(String[] args) throws Exception{
        // 父线程就是main线程
        Thread parentThread = Thread.currentThread();
        Thread childThread = new Thread(()-> {
            System.out.println("Parent thread is:" + parentThread.getName());
        },"childThread");
        childThread.start();
        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
    }
}
// 输出结果:
Parent thread is:main
复制代码


类似地,如果我们想把一个父子线程共享的变量实例传递,也可以这样做:


public class InheritableVars {
    public static void main(String[] args) throws Exception {
        // 父线程就是main线程
        Thread parentThread = Thread.currentThread();
        final Var var = new Var();
        var.setValue1("var1");
        var.setValue2("var2");
        Thread childThread = new Thread(() -> {
            System.out.println("Parent thread is:" + parentThread.getName());
            methodFrame1(var);
        }, "childThread");
        childThread.start();
        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
    }
    private static void methodFrame1(Var var) {
        methodFrame2(var);
    }
    private static void methodFrame2(Var var) {
    }
    @Data
    private static class Var {
        private Object value1;
        private Object value2;
    }
}
复制代码


这种做法其实是可行的,子线程调用的方法栈中的所有方法都必须显示传入需要从父线程传递过来的参数引用Var实例,这样就会产生硬编码问题,既不灵活也导致方法不能复用,所以才衍生出线程本地变量Thread Local,具体的实现有ThreadLocalInheritableThreadLocal。它们两者的基本原理是类似的,实际上所有的变量实例是缓存在线程实例的变量ThreadLocal.ThreadLocalMap中,线程本地变量实例都只是线程实例获取ThreadLocal.ThreadLocalMap的一道桥梁:


public class Thread implements Runnable {
    // 省略其他代码
    // KEY为ThreadLocal实例,VALUE为具体的值
    ThreadLocal.ThreadLocalMap threadLocals = null;
    // KEY为InheritableThreadLocal实例,VALUE为具体的值
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    // 省略其他代码
}
复制代码


ThreadLocalInheritableThreadLocal之间的区别可以结合源码分析一下(见下一小节)。前面的分析听起来如果觉得抽象的话,可以自己写几个类推敲一下,假如线程其实叫ThrowableThread,而线程本地变量叫ThrowableThreadLocal,那么它们之间的关系如下:


public class Actor {
    static ThrowableThreadLocal THREAD_LOCAL = new ThrowableThreadLocal();
    public static void main(String[] args) throws Exception {
        ThrowableThread throwableThread = new ThrowableThread() {
            @Override
            public void run() {
                methodFrame1();
            }
        };
        throwableThread.start();
    }
    private static void methodFrame1() {
        THREAD_LOCAL.set("throwable");
        methodFrame2();
    }
    private static void methodFrame2() {
        System.out.println(THREAD_LOCAL.get());
    }
    /**
     * 这个类暂且认为是java.lang.Thread
     */
    private static class ThrowableThread implements Runnable {
        ThrowableThreadLocal.ThrowableThreadLocalMap threadLocalMap;
        @Override
        public void run() {
        }
        // 这里模拟VM的实现,返回ThrowableThread自身,大家先认为不是返回NULL
        public static ThrowableThread getCurrentThread() {
//            return new ThrowableThread();
            return null;   // <--- 假设这里在VM的实现里面返回的不是NULL而是当前的ThrowableThread
        }
        public void start() {
            run();
        }
    }
    private static class ThrowableThreadLocal {
        public ThrowableThreadLocal() {
        }
        public void set(Object value) {
            ThrowableThread currentThread = ThrowableThread.getCurrentThread();
            assert null != currentThread;
            ThrowableThreadLocalMap threadLocalMap = currentThread.threadLocalMap;
            if (null == threadLocalMap) {
                threadLocalMap = currentThread.threadLocalMap = new ThrowableThreadLocalMap();
            }
            threadLocalMap.put(this, value);
        }
        public Object get() {
            ThrowableThread currentThread = ThrowableThread.getCurrentThread();
            assert null != currentThread;
            ThrowableThreadLocalMap threadLocalMap = currentThread.threadLocalMap;
            if (null == threadLocalMap) {
                return null;
            }
            return threadLocalMap.get(this);
        }
        // 这里其实在ThreadLocal中用的是WeakHashMap
        public static class ThrowableThreadLocalMap extends HashMap<ThrowableThreadLocal, Object> {
        }
    }
}
复制代码


上面的代码不能运行,只是通过一个自定义的实现说明一下其中的原理和关系。


ThreadLocal和InheritableThreadLocal的局限性



InheritableThreadLocalThreadLocal的子类,它们之间的联系是:两者都是线程Thread实例获取ThreadLocal.ThreadLocalMap的一个中间变量。区别是:两者控制ThreadLocal.ThreadLocalMap创建的时机和通过Thread实例获取ThreadLocal.ThreadLocalMapThread实例中对应的属性并不一样,导致两者的功能有一点差别。通俗来说两者的功能联系和区别是:


  • ThreadLocal:单个线程生命周期强绑定,只能在某个线程的生命周期内对ThreadLocal进行存取,不能跨线程存取。


public class ThreadLocalMain {
    private static ThreadLocal<String> TL = new ThreadLocal<>();
    public static void main(String[] args) throws Exception {
        new Thread(() -> {
            methodFrame1();
        }, "childThread").start();
        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
    }
    private static void methodFrame1() {
        TL.set("throwable");
        methodFrame2();
    }
    private static void methodFrame2() {
        System.out.println(TL.get());
    }
}
// 输出结果:
throwable
复制代码


  • InheritableThreadLocal:(1)可以无感知替代ThreadLocal的功能,当成ThreadLocal使用。(2)明确父-子线程关系的前提下,继承(拷贝)父线程的线程本地变量缓存过的变量,而这个拷贝的时机是子线程Thread实例化时候进行的,也就是子线程实例化完毕后已经完成了InheritableThreadLocal变量的拷贝,这是一个变量传递的过程。


public class InheritableThreadLocalMain {
    // 此处可以尝试替换为ThreadLocal,最后会输出null
    static InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>();
    public static void main(String[] args) throws Exception {
        new Thread(() -> {
            // 在父线程中设置变量
            ITL.set("throwable");
            new Thread(() -> {
                methodFrame1();
            }, "childThread").start();
        }, "parentThread").start();
        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
    }
    private static void methodFrame1() {
        methodFrame2();
    }
    private static void methodFrame2() {
        System.out.println(ITL.get());
    }
}
// 输出结果:
throwable
复制代码


上面提到的两点可以具体参看ThreadLocalInheritableThreadLocalThread三个类的源码,这里笔者把一些必要的注释和源码段贴出:


// --> java.lang.Thread类的源码片段
public class Thread implements Runnable {
    // 省略其他代码 
    // 这是Thread最基本的构造函数
    private Thread(ThreadGroup g, Runnable target, String name,
                   long stackSize, AccessControlContext acc,
                   boolean inheritThreadLocals) {
        // 省略其他代码
        Thread parent = currentThread();
        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
        // inheritThreadLocals一般情况下为true
        // 当前子线程实例拷贝父线程的inheritableThreadLocals属性,创建一个新的ThreadLocal.ThreadLocalMap实例赋值到自身的inheritableThreadLocals属性
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        this.stackSize = stackSize;
        this.tid = nextThreadID();
    }
    // 省略其他代码
}
// --> java.lang.ThreadLocal源码片段
public class ThreadLocal<T> {
    // 省略其他代码 
    public void set(T value) {
        Thread t = Thread.currentThread();
        // 通过当前线程获取线程实例中的threadLocals
        ThreadLocalMap map = getMap(t);
        // 线程实例中的threadLocals为NULL,实例则创建一个ThreadLocal.ThreadLocalMap实例添加当前ThreadLocal->VALUE到ThreadLocalMap中,如果已经存在ThreadLocalMap则进行覆盖对应的Entry
        if (map != null) {
            map.set(this, value);
        } else {
            createMap(t, value);
        }
    }
    // 通过线程实例获取该线程的threadLocals实例,其实是ThreadLocal.ThreadLocalMap类型的属性
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
    public T get() {
        Thread t = Thread.currentThread();
        // 通过当前线程获取线程实例中的threadLocals,再获取ThreadLocal.ThreadLocalMap中匹配上KEY为当前ThreadLocal实例的Entry对应的VALUE
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 找不到则尝试初始化ThreadLocal.ThreadLocalMap
        return setInitialValue();
    }
    // 如果不存在ThreadLocal.ThreadLocalMap,则通过初始化initialValue()方法的返回值,构造一个ThreadLocal.ThreadLocalMap
    private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }
    // 省略其他代码 
}
// --> java.lang.InheritableThreadLocal源码 - 太简单,全量贴出
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    // 这个方法使用在线程Thread的构造函数里面ThreadLocal.createInheritedMap(),基于父线程InheritableThreadLocal的属性创建子线程的InheritableThreadLocal属性,它的返回值决定了拷贝父线程的属性时候传入子线程的值
    protected T childValue(T parentValue) {
        return parentValue;
    }
    // 覆盖获取线程实例中的绑定的ThreadLocalMap为Thread#inheritableThreadLocals,这个方法其实是覆盖了ThreadLocal中对应的方法,应该加@Override注解
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }
    // 覆盖创建ThreadLocalMap的逻辑,赋值到线程实例中的inheritableThreadLocals,而不是threadLocals,这个方法其实是覆盖了ThreadLocal中对应的方法,应该加@Override注解
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}
复制代码


一定要注意,这里的setInitialValue()方法很重要,一个新的线程Thread实例在初始化(对于InheritableThreadLocal而言继承父线程的线程本地变量)或者是首次调用ThreadLocal#set(),会通过此setInitialValue()方法去构造一个全新的ThreadLocal.ThreadLocalMap,会直接使用createMap()方法。


以前面提到的两个例子,贴一个图加深理解:


Example-1


微信截图_20220512215519.png


Example-2


微信截图_20220512215528.png


ThreadLocalInheritableThreadLocal的最大局限性就是:无法为预先创建好(未投入使用)的线程实例传递变量(准确来说是首次传递某些场景是可行的,而后面由于线程池中的线程是复用的,无法进行更新或者修改变量的传递值),泛线程池Executor体系、TimerTaskForkJoinPool等一般会预先创建(核心)线程,也就它们都是无法在线程池中由预创建的子线程执行的Runnable任务实例中使用。例如下面的方式会导致参数传递失败:


public class InheritableThreadForExecutor {
    static final InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>();
    static final Executor EXECUTOR = Executors.newFixedThreadPool(1);
    public static void main(String[] args) throws Exception {
        ITL.set("throwable");
        EXECUTOR.execute(() -> {
            System.out.println(ITL.get());
        });
        ITL.set("doge");
        EXECUTOR.execute(() -> {
            System.out.println(ITL.get());
        });
        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
    }
}
// 输出结果:
throwable
throwable   # <--- 可见此处参数传递出现异常
复制代码


首次变量传递成功是因为线程池中的所有子线程都是派生自main线程。


TTL的简单使用



TTL的使用方式在它的项目README.md或者项目中的单元测试有十分详细的介绍,先引入依赖com.alibaba:transmittable-thread-local:2.11.4,这里演示一个例子:


// 父-子线程
public class TtlSample1 {
    static TransmittableThreadLocal<String> TTL = new TransmittableThreadLocal<>();
    public static void main(String[] args) throws Exception {
        new Thread(() -> {
            // 在父线程中设置变量
            TTL.set("throwable");
            new Thread(TtlRunnable.get(() -> {
                methodFrame1();
            }), "childThread").start();
        }, "parentThread").start();
        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
    }
    private static void methodFrame1() {
        methodFrame2();
    }
    private static void methodFrame2() {
        System.out.println(TTL.get());
    }
}
// 输出:
throwable
// 线程池
public class TtlSample2 {
    static TransmittableThreadLocal<String> TTL = new TransmittableThreadLocal<>();
    static final Executor EXECUTOR = Executors.newFixedThreadPool(1);
    public static void main(String[] args) throws Exception {
        TTL.set("throwable");
        EXECUTOR.execute(TtlRunnable.get(() -> {
            System.out.println(TTL.get());
        }));
        TTL.set("doge");
        EXECUTOR.execute(TtlRunnable.get(() -> {
            System.out.println(TTL.get());
        }));
        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
    }
}
// 输出:
throwable
doge
复制代码


相关文章
|
10天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
125 60
【Java并发】【线程池】带你从0-1入门线程池
|
2月前
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
6天前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
53 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
26天前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
1月前
|
安全 Java C#
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
|
2月前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
48 6
|
3月前
|
Java 程序员 调度
【JavaEE】线程创建和终止,Thread类方法,变量捕获(7000字长文)
创建线程的五种方式,Thread常见方法(守护进程.setDaemon() ,isAlive),start和run方法的区别,如何提前终止一个线程,标志位,isinterrupted,变量捕获
|
26天前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
45 17
|
1月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
57 26
|
3月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
272 2

相关实验场景

更多