一 线程状态
在之前讨论线程-基础知识的时候,详细讲述过java的线程模型、线程状态流转、线程安全出现的原因以及解决办法,所以这里不多过的讨论这一块。此小节的重点从源码的角度上看看那些地方会引起线程状态变化。
1 线程状态流转
图中Waiting、Blocking的区别是:
- 当线程处于Blocking状态时,代表着线程可以尝试获取CPU资源。
- 当线程处于Waiting状态时,线程不会获取CPU资源。
2 Thread源码
1) join
a) 源码分析
让当前线程等待,直到调用join方法的线程执行完毕后再执行,例如:在threadA中调用threadB.join()方法,就表示让threadA等待threadB执行完成,然后threadB再继续执行。从下面的代码中我们可以看到,join其实调用的时wait方法,代码如下:
/**
* Waits at most {@code millis} milliseconds for this thread to die. A timeout of {@code 0} means to wait forever.
*/
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
b) 示例
以下代码中,主线程将等待thread执行结束,然后再输出最后的结束信息。
public static void main(String[] args) {
System.out.println("Main Thread start....");
Thread thread = new Thread(() -> {
System.out.println("Child Thread start...");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Child Thread finished...");
});
thread.start();
System.out.println("Main Thread begin join...");
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Main Thread finished.,,");
}
2) yield
public static native void yield();
出让cpu的执行时间,出让后和其他线程一起争抢CPU调度。
3) sleep
public static native void sleep(long millis) throws InterruptedException;
public static void sleep(long millis, int nanos) throws InterruptedException{…}
让线程休眠指定时间,休眠时不放弃线程持有的资源。
4) interrupt
3 Object源码
Object中的wait、notify也会影响线程状态,所以这里也就一起介绍了。值得一提的是JDK还提供了LockSupport类,此类中的park和unpark方法也能实现和wait、notify类似的功能。具体请参考:LockSupport
1) wait
public final void wait() throws InterruptedException{…}
public final void wait(long timeout, int nanos) throws InterruptedException { … }
public final native void wait(long timeout) throws InterruptedException;
让当前线程(假设是T)等待直到其他线程调用notify()、notifyAll()或者等待超过指定的时间。
当执行object.wait()方法时,将当前线程T放到此object对象的wait set中,然后放弃持有的资源,这个线程T将不再被cup调度,直到以下任意一个事情发生:
- 其他线程调用notify()方法,并且线程T正好的被唤醒。
- 其他线程调用notifyAll()方法。
- 其他线程interrupt线程T。
- 到了指定时间。如果参数timeout=0,那么时间参数将会被忽略,即此时只能通过上面三种方式唤醒。
线程T接着会从wait set中移除,接着将和其他线程一样争取object的锁,一旦线程T获得了object的锁,将会获得同步声明并且立即从之前的等待状态返回,继续执行我们的代码。
注意:wait()一定要放在sycronized内部,否则会报“java.lang.IllegalMonitorStateException”异常。这是因为wait方法会释放对象锁,所以如果没有使用sycronized同步时,因为没有锁就会报异常。示例代码如下:
synchronized (obj) {
while (<condition does not hold>)
obj.wait(timeout);
... // Perform action appropriate to condition
}
2) notify、notifyAll
public final native void notify();
public final native void notifyAll();
当调用object.notify()方法时:如果多个线程都在等待object对象的锁,那么唤醒其中的一个(一般而言是随机的唤醒,具体还要看底层的实现)。此方法仅能够被拥有此object对象锁的线程调用,只有以下三种情况,线程才能拥有对象的锁:
- 通过执行一个object对象synchronized的实例方法。
- 通过执行object的synchronized代码块。
- 如果是Class类型的对象,那么执行此class的static方法。
notifyAll和notify类似,主要的区别是notifyAll会唤醒等待此对象锁的所有线程,然后大家一起争抢cpu资源。
和wait一样,notify、notifyAll也必须放在sycronized内部,否则会报“java.lang.IllegalMonitorStateException”异常。示例代码如下:
synchronized (obj) {
... // do the work
obj.notify(timeout);
}
3) 示例代码
以下示例代码中,thread1先获得obj的锁,执行自己的逻辑,thread2因为获取不到锁一直阻塞;当thread1因为wait()而等待时,释放了obj的所资源,此时thread2将开始执行,thread2执行完后通过notifyAll()唤醒在obj上等待的线程,此后thread1将继续执行。
public static void main(String[] args) {
Object obj = new Object();
Thread thread1 = new Thread(() -> {
synchronized (obj) {
System.out.println(DateUtil.getCurrentTime() + " Thread1 start...");
try {
Thread.sleep(1000);
System.out.println(DateUtil.getCurrentTime() + " Thread1 begin wait...");
obj.wait();
System.out.println(DateUtil.getCurrentTime() + " Thread1 finished...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
synchronized (obj) {
System.out.println(DateUtil.getCurrentTime() + " Thread2 executing...");
obj.notifyAll();
}
});
thread2.start();
}
执行结果如下:
11:05:25:836 Thread1 start...
11:05:26:874 Thread1 begin wait...
11:05:26:874 Thread2 executing...
11:05:26:875 Thread1 finished...
二 ThreadLocal
ThreadLocal经常用来存储线程私有变量。
1 关系模型
ThreadLocal经常用来存储线程私有变量,在一个线程内可以多次创建ThreadLocal对象,但是这些ThreadLocal对象共用同一个ThreadLocalMap的实例。以下是Thread、ThreadLocal、ThreadLocalMap之间的关系。
从图中可以看到ThreadLocal通过Thread.currentThread()获取当前线程,并操作线程持有的ThreadLocalMap对象。总结一下:
- 每个Thread线程内部都有一个Map。
- Map里面存储线程本地对象(key)和线程的变量副本(value)
- Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向map获取和设置线程的变量值。
2 set
以下是set方法的源码,代码逻辑比较简单,其逻辑是:如果thread中threadLocals(ThreadLocal.ThreadLocalMap类型)属性为空,那么创建一个ThreadLocalMap对象,赋给thread.threadLocals;否则直接使用此value值替换thread.threadLocals中的值。
需要注意的是:ThreadLocalMap是线程私有的,见ThreadLocal#createMap()方法。
ThreadLocal#set方法
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocal#createMap方法
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
ThreadLocalMap#set方法
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
//清理掉陈旧Entry,并设置为当前的新值,这种Entry是由于ThreadLocal变量对象被回收后k==null造成的
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
ThreadLocalMap#replaceStaleEntry方法是用来复用ThreadLocalMap中的数据,当某一个ThreadLocal已经被清理了,那么就可以用这个空间来存储新的数据了。
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
3 Get
如下代码,从线程的threadLocals属性中查找是否存在需要的值,如果存在,那么返回,如果不存在初始化线程的threadLocals。
ThreadLocal#get方法
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
ThreadLocal#setInitialValue方法
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
ThreadLocalMap#getEntry方法
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
ThreadLocalMap#getEntryAfterMiss方法
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
注意ThreadLocalMap中的getEntry、getEntryAfterMiss方法,为什么我们set进去的value,后来取的时候会出现key为空的情况?这是因为ThreadLocalMap#Entry对象的key是WeakReference<ThreadLocal<?>>类型的,此种类型的引用,如果没有其他引用了,那么在GC的时候会被回收掉(当然如果此对象「key」还有其他有效引用,对象将不会被回收);因为value是普通类型的引用,当GC时如果key被回收了,那么将会清理掉此Entry的值。
4 remove
线程的threadLocals属性如果非空,那么从threadLocals中删除之前存放的值。
ThreadLocal#remove
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
ThreadLocalMap#remove
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
5 ThreadLocalMap与WeakReference
我们假设这样一种情况:假设ThreadLocal对象不再被引用了,那么JVM如何回收ThreadLocalMap中存储的数据呢?
当ThreadLocal不在被需要时,其在线程中存储的数据应该被GC回收掉,即线程中ThreadLocalMap里的key-value对应该失效。ThreadLocalMap#Entry中key是WeakReference<ThreadLocal<?>>类型,其实就是为了实现这一诉求的。因为key是WeakReference类型的应用,所以在GC的时,ThreadLocalMap中key会被回收,即被设置为null(注意此时因为ThreadLocalMap引用value的关系,所以value不会被回收)。在上面set、get等方法中,我们看到的expungeStaleEntry、replaceStaleEntry、cleanSomeSlots方法其实就是在整理、回收ThreadLocalMap中已经失效的key-value。如果触发了expungeStaleEntry方法,ThreadLocalMap中的数据就会被回收干净。
最好的处理方式是,如果某个value不需要了,手动调用ThreadLocal#remove方法删除,以免因为没有触发replaceStaleEntry()方法,而不能回收value。
Entry的部分代码如下所示
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}