聊聊Thread的状态与ThreadLocal

简介: 主要讨论线程状态流转,以及引起状态变化的方法。 分析ThreadLocal原理

 线程状态

在之前讨论线程-基础知识的时候,详细讲述过java的线程模型、线程状态流转、线程安全出现的原因以及解决办法,所以这里不多过的讨论这一块。此小节的重点从源码的角度上看看那些地方会引起线程状态变化。

1 线程状态流转

8a68a5501936ea5d21bbd2b5b92fa3d2687202f1

图中Waiting、Blocking的区别是:

 - 当线程处于Blocking状态时,代表着线程可以尝试获取CPU资源。

 - 当线程处于Waiting状态时,线程不会获取CPU资源。

 

2  Thread源码

1)   join

a)    源码分析

让当前线程等待,直到调用join方法的线程执行完毕后再执行,例如:在threadA中调用threadB.join()方法,就表示让threadA等待threadB执行完成,然后threadB再继续执行。从下面的代码中我们可以看到,join其实调用的时wait方法,代码如下:

/**
 * Waits at most {@code millis} milliseconds for this thread to die. A timeout of {@code 0} means to wait forever.
 */
public final synchronized void join(long millis)
throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

b)     示例

以下代码中,主线程将等待thread执行结束,然后再输出最后的结束信息。

public static void main(String[] args) {
    System.out.println("Main Thread start....");
    Thread thread = new Thread(() -> {
        System.out.println("Child Thread start...");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Child Thread finished...");

    });
    thread.start();
    System.out.println("Main Thread begin join...");
    try {
        thread.join(); 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Main Thread finished.,,");
}

 

2)   yield

    public static native void yield();

出让cpu的执行时间,出让后和其他线程一起争抢CPU调度。

 

3)   sleep

public static native void sleep(long millis) throws InterruptedException;

public static void sleep(long millis, int nanos)  throws InterruptedException{…}

让线程休眠指定时间,休眠时不放弃线程持有的资源。

 

4)   interrupt

参考Interrupt

 

3  Object源码

Object中的wait、notify也会影响线程状态,所以这里也就一起介绍了。值得一提的是JDK还提供了LockSupport类,此类中的park和unpark方法也能实现和wait、notify类似的功能。具体请参考:LockSupport

1)   wait

public final void wait() throws InterruptedException{…}

public final void wait(long timeout, int nanos) throws InterruptedException { … }

public final native void wait(long timeout) throws InterruptedException;

让当前线程(假设是T)等待直到其他线程调用notify()、notifyAll()或者等待超过指定的时间。

当执行object.wait()方法时,将当前线程T放到此object对象的wait set中,然后放弃持有的资源,这个线程T将不再被cup调度,直到以下任意一个事情发生:

 - 其他线程调用notify()方法,并且线程T正好的被唤醒。

 - 其他线程调用notifyAll()方法。

 - 其他线程interrupt线程T。

 -   到了指定时间。如果参数timeout=0,那么时间参数将会被忽略,即此时只能通过上面三种方式唤醒。

线程T接着会从wait set中移除,接着将和其他线程一样争取object的锁,一旦线程T获得了object的锁,将会获得同步声明并且立即从之前的等待状态返回,继续执行我们的代码。

注意:wait()一定要放在sycronized内部,否则会报“java.lang.IllegalMonitorStateException”异常。这是因为wait方法会释放对象锁,所以如果没有使用sycronized同步时,因为没有锁就会报异常。示例代码如下:

     synchronized (obj) {

         while (<condition does not hold>)

            obj.wait(timeout);

         ... // Perform action appropriate to condition

     }

 

2)   notify、notifyAll

public final native void notify();

public final native void notifyAll();

当调用object.notify()方法时:如果多个线程都在等待object对象的锁,那么唤醒其中的一个(一般而言是随机的唤醒,具体还要看底层的实现)。此方法仅能够被拥有此object对象锁的线程调用,只有以下三种情况,线程才能拥有对象的锁:

 - 通过执行一个object对象synchronized的实例方法。

 - 通过执行object的synchronized代码块。

 - 如果是Class类型的对象,那么执行此class的static方法。

notifyAll和notify类似,主要的区别是notifyAll会唤醒等待此对象锁的所有线程,然后大家一起争抢cpu资源。

和wait一样,notify、notifyAll也必须放在sycronized内部,否则会报“java.lang.IllegalMonitorStateException”异常。示例代码如下:

     synchronized (obj) {

         ... // do the work

             obj.notify(timeout);

     }

 

3)   示例代码

以下示例代码中,thread1先获得obj的锁,执行自己的逻辑,thread2因为获取不到锁一直阻塞;当thread1因为wait()而等待时,释放了obj的所资源,此时thread2将开始执行,thread2执行完后通过notifyAll()唤醒在obj上等待的线程,此后thread1将继续执行。

