ThreadLocal、InheritThreadLocal、TransmittableThreadLocal

简介: ThreadLocal、InheritThreadLocal、TransmittableThreadLocal

一、ThreadLocal

多线程是Java实现多任务的基础,Thread对象代表一个线程,我们可以在代码中调用Thread.currentThread()获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字,

对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:

public void process(User user) {
    checkPermission();
    doWork();
    saveStatus();
    sendResponse();
}

然后,通过线程池去执行这些任务。

观察process()方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?

process()方法需要传递的状态就是User实例。有的童鞋会想,简单地传入User就可以了:

public void process(User user) {
    checkPermission(user);
    doWork(user);
    saveStatus(user);
    sendResponse(user);
}

但是往往一个方法又会调用其他很多方法,这样会导致User传递到所有地方:

void doWork(User user) {
    queryStatus(user);
    checkStatus();
    setNewStatus(user);
    log();
}

这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。

给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User对象就传不进去了。

Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象。

ThreadLocal实例通常总是以静态字段初始化如下:

static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();

它的典型使用方式如下:

void processUser(user) {
    try {
        threadLocalUser.set(user);
        step1();
        step2();
    } finally {
        threadLocalUser.remove();
    }
}

通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:

void step1() {
    User u = threadLocalUser.get();
    log();
    printUser();
}
 
void log() {
    User u = threadLocalUser.get();
    println(u.name);
}
 
void step2() {
    User u = threadLocalUser.get();
    checkUser(u.id);
}

注意到普通的方法调用一定是同一个线程执行的,所以,step1()step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例。

实际上,可以把ThreadLocal看成一个全局Map<Thread, Object>:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key:

Object threadLocalValue = threadLocalMap.get(Thread.currentThread());

因此,ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰。

最后,特别注意ThreadLocal一定要在finally中清除:

try {
    threadLocalUser.set(user);
    ...
} finally {
    threadLocalUser.remove();
}

这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。

为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {...}结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal可以封装为一个UserContext对象:

public class UserContext implements AutoCloseable {
 
    static final ThreadLocal<String> ctx = new ThreadLocal<>();
 
    public UserContext(String user) {
        ctx.set(user);
    }
 
    public static String currentUser() {
        return ctx.get();
    }
 
    @Override
    public void close() {
        ctx.remove();
    }
}

使用的时候,我们借助try (resource) {...}结构,可以这么写:

try (var ctx = new UserContext("Bob")) {
    // 可任意调用UserContext.currentUser():
    String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象

这样就在UserContext中完全封装了ThreadLocal,外部代码在try (resource) {...}内部可以随时调用UserContext.currentUser()获取当前线程绑定的用户名。

ThreadLocal只是线程传递,但是当在本线程中创建了一个新的线程,比如说又new了一个Thread的情况下,就不能把ThreadLocal中的数据传递给子线程。此时解决办法是通过InheritThreadLocal来解决

二、InheritThreadLocal

同一个ThreadLocal变量在父线程中被设置值后,在子线程中是获取不到的。而子类InheritableThreadLocal提供了一个特性,就是让子线程可以访问在父线程中设置的本地变量

当提交一个新任务到线程池时,线程池的处理流程如下:

  • 线程池判断线程数是否达到核心线程数且线程都处于工作状态。如果不是,则创建一个新的工作线程来执行任务。否则进入下个流程
  • 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。否则进入下个流程
  • 线程池判断线程数是否达到最大线程数且线程都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。否则按照策略处理无法执行的任务

如果使用InheritableThreadLocal+线程池,提交任务时导致线程池创建了新的工作线程,此时工作线程(子线程)能够访问到父线程(提交任务的线程)的本地变量;如果提交任务复用了已经创建的工作线程,此时工作线程(子线程)访问的本地变量来源于第一个提交任务给该工作线程的外部线程,造成线程本地变量混乱

 

public class InheritableThreadLocalDemo {
    /**
     * 模拟tomcat线程池
     */
    private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
 
    /**
     * 业务线程池,默认Control中异步任务执行线程池
     */
    private static ExecutorService businessExecutors = Executors.newFixedThreadPool(5);
 
    /**
     * 线程上下文环境,模拟在Control这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值
     */
    private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();
 
    /**
     * 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称,
     * 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量
     *
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; ++i) {
            tomcatExecutors.submit(new ControlThread(i));
        }
 
        TimeUnit.SECONDS.sleep(10);
        businessExecutors.shutdown();
        tomcatExecutors.shutdown();
    }
 
    /**
     * 模拟Control任务
     */
    static class ControlThread implements Runnable {
        private int i;
 
        public ControlThread(int i) {
            this.i = i;
        }
 
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":" + i);
            requestIdThreadLocal.set(i);
            //使用线程池异步处理任务
            businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
        }
    }
 
    /**
     * 业务任务,主要是模拟在Control控制层,提交任务到线程池执行
     */
    static class BusinessTask implements Runnable {
        private String parentThreadName;
 
        public BusinessTask(String parentThreadName) {
            this.parentThreadName = parentThreadName;
        }
 
        @Override
        public void run() {
            //如果与上面的能对应上来,则说明正确,否则失败
            System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
        }
    }
}

