一、ThreadLocal
多线程是Java实现多任务的基础,Thread
对象代表一个线程,我们可以在代码中调用Thread.currentThread()
获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字,
对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:
public void process(User user) { checkPermission(); doWork(); saveStatus(); sendResponse(); }
然后,通过线程池去执行这些任务。
观察process()
方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?
process()
方法需要传递的状态就是User
实例。有的童鞋会想,简单地传入User
就可以了:
public void process(User user) { checkPermission(user); doWork(user); saveStatus(user); sendResponse(user); }
但是往往一个方法又会调用其他很多方法,这样会导致User
传递到所有地方:
void doWork(User user) { queryStatus(user); checkStatus(); setNewStatus(user); log(); }
这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。
给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User
对象就传不进去了。
Java标准库提供了一个特殊的ThreadLocal
,它可以在一个线程中传递同一个对象。
ThreadLocal
实例通常总是以静态字段初始化如下:
static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();
它的典型使用方式如下:
void processUser(user) { try { threadLocalUser.set(user); step1(); step2(); } finally { threadLocalUser.remove(); } }
通过设置一个User
实例关联到ThreadLocal
中,在移除之前,所有方法都可以随时获取到该User
实例:
void step1() { User u = threadLocalUser.get(); log(); printUser(); } void log() { User u = threadLocalUser.get(); println(u.name); } void step2() { User u = threadLocalUser.get(); checkUser(u.id); }
注意到普通的方法调用一定是同一个线程执行的,所以,step1()
、step2()
以及log()
方法内,threadLocalUser.get()
获取的User
对象是同一个实例。
实际上,可以把ThreadLocal
看成一个全局Map<Thread, Object>
:每个线程获取ThreadLocal
变量时,总是使用Thread
自身作为key:
Object threadLocalValue = threadLocalMap.get(Thread.currentThread());
因此,ThreadLocal
相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal
关联的实例互不干扰。
最后,特别注意ThreadLocal
一定要在finally
中清除:
try { threadLocalUser.set(user); ... } finally { threadLocalUser.remove(); }
这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal
没有被清除,该线程执行其他代码时,会把上一次的状态带进去。
为了保证能释放ThreadLocal
关联的实例,我们可以通过AutoCloseable
接口配合try (resource) {...}
结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal
可以封装为一个UserContext
对象:
public class UserContext implements AutoCloseable { static final ThreadLocal<String> ctx = new ThreadLocal<>(); public UserContext(String user) { ctx.set(user); } public static String currentUser() { return ctx.get(); } @Override public void close() { ctx.remove(); } }
使用的时候,我们借助try (resource) {...}
结构,可以这么写:
try (var ctx = new UserContext("Bob")) { // 可任意调用UserContext.currentUser(): String currentUser = UserContext.currentUser(); } // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象
这样就在UserContext
中完全封装了ThreadLocal
,外部代码在try (resource) {...}
内部可以随时调用UserContext.currentUser()
获取当前线程绑定的用户名。
ThreadLocal只是线程传递,但是当在本线程中创建了一个新的线程,比如说又new了一个Thread的情况下,就不能把ThreadLocal中的数据传递给子线程。此时解决办法是通过InheritThreadLocal来解决
二、InheritThreadLocal
同一个ThreadLocal变量在父线程中被设置值后,在子线程中是获取不到的。而子类InheritableThreadLocal提供了一个特性,就是让子线程可以访问在父线程中设置的本地变量
当提交一个新任务到线程池时,线程池的处理流程如下:
- 线程池判断线程数是否达到核心线程数且线程都处于工作状态。如果不是,则创建一个新的工作线程来执行任务。否则进入下个流程
- 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。否则进入下个流程
- 线程池判断线程数是否达到最大线程数且线程都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。否则按照策略处理无法执行的任务
如果使用InheritableThreadLocal+线程池,提交任务时导致线程池创建了新的工作线程,此时工作线程(子线程)能够访问到父线程(提交任务的线程)的本地变量;如果提交任务复用了已经创建的工作线程,此时工作线程(子线程)访问的本地变量来源于第一个提交任务给该工作线程的外部线程,造成线程本地变量混乱
public class InheritableThreadLocalDemo { /** * 模拟tomcat线程池 */ private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10); /** * 业务线程池,默认Control中异步任务执行线程池 */ private static ExecutorService businessExecutors = Executors.newFixedThreadPool(5); /** * 线程上下文环境,模拟在Control这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值 */ private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>(); /** * 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称, * 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量 * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; ++i) { tomcatExecutors.submit(new ControlThread(i)); } TimeUnit.SECONDS.sleep(10); businessExecutors.shutdown(); tomcatExecutors.shutdown(); } /** * 模拟Control任务 */ static class ControlThread implements Runnable { private int i; public ControlThread(int i) { this.i = i; } @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + i); requestIdThreadLocal.set(i); //使用线程池异步处理任务 businessExecutors.submit(new BusinessTask(Thread.currentThread().getName())); } } /** * 业务任务,主要是模拟在Control控制层,提交任务到线程池执行 */ static class BusinessTask implements Runnable { private String parentThreadName; public BusinessTask(String parentThreadName) { this.parentThreadName = parentThreadName; } @Override public void run() { //如果与上面的能对应上来,则说明正确,否则失败 System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get()); } } }
其中一次执行结果如下:
pool-1-thread-1:0
pool-1-thread-4:3
pool-1-thread-3:2
pool-1-thread-2:1
pool-1-thread-6:5
pool-1-thread-5:4
pool-1-thread-7:6
pool-1-thread-8:7
pool-1-thread-9:8
pool-1-thread-10:9
parentThreadName:pool-1-thread-1:0
parentThreadName:pool-1-thread-4:3
parentThreadName:pool-1-thread-8:3
parentThreadName:pool-1-thread-7:6
parentThreadName:pool-1-thread-10:0
parentThreadName:pool-1-thread-3:3
parentThreadName:pool-1-thread-5:4
parentThreadName:pool-1-thread-2:1
parentThreadName:pool-1-thread-9:6
parentThreadName:pool-1-thread-6:0
在子线程中出现出现了线程本地变量混乱的现象
InheritThreadLocal解决了创建新的子线程的传递问题。但是如果我们使用的并不是通过new Thread的办法异步创建线程,而是通过线程池来进行异步来解决。如果线程池新建线程的话,使用InheritThreadLocal可以保证数据的传递。但是线程池中的线程是重复使用的,当重复使用线程的时候,重复使用的线程中的InheritThreadLocal仍然是上次创建的数据。此时解决办法可以参考阿里的TransmittableThreadLocal
三、TransmittableThreadLocal
依赖
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.12.1</version> </dependency>
使用
public class TransmittableThreadLocalDemo { /** * 模拟tomcat线程池 */ private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10); /** * 业务线程池,默认Control中异步任务执行线程池 使用ttl线程池 */ private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5)); /** * 线程上下文环境,模拟在Control这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值 */ private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>(); /** * 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称, * 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量 * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; ++i) { tomcatExecutors.submit(new ControlThread(i)); } TimeUnit.SECONDS.sleep(10); businessExecutors.shutdown(); tomcatExecutors.shutdown(); } /** * 模拟Control任务 */ static class ControlThread implements Runnable { private int i; public ControlThread(int i) { this.i = i; } @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + i); requestIdThreadLocal.set(i); //使用线程池异步处理任务 businessExecutors.submit(new BusinessTask(Thread.currentThread().getName())); } } /** * 业务任务,主要是模拟在Control控制层,提交任务到线程池执行 */ static class BusinessTask implements Runnable { private String parentThreadName; public BusinessTask(String parentThreadName) { this.parentThreadName = parentThreadName; } @Override public void run() { //如果与上面的能对应上来,则说明正确,否则失败 System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get()); } } }
执行结果符合预期结果
阿里的解决方式仍然是使用的InheritThreadLocal,不同的是,阿里通过Javaagent修改了线程池的字节码,在线程池创建Runnable或者Callable的时候进行了包装,我们就叫RunnableWrapper。把需要传递的数据在new RunnableWrapper的时候就传递到了RunnableWrapper的成员变量中。在RunnableWrapper执行run方法的时候,先将成员变量的数据重新放一遍ThreadLocal,然后再真正执行被包装的Runnable的run方法。这样在真正的run方法中就可以拿到ThreadLocal的数据
实际项目使用TransmittableThreadLocal时,需要对线程池进行封装才可以,比如常用的线程池创建方式:
1、ExecutorService ExecutorService service = Executors.newFixedThreadPool(5);
使用TransmittableThreadLocal,需要对创建的ExecutorService封装。
ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
2、定义一个TtlThreadPoolTaskExecutor,继承ThreadPoolTaskExecutor,重写submit/execute方法,在初始化线程池时,使用TtlThreadPoolTaskExecutor即可
public class TtlThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { @Override public void execute(Runnable command) { Runnable ttlRunnable = TtlRunnable.get(command); super.execute(ttlRunnable); } @Override public <T> Future<T> submit(Callable<T> task) { Callable ttCallable = TtlCallable.get(task); return super.submit(ttCallable); } @Override public Future<?> submit(Runnable task) { Runnable ttlRunnable = TtlRunnable.get(task); return super.submit(ttlRunnable); } @Override public ListenableFuture<?> submitListenable(Runnable task){ Runnable ttlRunnable = TtlRunnable.get(task); return super.submitListenable(ttlRunnable); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { Callable ttlCallable = TtlCallable.get(task); return super.submitListenable(ttlCallable); } }
要使用的地方
ThreadPoolTaskExecutor threadPoolTaskExecutor = new TtlThreadPoolTaskExecutor();
spring容器中可以定义为bean全局使用
@Configuration public class ThreadPoolConfig { @Value("${config.thread.pool.core_pool_size:20}") private Integer corePoolSize; @Value("${config.thread.pool.max_pool_size:100}") private Integer maxPoolSize; @Value("${config.thread.pool.keep_alive_seconds:2}") private Integer keepAliveSeconds; @Value("${config.thread.pool.queue_capacity:200}") private Integer queueCapacity; @Value("${config.thread.pool.allow_core_thread_timeout:true}") private String allowCoreThreadTimeOut; @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new TtlThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(corePoolSize); threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize); threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds); threadPoolTaskExecutor.setQueueCapacity(queueCapacity); threadPoolTaskExecutor.setAllowCoreThreadTimeOut(Boolean.parseBoolean(allowCoreThreadTimeOut)); threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return threadPoolTaskExecutor; } }
依赖注入使用
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
threadPoolTaskExecutor.submit(() -> {
//任务
});