public static void main(String[] args) {
    Object obj = new Object();
    Thread thread1 = new Thread(() -> {
        synchronized (obj) {
           System.out.println(DateUtil.getCurrentTime() + " Thread1 start...");
            try {
                Thread.sleep(1000);
               System.out.println(DateUtil.getCurrentTime() + " Thread1 begin wait...");
                obj.wait();

               System.out.println(DateUtil.getCurrentTime() + " Thread1 finished...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    thread1.start();
    Thread thread2 = new Thread(() -> {
        synchronized (obj) {
            System.out.println(DateUtil.getCurrentTime() + " Thread2 executing...");
            obj.notifyAll();
        }
    });
    thread2.start();
}

执行结果如下:

11:05:25:836 Thread1 start...

11:05:26:874 Thread1 begin wait...

11:05:26:874 Thread2 executing...

11:05:26:875 Thread1 finished...

 

 ThreadLocal

ThreadLocal经常用来存储线程私有变量。

1  关系模型

ThreadLocal经常用来存储线程私有变量,在一个线程内可以多次创建ThreadLocal对象,但是这些ThreadLocal对象共用同一个ThreadLocalMap的实例。以下是Thread、ThreadLocal、ThreadLocalMap之间的关系。

74a0bc0746699824387210d29d1cd999764e0d8f

从图中可以看到ThreadLocal通过Thread.currentThread()获取当前线程,并操作线程持有的ThreadLocalMap对象。总结一下:

 - 每个Thread线程内部都有一个Map。

 - Map里面存储线程本地对象(key)和线程的变量副本(value)

 - Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向map获取和设置线程的变量值。

 

2  set

以下是set方法的源码,代码逻辑比较简单,其逻辑是:如果thread中threadLocals(ThreadLocal.ThreadLocalMap类型)属性为空,那么创建一个ThreadLocalMap对象,赋给thread.threadLocals;否则直接使用此value值替换thread.threadLocals中的值。

需要注意的是:ThreadLocalMap是线程私有的,见ThreadLocal#createMap()方法。

ThreadLocal#set方法

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

 

ThreadLocal#createMap方法

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

 

ThreadLocalMap#set方法

private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        if (k == key) {
            e.value = value;
            return;
        }
        if (k == null) {
       //清理掉陈旧Entry,并设置为当前的新值,这种Entry是由于ThreadLocal变量对象被回收后k==null造成的

           replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

ThreadLocalMap#replaceStaleEntry方法是用来复用ThreadLocalMap中的数据,当某一个ThreadLocal已经被清理了,那么就可以用这个空间来存储新的数据了。

private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                               int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // Back up to check for prior stale entry in current run.
    // We clean out whole runs at a time to avoid continual
    // incremental rehashing due to garbage collector freeing
    // up refs in bunches (i.e., whenever the collector runs).
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;

    // Find either the key or trailing null slot of run, whichever
    // occurs first
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();

        // If we find key, then we need to swap it
        // with the stale entry to maintain hash table order.
        // The newly stale slot, or any other stale slot
        // encountered above it, can then be sent to expungeStaleEntry
        // to remove or rehash all of the other entries in run.
        if (k == key) {
            e.value = value;

            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            // Start expunge at preceding stale entry if it exists
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }

        // If we didn't find stale entry on backward scan, the
        // first stale entry seen while scanning for key is the
        // first still present in the run.
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }

    // If key not found, put new entry in stale slot
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // If there are any other stale entries in run, expunge them
    if (slotToExpunge != staleSlot)
       cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

 


3  Get

如下代码,从线程的threadLocals属性中查找是否存在需要的值,如果存在,那么返回,如果不存在初始化线程的threadLocals。

ThreadLocal#get方法

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
           @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

 

ThreadLocal#setInitialValue方法

private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}

 

ThreadLocalMap#getEntry方法

private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

 

ThreadLocalMap#getEntryAfterMiss方法

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        if (k == null)
            expungeStaleEntry(i);
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

 

注意ThreadLocalMap中的getEntry、getEntryAfterMiss方法,为什么我们set进去的value,后来取的时候会出现key为空的情况?这是因为ThreadLocalMap#Entry对象的key是WeakReference<ThreadLocal<?>>类型的,此种类型的引用,如果没有其他引用了,那么在GC的时候会被回收掉(当然如果此对象「key」还有其他有效引用,对象将不会被回收);因为value是普通类型的引用,当GC时如果key被回收了,那么将会清理掉此Entry的值。

 

4  remove

线程的threadLocals属性如果非空,那么从threadLocals中删除之前存放的值。

ThreadLocal#remove

public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

ThreadLocalMap#remove

private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            e.clear();
            expungeStaleEntry(i);
            return;
        }
    }
}

 

5  ThreadLocalMap与WeakReference

我们假设这样一种情况:假设ThreadLocal对象不再被引用了,那么JVM如何回收ThreadLocalMap中存储的数据呢?

当ThreadLocal不在被需要时,其在线程中存储的数据应该被GC回收掉,即线程中ThreadLocalMap里的key-value对应该失效。ThreadLocalMap#Entry中key是WeakReference<ThreadLocal<?>>类型,其实就是为了实现这一诉求的。因为key是WeakReference类型的应用,所以在GC的时,ThreadLocalMap中key会被回收,即被设置为null(注意此时因为ThreadLocalMap引用value的关系,所以value不会被回收)。在上面set、get等方法中,我们看到的expungeStaleEntry、replaceStaleEntry、cleanSomeSlots方法其实就是在整理、回收ThreadLocalMap中已经失效的key-value。如果触发了expungeStaleEntry方法,ThreadLocalMap中的数据就会被回收干净。

最好的处理方式是,如果某个value不需要了,手动调用ThreadLocal#remove方法删除,以免因为没有触发replaceStaleEntry()方法,而不能回收value。

Entry的部分代码如下所示

static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value;
    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

 

相关文章
|
监控 算法 关系型数据库
DBA很忙—MySQL的性能优化及自动化运维实践
作者:王辰 来自:高效运维(ID:greatops) DBA的日常工作 首先,我们来看看DBA的具体工作,我觉得 DBA 真的很忙:备份和恢复、监控状态、集群搭建与扩容、数据迁移和高可用,这是我们 DBA 的功能。
5612 0
|
关系型数据库 分布式数据库 数据库
学习分布式不得不会的ACP理论
2000年7月,加州大学伯克利分校的Eric Brewer教授在ACM PODC会议上提出CAP猜想。2年后,麻省理工学院的Seth Gilbert和Nancy Lynch从理论上证明了CAP。
3441 0
|
存储 编解码 Android开发
Studio One6汉化中文版本更新下载
Studio One6全新版本上线记录、生产、混合、掌握和执行所有操作。从工作室到舞台,Studio One6以易用为核心,是您的创意合作伙伴。当你准备好登上舞台时,Studio One就在那里。只有Studio One从最初的灵感到完整的制作,最终混音到精选专辑,数字发行到舞台制作,无缝地与你一起移动,让你真正的创造没有界限。
2110 0
ensp 三层交换机、链路聚合和指定端口选举
ensp 三层交换机、链路聚合和指定端口选举
611 0
ensp 三层交换机、链路聚合和指定端口选举
|
机器学习/深度学习 存储 人工智能
反向传播不香了?解读 Hinton 大佬的 Forward-Forward 算法
反向传播不香了?解读 Hinton 大佬的 Forward-Forward 算法
|
开发工具 数据安全/隐私保护 开发者
QQ 互联平台是干什么的?底层原理是什么?
QQ 互联平台是干什么的?底层原理是什么?
1364 0
|
Serverless 对象存储 Python
函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理
在本文中,以用户上传超大压缩文件( zip 类型)到 oss, oss 系统捕获 PutObjec/PostObject 的事件, 自动触发函数执行, 函数将压缩文件解压,并将对应的解压文件放在oss 指定的 bucket 的某个目录下为例,展示 FC 的灵丹妙手
9776 0
函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理
|
Kubernetes 网络协议 应用服务中间件
为私有Kubernetes集群创建LoadBalancer服务
MetalLB - 可以为私有 Kubernetes 集群提供LoadBalancer类型的负载均衡支持。 在Kubernetes集群中,可以使用Nodeport、Loadbalancer和Ingress三种方式老来暴露服务给外部访问(缺省情况下,内部Pod提供的服务是在相互隔离的子网中,只有同一个Pod内部的几个容器可以直接进行网络访问)。
6479 0
|
Java 大数据 应用服务中间件
面试时这样介绍自己的项目经验,成功率能达到98.99%
虽时至年底,大多数小伙伴都知道,2018年是互联网行业最不平凡的一年。各类平台的倒闭、破产、清算,尤其是6、7月分全国大范围P2P集中爆雷跑路,再加上贸易战等,居多因素,裁员的、失业的大有人在,所以,虽近年底,找工作的伙伴应该还不在少数,所以,今天,给大家分享一些面试技巧文章。
1530 0
|
监控 Dubbo Java
分布式 | Dubbo 架构设计详解
Dubbo是Alibaba开源的分布式服务框架,它最大的特点是按照分层的方式来架构,使用这种方式可以使各个层之间解耦合(或者最大限度地松耦合)。
1190 1