其中一次执行结果如下:

pool-1-thread-1:0

pool-1-thread-4:3

pool-1-thread-3:2

pool-1-thread-2:1

pool-1-thread-6:5

pool-1-thread-5:4

pool-1-thread-7:6

pool-1-thread-8:7

pool-1-thread-9:8

pool-1-thread-10:9

parentThreadName:pool-1-thread-1:0

parentThreadName:pool-1-thread-4:3

parentThreadName:pool-1-thread-8:3

parentThreadName:pool-1-thread-7:6

parentThreadName:pool-1-thread-10:0

parentThreadName:pool-1-thread-3:3

parentThreadName:pool-1-thread-5:4

parentThreadName:pool-1-thread-2:1

parentThreadName:pool-1-thread-9:6

parentThreadName:pool-1-thread-6:0

在子线程中出现出现了线程本地变量混乱的现象

InheritThreadLocal解决了创建新的子线程的传递问题。但是如果我们使用的并不是通过new Thread的办法异步创建线程,而是通过线程池来进行异步来解决。如果线程池新建线程的话,使用InheritThreadLocal可以保证数据的传递。但是线程池中的线程是重复使用的,当重复使用线程的时候,重复使用的线程中的InheritThreadLocal仍然是上次创建的数据。此时解决办法可以参考阿里的TransmittableThreadLocal

 

三、TransmittableThreadLocal

依赖

<dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>transmittable-thread-local</artifactId>
   <version>2.12.1</version>
</dependency>

使用

 

public class TransmittableThreadLocalDemo {
    /**
     * 模拟tomcat线程池
     */
    private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
 
    /**
     * 业务线程池,默认Control中异步任务执行线程池 使用ttl线程池
     */
    private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
 
    /**
     * 线程上下文环境,模拟在Control这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值
     */
    private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();
 
    /**
     * 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称,
     * 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量
     *
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; ++i) {
            tomcatExecutors.submit(new ControlThread(i));
        }
 
        TimeUnit.SECONDS.sleep(10);
        businessExecutors.shutdown();
        tomcatExecutors.shutdown();
    }
 
    /**
     * 模拟Control任务
     */
    static class ControlThread implements Runnable {
        private int i;
 
        public ControlThread(int i) {
            this.i = i;
        }
 
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":" + i);
            requestIdThreadLocal.set(i);
            //使用线程池异步处理任务
            businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
        }
    }
 
    /**
     * 业务任务,主要是模拟在Control控制层,提交任务到线程池执行
     */
    static class BusinessTask implements Runnable {
        private String parentThreadName;
 
        public BusinessTask(String parentThreadName) {
            this.parentThreadName = parentThreadName;
        }
 
        @Override
        public void run() {
            //如果与上面的能对应上来,则说明正确,否则失败
            System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
        }
    }
}

执行结果符合预期结果

阿里的解决方式仍然是使用的InheritThreadLocal,不同的是,阿里通过Javaagent修改了线程池的字节码,在线程池创建Runnable或者Callable的时候进行了包装,我们就叫RunnableWrapper。把需要传递的数据在new RunnableWrapper的时候就传递到了RunnableWrapper的成员变量中。在RunnableWrapper执行run方法的时候,先将成员变量的数据重新放一遍ThreadLocal,然后再真正执行被包装的Runnable的run方法。这样在真正的run方法中就可以拿到ThreadLocal的数据

实际项目使用TransmittableThreadLocal时,需要对线程池进行封装才可以,比如常用的线程池创建方式:

1、ExecutorService ExecutorService service = Executors.newFixedThreadPool(5);

使用TransmittableThreadLocal,需要对创建的ExecutorService封装。

ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));

2、定义一个TtlThreadPoolTaskExecutor,继承ThreadPoolTaskExecutor,重写submit/execute方法,在初始化线程池时,使用TtlThreadPoolTaskExecutor即可

