前言
在上一篇文章中,给大家讲述了如何使用TransmittableThreadLocal
解决线程间上下文传递的问题,今天这篇文章我们来看看TransmittableThreadLocal
是如何实现线程间上下文传递的,它使用了什么方式解决了InheritableThreadLocal
都没有解决的线程复用导致上下文污染的问题;
set()操作
我们可以进入TransmittableThreadLocal
源码查看set()
方法:
public final void set(T value) { if (!this.disableIgnoreNullValueSemantics && value == null) { this.remove(); } else { // 调用父类set()方法,其实就是设置线程属性inheritableThreadLocals super.set(value); // 把自己放进一个WeakHashMap中; this.addThisToHolder(); } } 复制代码
因为TransmittableThreadLocal
继承了InheritableThreadLocal
类,所以super.set()
其实是将value
值放进线程属性inheritableThreadLocals
中;
在addThisToHolder()
方法中,其实就是在inheritableThreadLocals
中再放进一个键值对,key
对应的就是TransmittableThreadLocal
对象本身,value
就是null
,相当于把Holder
当成Set
用;
当set()
方法执行完毕后,当前线程中的inheritableThreadLocals
属性中有两个键值对;假如我们调用了set("china")
,当前线程中的inheritableThreadLocals
包含,Holder
也是一个TransmittableThreadLocal
对象:
TransmittableThreadLocal:"china", Holder:{ TransmittableThreadLocal:null } 复制代码
创建新的线程
在Thread
类中,我们发现在构造函数中有这么一段代码:
Thread parent = currentThread(); if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); 复制代码
当我们在应用程序中调用new Thread()
时,新创建出来的线程实例会复制父线程的inheritableThreadLocals
属性,这其实就是InheritableThreadLocal
的原理;根据这个原理,我们可以知道线程池中的所有线程都会复制父线程的上下文;
包装任务
在使用TransmittableThreadLocal
时,我们都会使用这么一个方法来包装任务:TtlRunnable.get(()->{})
,我们也应该猜到了,它其实就是对我们要做的任务做了一层增强;
Transmitter.capture()
在TtlRunnable
中,我们可以发现这样一段代码:
private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture()); 复制代码
它在父线程中调用TtlRunnable.get()
的时候就会执行,我们看看它做了什么操作:
@NonNull public static Object capture() { HashMap<TransmittableThreadLocal.Transmitter.Transmittee<Object, Object>, Object> transmittee2Value = TransmittableThreadLocal.newHashMap(transmitteeSet.size()); Iterator var1 = transmitteeSet.iterator(); while(var1.hasNext()) { TransmittableThreadLocal.Transmitter.Transmittee transmittee = (TransmittableThreadLocal.Transmitter.Transmittee)var1.next(); try { transmittee2Value.put(transmittee, transmittee.capture()); } catch (Throwable var4) { if (TransmittableThreadLocal.logger.isLoggable(Level.WARNING)) { TransmittableThreadLocal.logger.log(Level.WARNING, "exception when Transmitter.capture for transmittee " + transmittee + "(class " + transmittee.getClass().getName() + "), just ignored; cause: " + var4, var4); } } } return new TransmittableThreadLocal.Transmitter.Snapshot(transmittee2Value); } 复制代码
其实这是一个统一的入口方法,它遍历了transmitteeSet
中的所有TransmittableThreadLocal.Transmitter.Transmittee
实例对象,并依次调用了它们的capture()
方法,最后把结果保存起来了,并最终返回给了capturedRef
;
我们依次看一下具体做了啥操作:
public HashMap<TransmittableThreadLocal<Object>, Object> capture() { HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = TransmittableThreadLocal.newHashMap(((WeakHashMap)TransmittableThreadLocal.holder.get()).size()); // 在父线程中调用holder.get(),并遍历所有的key Iterator var2 = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator(); while(var2.hasNext()) { // 取出key值 TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)var2.next(); // 拷贝key值 ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } 复制代码
假如之前在父线程中的inheritableThreadLocals
中存储的内容如下:
TransmittableThreadLocal:"china", Holder:{ TransmittableThreadLocal:null } 复制代码
那么最终ttl2Value
中存储的值就是:
TransmittableThreadLocal:"china" 复制代码
第二个capture()
可以不用关心,其实是用来收集ThreadLocal
中的数据,这是TransmittableThreadLocal
为了兼容ThreadLocal
做的处理;TransmittableThreadLocal
本身提供了registerThreadLocal
这样的方法来兼容ThreadLocal
;
所以上述代码其实就是在创建任务时,从父线程中收集TransmittableThreadLocal
和ThreadLocal
的信息,并保存到每一个任务当中;
Transmitter.replay()
前面的数据收集工作全部做完后,我们的任务就被扔到线程池中执行,我们可以看一下包装后的run()
方法:
public void run() { // 取出父线程中的快照信息 Object captured = this.capturedRef.get(); if (captured != null && (!this.releaseTtlValueReferenceAfterRun || this.capturedRef.compareAndSet(captured, (Object)null))) { // 把该信息重放到当前线程的上下文中 Object backup = Transmitter.replay(captured); try { // 执行业务代码 this.runnable.run(); } finally { // 恢复现场 Transmitter.restore(backup); } } else { throw new IllegalStateException("TTL value reference is released after run!"); } } 复制代码
我们看一下Transmitter.replay()
做了啥操作,同样的还是有一个统一的入口,那么我们直接看具体的replay()
操作:
@NonNull public HashMap<TransmittableThreadLocal<Object>, Object> replay(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) { // 在子线程中执行,Holder已经把父线程的inheritableThreadLocals属性复制过来了 HashMap<TransmittableThreadLocal<Object>, Object> backup = TransmittableThreadLocal.newHashMap(((WeakHashMap)TransmittableThreadLocal.holder.get()).size()); // 遍历Holder中的key Iterator iterator = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator(); while(iterator.hasNext()) { // 取出key值 TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)iterator.next(); // 通过key值找到之前设置的value值,因为子线程已经复制了父线程的inheritableThreadLocals属性 backup.put(threadLocal, threadLocal.get()); // 如果备份数据中没有找到这个key值,那么就做删除操作 if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 把value设置到当前线程中 TransmittableThreadLocal.Transmitter.setTtlValuesTo(captured); TransmittableThreadLocal.doExecuteCallback(true); // 返回备份数据 return backup; } 复制代码
1.借助
new Thread()
会拷贝inheritableThreadLocals
属性的特性,可以直接在子线程中通过Holder
获取TransmittableThreadLocal
对象,并通过TransmittableThreadLocal
获取父线程中设置的value
;2.子线程中的数据会和快照中的数据做一个比较,并删除无效数据;
3.把父线程中的快照数据设置进子线程中,这样就可以在子线程中获取
TransmittableThreadLocal
设置的value
;
Transmitter.restore()
我们可以看到在业务代码处理完毕后,还调用了Transmitter.restore(backup)
的操作,我们来看一下里面都做了啥,同样也有一个统一的入口代码,我们直接略过看具体实现:
public void restore(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) { TransmittableThreadLocal.doExecuteCallback(false); // 遍历Holder中的所有key Iterator iterator = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator(); while(iterator.hasNext()) { TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)iterator.next(); // 通过与backup的对比判断在子线程中是否设置了额外的键值对进TransmittableThreadLocal中 // 删除额外设置的数据 if (!backup.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 最终把子线程上下文恢复到backup,保持和执行业务之前的上下文一致 TransmittableThreadLocal.Transmitter.setTtlValuesTo(backup); } 复制代码
Transmitter.restore()
的恢复操作时非常关键的,它很好地隔离了每一个任务间的上下文数据,虽然使用的是同一个线程执行不同的任务,但是在任务执行完毕后,通过Transmitter.restore()
操作恢复了子线程的上下文数据;
小结
TransmittableThreadLocal
通过对执行任务的包装,对每一个任务都做了一层增强,在任务创建的时候capture()
复制了一份父线程的数据,同时利用new Thread()
特性,使用Holder
解决了在父子线程中获取TransmittableThreadLocal
实例的问题,使得在不同的线程中都能很方便地获取TransmittableThreadLocal
和对应的value
值。通过在任务执行之前在子线程中replay(captured)
重放快照的方式把上下文数据设置进子线程中,并在业务执行完毕后使用restore(backup)
恢复子线程上下文。