ThreadLocal垮线程池传递数据解决方案:TransmittableThreadLocal【享学Java】(上)

简介: ThreadLocal垮线程池传递数据解决方案:TransmittableThreadLocal【享学Java】(上)

前言


在 上篇文章 了解到了,ThreadLocal它并不能解决线程安全问题,它旨在用于传递数据。但是它能成功传递数据比如有个大前提:放数据和取数据的操作必须是处于相同线程。


即使JDK扩展出了一个子类:InheritableThreadLocal,它能够支持跨线程传递数据,但也仅限于父线程给子线程来传递数据。倘若两个线程间真的八竿子打不着,比如分别位于两个线程池内的线程,它们之间要传递数据该肿么办呢?这就是跨线程池之间的数据传递范畴,是本文将要讲解的主要内容。


正文


在实际生产中,线程一般不可能孤立的独立去运行,而是交给线程池去调度处理。所以实际上几乎没有纯正的父子线程的关系存在,而若有这种需求大多是线程池与线程池之间的线程联系。

InheritableThreadLocal的局限性


上篇文章 介绍了ThreadLocal的局限性,可以使用更强的子类InheritableThreadLocal予以解决。那么这里看看如下示例:


public class TestThreadLocal {
    private static final ThreadLocal<Person> THREAD_LOCAL = new InheritableThreadLocal<>();
    private static final ExecutorService THREAD_POOL = Executors.newSingleThreadExecutor();
    @Test
    public void fun1() throws InterruptedException {
        THREAD_LOCAL.set(new Person());
        THREAD_POOL.execute(() -> getAndPrintData());
        TimeUnit.SECONDS.sleep(2);
        Person newPerson = new Person();
        newPerson.setAge(100);
        THREAD_LOCAL.set(newPerson); // 给线程重新绑定值
        THREAD_POOL.execute(() -> getAndPrintData());
        TimeUnit.SECONDS.sleep(2);
    }
    private void setData(Person person) {
        System.out.println("set数据,线程名:" + Thread.currentThread().getName());
        THREAD_LOCAL.set(person);
    }
    private Person getAndPrintData() {
        Person person = THREAD_LOCAL.get();
        System.out.println("get数据,线程名:" + Thread.currentThread().getName() + ",数据为:" + person);
        return person;
    }
    @Setter
    @ToString
    private static class Person {
        private Integer age = 18;
    }
}


运行程序,控制台打印:

get数据,线程名:pool-1-thread-1,数据为:TestThreadLocal.Person(age=18)
get数据,线程名:pool-1-thread-1,数据为:TestThreadLocal.Person(age=18)


重新绑定竟然“未生效”?在原基础上什么都不动,仅仅只改变线程池的大小:

private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(2);

再次运行程序,控制台打印:

get数据,线程名:pool-1-thread-1,数据为:TestThreadLocal.Person(age=18)
get数据,线程名:pool-1-thread-2,数据为:TestThreadLocal.Person(age=100)



这个结果能接受且符合预期。可以看到线程名是不一样的,所以第二个线程获取到了最新绑定的结果。因此可以大胆猜测:线程在init初始化的时候,才会去同步一份最新数据过来。


对于这两个示例的结果可做如下解释:


  • 示例1的线程池大小是1,所以第二个线程执行时复用的是上个线程(你看线程名称都一样),所以就不会再经历init初始化阶段,所以得到的绑定数据还是旧数据
  • 示例2的线程池大小是2,所以第二个线程执行时会继续初始化一条新的线程来执行它,会触发到init过程,所以它获取到的是最新绑定的数据。


小提示:线程池内线程数量若还没达到coreSize大小的话,每次新任务都会启用新的线程来执行的(不管是否有空闲线程与否)


Thread#init方法探究

为了理解后面方案的实现,非常有必要对线程初始化方法Thread#init理解一番。


Thread#init:
  // inheritThreadLocals是否继承线程的本地变量们(默认是true)
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
        ...
    Thread parent = currentThread();
    ...
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    ...
        /* Set thread ID */ // 给线程一个自增的id
        tid = nextThreadID();
  }


子线程是通过在父线程中通过调用new Thread()方法来创建子线程,Thread#init方法在Thread的构造方法中被调用。


从摘录出来的源码出能得到如下重点:


  1. 当前线程作为新创建线程(子线程)的父线程
  2. 如果父线程绑定了变量(inheritableThreadLocals != null)并且允许继承(inheritThreadLocals = true),那么就会把父线程绑定的变量们 拷贝一份到子线程里1.拷贝的原理类似于Map复制,只不过其在Hash冲突时,不是使用链表结构,而是直接在数组中找下一个为null的槽位放里面


说明:这里的拷贝是浅拷贝:引用传递而已。如果想要深度拷贝,需要自行复写ThreadLocal#childValue()方法(比如你可以继承InheritableThreadLocal并重写childValue方法)


那么为何ThreadLocal不具备继承性,而InheritableThreadLocal可以呢?有了上面的知识储备,现在一探其源码便知:


public class InheritableThreadLocal<T> extends ThreadLocal<T> {
  // 现在知道为何是浅拷贝了吧~~~~~~
    protected T childValue(T parentValue) {
        return parentValue;
    }
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }
    // 只要inheritableThreadLocals不为null了,那可不就完成子线程可以继承父的了吗
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}


源码不会骗人,一切都透露得明明白白的了吧。


InheritableThreadLocal支持子线程访问父线程中本地变量的原理是:创建子线程时将父线程中的本地变量值拷贝了一份到自己这来,拷贝的时机是子线程创建时。


然后在实际开发中,多线程就离不开线程池的使用,因为线程池能够复用线程,减少线程的频繁创建与销毁。倘若合格时候使用InheritableThreadLocal来传递数据,那么线程池中的线程拷贝的数据始终来自于第一个提交任务的外部线程,这样非常容易造成线程本地变量混乱,这种错误是致命的,比如示例1就是这种例子~


那么,这种问题怎么破?JDK并没有提供源生的支持,这时候就得借助阿里巴巴开源的TTL(transmittable-thread-local):TransmittableThreadLocal。



相关文章
|
2天前
|
SQL Java 数据库连接
【潜意识Java】深入理解MyBatis的Mapper层,以及让数据访问更高效的详细分析
深入理解MyBatis的Mapper层,以及让数据访问更高效的详细分析
11 1
|
7天前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
34 7
|
16天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
74 17
|
27天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
20天前
|
存储 Java BI
java怎么统计每个项目下的每个类别的数据
通过本文,我们详细介绍了如何在Java中统计每个项目下的每个类别的数据,包括数据模型设计、数据存储和统计方法。通过定义 `Category`和 `Project`类,并使用 `ProjectManager`类进行管理,可以轻松实现项目和类别的数据统计。希望本文能够帮助您理解和实现类似的统计需求。
68 17
|
12天前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
27天前
|
JSON 前端开发 Java
【Bug合集】——Java大小写引起传参失败,获取值为null的解决方案
类中成员变量命名问题引起传送json字符串,但是变量为null的情况做出解释,@Data注解(Spring自动生成的get和set方法)和@JsonProperty
|
2天前
|
JSON 前端开发 安全
【潜意识java】前后端跨域问题及解决方案
本文深入探讨了跨域问题及其解决方案。跨域是指浏览器出于安全考虑,限制从一个域加载的网页请求另一个域的资源。
17 0
|
5月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
4月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。

热门文章

最新文章