public class TtlThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
 
    @Override
    public void execute(Runnable command) {
        Runnable ttlRunnable = TtlRunnable.get(command);
        super.execute(ttlRunnable);
    }
 
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        Callable ttCallable = TtlCallable.get(task);
        return super.submit(ttCallable);
    }
 
    @Override
    public Future<?> submit(Runnable task) {
        Runnable ttlRunnable = TtlRunnable.get(task);
        return super.submit(ttlRunnable);
    }
 
    @Override
    public ListenableFuture<?> submitListenable(Runnable task){
        Runnable ttlRunnable = TtlRunnable.get(task);
        return super.submitListenable(ttlRunnable);
    }
 
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        Callable ttlCallable = TtlCallable.get(task);
        return super.submitListenable(ttlCallable);
    }
 
}

要使用的地方

ThreadPoolTaskExecutor threadPoolTaskExecutor = new TtlThreadPoolTaskExecutor();

spring容器中可以定义为bean全局使用

@Configuration
public class ThreadPoolConfig {
 
    @Value("${config.thread.pool.core_pool_size:20}")
    private Integer corePoolSize;
 
    @Value("${config.thread.pool.max_pool_size:100}")
    private Integer maxPoolSize;
 
    @Value("${config.thread.pool.keep_alive_seconds:2}")
    private Integer keepAliveSeconds;
 
    @Value("${config.thread.pool.queue_capacity:200}")
    private Integer queueCapacity;
 
    @Value("${config.thread.pool.allow_core_thread_timeout:true}")
    private String allowCoreThreadTimeOut;
 
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new TtlThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(Boolean.parseBoolean(allowCoreThreadTimeOut));
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return threadPoolTaskExecutor;
    }
 
}

依赖注入使用

@Autowired

private ThreadPoolTaskExecutor threadPoolTaskExecutor;

threadPoolTaskExecutor.submit(() -> {

   //任务

});



相关文章
|
存储 Java 测试技术
【通用行业开发部】阿里开源TransmittableThreadLocal使用经验记录
本文章主要记录我在一次业务优化中,使用线程池带来的子父线程值传递问题,及使用TransmittableThreadLocal来解决该问题的经验,并对TransmittableThreadLocal原理做些梳理。
|
11月前
|
存储 Java 测试技术
一文彻底搞懂阿里开源TransmittableThreadLocal的原理和使用
【10月更文挑战第2天】在Java多线程编程中,线程本地变量(ThreadLocal)是一个非常有用的工具,它能够在每个线程中保存一个独立的变量副本,从而避免多线程环境下的数据竞争问题。然而,在使用线程池等高级多线程技术时,ThreadLocal却面临着一些挑战。为了解决这个问题,阿里巴巴开源了TransmittableThreadLocal(TTL),它扩展了ThreadLocal的功能,使其能够在复杂的多线程环境中正确传递值。本文将深入探讨TTL的原理和使用,帮助读者彻底理解这一技术干货。
1690 0
|
消息中间件 存储 RocketMQ
Rocketmq如何保证消息不丢失
文章分析了RocketMQ如何通过生产者端的同步发送与重试机制、Broker端的持久化存储与消息重试投递策略、以及消费者端的手动提交ack与幂等性处理,来确保消息在整个传输和消费过程中的不丢失。
|
3月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2061 8
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
5月前
|
存储 安全 Java
ThreadLocal - 原理与应用场景详解
ThreadLocal是Java中用于实现线程隔离的重要工具,为每个线程提供独立的变量副本,避免多线程数据共享带来的安全问题。其核心原理是通过 ThreadLocalMap 实现键值对存储,每个线程维护自己的存储空间。ThreadLocal 广泛应用于线程隔离、跨层数据传递、复杂调用链路的全局参数传递及数据库连接管理等场景。此外,InheritableThreadLocal 支持子线程继承父线程的变量值,而 TransmittableThreadLocal 则解决了线程池中变量传递的问题,提升了多线程上下文管理的可靠性。深入理解这些机制,有助于开发者更好地解决多线程环境下的数据隔离与共享挑战。
1100 43
|
存储 自然语言处理 关系型数据库
Elasticsearch 查询时 term、match、match_phrase、match_phrase_prefix 的区别
【7月更文挑战第3天】Elasticsearch 查询时 term、match、match_phrase、match_phrase_prefix 的区别
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
存储 监控 数据库
什么是聚集索引和非聚集索引?
【8月更文挑战第3天】
7105 6
|
SQL 算法 Java
(二十六)MySQL分库篇:Sharding-Sphere分库分表框架的保姆级教学!
前面《MySQL主从原理篇》、《MySQL主从实践篇》两章中聊明白了MySQL主备读写分离、多主多写热备等方案,但如果这些高可用架构依旧无法满足业务规模,或业务增长的需要,此时就需要考虑选用分库分表架构。
5197 4
|
消息中间件 Java Kafka
springboot整合kafka消费者最佳实践
springboot整合kafka消费者最佳实践
1427 1