前言
保险出单场景中,业务员出单时后台需要根据业务员所在机构来决定业务流转,审计字段记录出单员信息等,这些在后端处理中需要多次用到,每笔订单都是独立对应一个业务线程,可以使用ThreadLocal将业务员代码存储到线程中,方便后续取值。
概述
我们知道想要启动一个新的线程,最终需要用到Thread类,一个Thread对应一个独立的线程。所以我们是不是可以在Thread中定义一个Map集合,最后以key-value的形式将内容存储到里面;我们在线程中想要拿到当前线程的引用对象,只需要调用Thread.currentThread()即可拿到实例,拿到实例后从其属性Map集合中进行取值即可,由于每个Thread都是独立的,这样就能做到线程间的隔离了。
ThreadLocal往Thread容器中添加数据时,以自身对象作为存储的key,以自身对象左右取值的Key;
正文
案例
public static void main(String[] args) { ThreadLocal<String > loginUserCode=new ThreadLocal<>(); new Thread(()->{ loginUserCode.set("100000"); System.out.println(Thread.currentThread().getName()+":"+loginUserCode.get()); }).start(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+":"+loginUserCode.get()); loginUserCode.set("200000"); System.out.println(Thread.currentThread().getName()+":"+loginUserCode.get()); Thread thread = Thread.currentThread(); }).start(); }
通过上面的案例知道ThreadLocal的基本用法。
数据存储结构
从上述的步骤中我们知道,Thread中肯定存储一个缓存容器用于存储数据,所以先看下其缓存的实现
ThreadLocalMap的源码比较多,这里我就挑比较重点属性的出来讲解;
static class ThreadLocalMap { private Entry[] table; //默认Enrty[]容量 private static final int INITIAL_CAPACITY = 16; static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { //将key值包装成弱引用进行存储 super(k); value = v; } } }
看过HashMap源码的同学看这部分应该会比较容易点,这里的Entry继承了弱引用修饰的ThreadLocal。
ThreadLocal: set(T value)方法
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { map.set(this, value); } else { createMap(t, value); } }
ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
- 获取当前线程
- 从当前线程中获取缓存对象
- 判断该缓存对象是否被初始化,没有则创建一个容器并赋值
- 将当前的ThreadLocal对象作为key,与value存入缓存中
private void set(ThreadLocal<?> key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; //计算出当前对象在数组中的下标,这里相当于取模效果 int i = key.threadLocalHashCode & (len-1); //看是否出现Hash冲突的情况 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); //如果其k值对象一样,证明是修改操作,直接复制即可 if (k == key) { e.value = value; return; } //由于是弱引用,之前的key可能被垃圾回收,所以存储为null的,情况。 //所以这里需要重新选址和清理一些垃圾 if (k == null) { replaceStaleEntry(key, value, i); return; } } //走到这里,证明数组中不存在hash冲突的情况,这里创建一个对象并存入数组中 tab[i] = new Entry(key, value); //当前存储容量+1 int sz = ++size; //如果没有清理垃圾的情况发生,并且容量超过阈值大小了,需要进行扩容 if (!cleanSomeSlots(i, sz) && sz >= threshold) //扩容。容量为当前旧数组的2倍 rehash(); }
综上所述:
根据当前ThreadLocal计算Hash值,并进行与运算,得到数组下标
判断当前数组对应下标是否有值,没有则创建一个新的enrty对象进行存储,并尝试进行垃圾回收和扩容
如果数组对应下标已经有值了,则是hash冲突了,判断是否存在key值相等的情况,如果存在则直接覆盖
如果当前key值不相等,此时使用线性寻址法找出对应的下标位置
对key值为null的垃圾进行回收。
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; int len = tab.length; Entry e; // Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e., whenever the collector runs). //存储当前下标志 int slotToExpunge = staleSlot; //往前找,找出被垃圾回收的ThreadLocal对应的下标值 for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i; // Find either the key or trailing null slot of run, whichever // occurs first //在数组中一直往后寻找,看是否能找到key一致的位置 for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); // If we find key, then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot, or any other stale slot // encountered above it, can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. //找到位置,将该位置的值与之前ThreadLocal为null的位置进行互换 //这里主要让数据在数组中尽可能连城一块,减少碎片化 if (k == key) { e.value = value; //将当前key为null的位置与真正存储值的数据进行互换,减少碎片化 tab[i] = tab[staleSlot]; tab[staleSlot] = e; // Start expunge at preceding stale entry if it exists //判断在hash冲突位置的前面是否存在垃圾数据, //存在时 则两者不等,从最前面开始进行扫描清理垃圾 if (slotToExpunge == staleSlot) slotToExpunge = i; //垃圾清理 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } // If we didn't find stale entry on backward scan, the // first stale entry seen while scanning for key is the // first still present in the run. if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } // If key not found, put new entry in stale slot //如果key不存在,则创建一个新的,放入数组中 tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); // If there are any other stale entries in run, expunge them //判断前面位置是否存在垃圾数据,存在则从最前面开始清理,否则不需要处理 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }
综上所述:
找出当前hash冲突位置前面是否存在key为null(被垃圾回收)的脏数据
一直往后找,看是否存在key相等的位置,有则将该位置的数据与hash冲突位置进行互换,减少内存空间的碎片化
垃圾清理,判断清理的位置是否在hash冲突位置的前面,如果是则从前面开始清理
private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; //一直往后找,找到位置为null的数组下标 for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); //如果是垃圾数据,则清理。 if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); //看是否存在Hash冲突的情况 if (h != i) { //将当前位置的值清理掉 //重新在hash冲突处往后寻找一个地址为null的数据进行存储 //因为前面可能存在清理过的位置,这样的好处是减少碎片化,将数据尽量往前面靠近 tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; }
综上所述:
- 从参数下标处开始扫描,如果数组位置没有存储数据,则返回该下标
- 如果key已经被回收了,此时将该位置的空间进行清理
- 如果存在hash冲突的情况,将当前位置空间清理掉,重新在hash冲突处往后再寻找一个空的空间进行存储,这样的好处是让数据能连续,避免过度碎片化
private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed; }
由于整个可能存在碎片化过多的情况,而expungeStaleEntry()方法遇到位置为空时,就结束清理。所以需要配合cleanSomeSlots()方法进行指数次清理。我理解使用指数次清理的方法可能清理不是特别干净,但是相对整个数组的扫描效率要高得多,数组过大时,整组扫描清理的方法会过度消耗时间。
ThreadLocal:get()方法
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
- 获取当前线程
- 从当前线程中获取缓存对象
- 判断该缓存对象是否被初始化,没有则创建一个容器并赋默认值
- ThreadLocal对象作为key从缓存对象中获取值
private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); }
ThreadLocal:remove()方法
public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) { m.remove(this); } }
private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); //释放当前数组位置空间,并进行垃圾清理 expungeStaleEntry(i); return; } } }
总结
Thread中定义了ThreadLocalMap 容器用于存储当前线程数据,ThreadLocalMap 的数据结构采用数组的方法,不像HashMap采用数组+链表的方法,所以在产生hash冲突时,两者的处理方法不一样。ThreadLocalMap中Key为弱引用的ThreadLocal对象,当发生GC时,该对象会被清理,而此时key所对应的value为强引用,所以就存在key=null,value不为null的情况,如果线程持续时间长,value得不到回收就会存在内存泄漏,如果出现大量线程,可能还会引发内存溢出。
所以ThreadLocalMap中定义了一些回收策略,对key为null的垃圾进行清理后,可能会导致数组中存在大量不连续的null的空间,整个数组碎片化验证。所以在expungeStaleEntry()方法中,对垃圾清理的同时,也会有对应策略重新计算下标,将数组存储尽量连续性;