大家好,我是 王有志,欢迎和我聊技术,聊漂泊在外的生活。快来加入我们的Java提桶跑路群: 共同富裕的Java人。
原计划是今天结束线程的部分,但是写完后才发现,光Thread类的核心方法分析就写了5000多字了,所以不得不再拆出来一篇。
在关于线程你必须知道的8个问题(上)我们一起学习了如何创建线程,以及Java中线程状态,那么今天就来学习Thread类的核心方法。
Tips:
- Java及JVM源码基于Java 11
- JVM源码仅展示关键内容,另附Open JDK链接
- 文末附Java方法使用Demo的Gitee地址
Thread.start和Thread.run
上一篇中我们已经知道,Thread.run实际上是来自Runnable
接口,直接调用并不会启动新线程,只会在主线程中运行。
Thread.start方法中调用的Thread.start0方法是真正承载了创建线程,调用Thread.run方法的能力。其实到这里已经回答了它们之间的区别,接下来我们一起来看底层是如何实现的。
Tips:有面向对象编程语言基础的,看懂JVM源码对你来说并不困难。
首先是thread.c文件,该文件为Java中Thread类注册了native方法。
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"interrupt0", "()V", (void *)&JVM_Interrupt}
};
Tips:native方法是Java Native Interface,简称JNI。
第一眼就可以看到start0
对应的JVM方法JVM_StartThread
,实现是在jvm.cpp中:
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
// 创建虚拟机层面的线程
native_thread = new JavaThread(&thread_entry, sz);
}
Thread::start(native_thread);
JVM_END
接着来看new JavaThread
做了什么,在thread.cpp中:
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
os::create_thread(this, thr_type, stack_sz);
}
os::create_thread
创建了操作系统层面的线程。这和上一篇中得到的结论是一致的,Java中的Thread.start0完成了操作系统层面线程的创建和启动。
个人认为Thread.run
和Thread.start
是没什么可比性的。如果被问到这个问题,要么是面试官懒,网上随便找找就来问,要么是技术水平确实一般。
Tips:
- Thread::start方法在thread.cpp中;
- os::create_thread方法在os_linux.cpp中,注意操作系统的区别;
- os::pd_start_thread方法在os_linux.cpp中,注意操作系统的区别;
- os::start_thread方法在os.cpp中。
Thread.sleep和Object.wait
接下来看两个可以放在一起比较的方法:
- Object.wait
- Thread.sleep
很明显的区别是,它们并不在同一个类中定义,其次方法名上也能看出些许差别,“等待”和“睡眠”。
Object.wait
Java在Object类中,提供了2个wait方法的重载,不过最终都是调用JNI方法:
public final native void wait(long timeoutMillis) throws InterruptedException;
方法声明中我们能得知该方法的作用--使线程暂停指定的时间。接着我们来看Object.wait
的方法注释:
Causes the current thread to wait until it is awakened, typically by being notified or interrupted, or until a certain amount of real time has elapsed.
使当前线程阻塞,直到主动唤醒或者超过指定时间。清晰的说明了Object.wait
的功能,另外也提示了如何唤醒线程:
- Object.notify
- Object.notifyAll
有了之前的经验,很容易想到Object.wait
方法是在Object.c中注册的。我们找到它在jvm.cpp中的实现:
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
接着是ObjectSynchronizer::wait
,在synchronizer.cpp中:
int ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj(), inflate_cause_wait);
monitor->wait(millis, true, THREAD);
return dtrace_waited_probe(monitor, obj, THREAD);
}
获取ObjectMonitor对象时,调用了ObjectSynchronizer::inflate
方法,inflate翻译过来是膨胀的意思,是锁膨胀的过程。实际上,在未展示的代码中,还有偏向锁的过程,不过这些不是这部分的重点。
然后调用ObjectMonitor.wait,这个方法有225行,只看想要的部分:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
// 获取当前线程
Thread * const Self = THREAD;
// 添加到等待队列中
AddWaiter(&node);
// 退出监视器
exit(true, Self);
// 对等待时间的处理
if (millis <= 0) {
Self->_ParkEvent->park();
} else {
ret = Self->_ParkEvent->park(millis);
}
}
答案已经呼之欲出了,ObjectMonitor.wait中退出了监视器,在Java层面就是Object.wait方法会释放监视器锁。
对不同等待时间的处理也需要关注一下,millis <= 0
的情况下,执行的是Self->_ParkEvent->park()
,除非主动唤醒,否则线程永远停在这里。在Java层面看,执行object.wait(0)会使当前线程永久阻塞。
既然都到这了,就多说一句,ObjectMonitor.exit中有几行关键代码,是synchronized
特性实现的关键:
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
for (;;) {
if (Knob_ExitPolicy == 0) {
OrderAccess::release_store(&_owner, (void*)NULL);
OrderAccess::storeload();
}
}
}
这些内容我们提前混个眼熟,后面在synchronized
中详细解释。我们来思考两个问题:
- 为什么Object.wait必须要在synchronized中调用?
- 为什么wait方法设计在Object类中,而不是Thread类中?
首先,我们已经知道Object.wait
的底层实现中,要释放监视器锁,释放的前提是什么?要先拥有监视器锁。那么在synchronized
中调用Object.wait
就很容易理解了。
其次,锁住的是什么?是对象,从来都不是执行线程(Thread实例是线程对象,不是执行线程)。因此涉及到监视器锁操作的方法是不是放到Object中更合适呢?
最后,如果你仔细阅读过Object.wait
所有重载方法注释的话,你会发现一个词:spurious wakeup(虚假唤醒)。
这是没有主动notify/notifyAll,或者被动中断,超时的情况下就唤醒处于WAITING
状态的线程。因此Java也建议你在循环中调用Object.wait
:
synchronized (obj) {
while (<condition does not hold> and <timeout not exceeded>) {
long timeoutMillis = ... ; // recompute timeout values
int nanos = ... ;
obj.wait(timeoutMillis, nanos);
}
...// Perform action appropriate to condition or timeout
}
简单解释下虚假唤醒产生的原因,我们已经知道Object.wait
最终是通过Self->_ParkEvent->park()
或Self->_ParkEvent->park(millis)
实现线程暂停的,其调用的park方法位于os_posix.cpp中:
void os::PlatformEvent::park() {
status = pthread_cond_wait(_cond, _mutex);
}
int os::PlatformEvent::park(jlong millis) {
status = pthread_cond_timedwait(_cond, _mutex, &abst);
}
pthread_cond_wait
和pthread_cond_timedwait
是Linux对POSIX的实现,知道其作用即可,就不继续深入了。
我们很容易联想到,Object.notify
的底层实现是调用os::PlatformEvent::unpark
方法完成的。不出所料,从Object.c到ObjectMonitor.cpp,最后会发现该方法包含在os_posix.cpp中:
void os::PlatformEvent::unpark() {
status = pthread_cond_signal(_cond);
}
同样的,pthread_cond_signal
也是Linux对POSIX的实现。Linux man page中对其的解释是:
The pthread_cond_broadcast() function shall unblock all threads currently blocked on the specified condition variable cond. The pthread_cond_signal_() function shall unblock at least one of the threads that are blocked on the specified condition variable cond (if any threads are blocked on cond).
其中第二段是关键,即pthread_cond_signal
会唤醒至少一个阻塞在指定条件上的线程。也就是说,调用Object.notify
可能会唤醒不止一个符合条件的线程。
Java层面有一个经典的例子--生产者消费者,只贴出产品部分的代码:
static class Product {
private int count;
private synchronized void increment() throws InterruptedException {
if (this.count > 0) {
wait();
}
count++;
System.out.println(Thread.currentThread().getName() + "生产,总数:" + this.count);
notify();
}
private synchronized void decrement() throws InterruptedException {
if (this.count <= 0) {
wait();
}
count--;
System.out.println(Thread.currentThread().getName() + "消费,总数:" + this.count);
notify();
}
}
如果有1个生产者,多个消费者,消费者判定产品数量为0后,全部进入等待,生产者生产后,通知消费者消费,此时多个消费者被唤醒,直接进行消费,造成产品的总量为负数的情况。
改进的方法也很简单:
- 判断方式由
if
修改为while
,不断地检查条件 notify
修改为notifyAll
,避免死锁产生
Thread.sleep
首先是方法声明:
public static native void sleep(long millis) throws InterruptedException;
通过字面意思可以看出,让线程“睡眠”指定时间。再来看注释提供了哪些信息:
Causes the currently executing thread to sleep (temporarily cease execution) for the specified number of milliseconds plus the specified number of nanoseconds, subject to the precision and accuracy of system timers and schedulers. The thread does not lose ownership of any monitors.
最后一句非常重要,The thread does not lose ownership of any monitors意思是,使线程进入休眠,但不会丢失任何监视器锁的所有权。通俗点来说就是,我可以不用,但我不能没有。
Thread.sleep
依旧是JNI方法,直接看JVM实现,在jvm.cpp中:
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
HOTSPOT_THREAD_SLEEP_BEGIN(millis);
EventThreadSleep event;
if (millis == 0) {
os::naked_yield();
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
if (os::sleep(thread, millis, true) == OS_INTRPT) {
if (!HAS_PENDING_EXCEPTION) {
if (event.should_commit()) {
post_thread_sleep_event(&event, millis);
}
HOTSPOT_THREAD_SLEEP_END(1);
}
}
thread->osthread()->set_state(old_state);
}
HOTSPOT_THREAD_SLEEP_END(0);
JVM_END
判断休眠时间millis
,如果millis == 0
,调用os::naked_yield()
,源码在os_linux.cpp中,该方法会让出CPU时间。真是“大公无私”啊,但是唤醒是由操作系统决定。
Tips:Java 11对millis == 0
的逻辑做了修改,可以查看Java 8的逻辑,我有点忘了。
也就是说,执行thread.sleep(0)并不是“咻”的一下什么都不做就结束了,而是真正的让出了CPU时间。
接着是else
的部分,最关键的是os::sleep(thread, millis, true)
,调用操作系统sleep
方法进入休眠,以对Linux的封装os_posix.cpp中的实现为例:
int os::sleep(Thread* thread, jlong millis, bool interruptible) {
ParkEvent * const slp = thread->_SleepEvent ;
jlong prevtime = javaTimeNanos();
for (;;) {
jlong newtime = javaTimeNanos();
millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
if (millis <= 0) {
return OS_OK;
}
prevtime = newtime;
slp->park(millis);
}
}
简化后就很好理解了,计算millis
剩余时间,millis > 0
调用park
暂停线程,唤醒后继续循环,millis <= 0
则表示休眠结束。
到这里Thread.sleep
的内容也算告一段落了,分析的过程中没有发现涉及到ObjectMontior
的地方,因此断定Thread.sleep并不会释放监视器锁的所有权。
Thread.yield和LockSupport.park
趁热打铁,来看同样拥有“暂停”能力的两个方法:
- Thread.yield
- LockSupport.park
Thread.yield
首先是方法声明:
public static native void yield();
还是熟悉的JNI方法。同样从注释开始:
A hint to the scheduler that the current thread is willing to yield its current use of a processor. The scheduler is free to ignore this hint.
这句话很好理解,提示调度器当前线程可以放弃处理器时间,但是调度器可以忽略。直接来看JVM实现:
JVM_ENTRY(void, JVM_Yield(JNIEnv *env, jclass threadClass))
if (os::dont_yield()) {
return;
}
os::naked_yield();
JVM_END
是不是很熟悉?和我们在Thread.sleep
中看到millis == 0
的场景不能说相似吧,简直是一模一样。强调一下,Thread.yield只是暂时让出CPU时间,并不是不再执行,也没有释放监视器锁。
LockSupport.park
LockSupport.park
常常会和Thread.sleep
,Thread.yield
以及Object.wait
一起比较,趁这次一起说完。
从Java源码入手:
private static final Unsafe U = Unsafe.getUnsafe();
public static void park() {
U.park(false, 0L);
}
好家伙!!!LockSupport啥也不干,直接使用大名鼎鼎的Unsafe
,那么直接分析Unsafe.park
。
在此之前,还是要先看注释:
Disables the current thread for thread scheduling purposes unless the permit is available.
翻译过来就是,未获得许可的情况下,一直暂停线程。从表象上看和Object.wait
很相似,但是别忘了Object.wait
会释放监视器锁。
Unsafe.park
依旧是方法声明:
@HotSpotIntrinsicCandidate
public native void park(boolean isAbsolute, long time);
Tips:@HotSpotIntrinsicCandidate
是Java 9中引入的,表示方法在HotSpot虚拟机中有高效的实现。
Unsfae.java的方法是直接在unsafe.cpp中注册的,实现也在unsafe.cpp中:
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
thread->parker()->park(isAbsolute != 0, time);
} UNSAFE_END
需要注意,Thread.sleep
中使用的是os::PlatformEvent::park
,这里调用的是Parker::park
,在os_posix.cpp中:
void Parker::park(bool isAbsolute, jlong time) {
if (time == 0) {
_cur_index = REL_INDEX;
status = pthread_cond_wait(&_cond[_cur_index], _mutex);
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
}
}
Parker::park
提供了两种场景,暂停指定时间依赖于pthread_cond_timedwait
实现,对应LockSupport.parkNanos
,不限时暂停依赖于pthread_cond_wait
实现,对应LockSupport.park
。
从源码来看,Thread.sleep
中使用的os::PlatformEvent::park
是简化版的Parker::park
。另外,我们也可以得到一个隐藏结论:LockSupport.park并不会释放监视器锁。
Thread.join
先来看Java中关于join(long millis)
的注释:
Waits at most millis milliseconds for this thread to die. A timeout of 0 means to wait forever.
比较容易翻译,等待指定的时间,或调用线程运行结束。如果指定时间为0,则会永远等待。看起来又是关于线程“暂停”的方法了,我们来看源码:
public final synchronized void join(final long millis) throw InterruptedException {
if (millis > 0) {
if (isAlive()) {
final long startTime = System.nanoTime();
long delay = millis;
do {
wait(delay);
} while (isAlive() && (delay = millis - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
}
} else if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
throw new IllegalArgumentException("timeout value is negative");
}
}
逻辑很清晰,也没有调用太多JNI方法。看起来岁月静好,不过,我们先写一段测试代码:
public class JoinThread extends Thread{
private Thread joinThread;
@Override
public void run() {
System.out.println("[join测试]线程:[" + Thread.currentThread().getName() + "]进入!");
if(this.joinThread != null) {
System.out.println("[join测试]线程:[" + Thread.currentThread().getName() + "]准备执行join!");
try {
this.joinThread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("[join测试]线程:[" + Thread.currentThread().getName() + "]结束!");
}
}
public class JoinDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("[join测试]线程:[" + Thread.currentThread().getName() + "]执行!");
Thread t1 = new JoinThread();
Thread t2 = new JoinThread(t1);
t1.start();
t2.start();
System.out.println("[join测试]线程:[" + Thread.currentThread().getName() + "]结束!");
}
}
现在我们提出两个问题:
- 谁在等待t1运行结束?
- 什么时候唤醒的线程?
对于第一个问题,我们先来回顾下Object.wait
的使用。ObjectMonitor::wait
中调用os::PlatformEvent::park
,操作的对象是当前执行线程,而不是调用对象。
Tips:这里有些绕,this.joinThread.join()
的调用中,this.joinThread
是线程对象,而不是执行线程,执行线程是Thread实例对象在操作系统层面的映射。
网上很多答案说,join方法阻塞的是主线程并不准确,个人理解在哪个线程中执行join方法(不是调用!!!),就阻塞哪个线程。举个例子:
Thread t1 = new Thread(() -> {
System.out.println("线程t1执行!");
});
Thread t2 = new Thread(() -> {
try {
t1.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("线程t2执行!");
});
t1.start();
t2.start();
这种情况下被阻塞的是线程实例对象t2在操作系统层面映射的执行线程。
接着我们来看第二个问题,在Thread.join
的源码中,我们并没有看到notify/notifyAll方法,那么线程怎么被唤醒的呢?
这里直接给出答案,以上面的代码为例子,在线程t1执行结束时,JVM会唤醒等待的线程。也就是JVM层面执行JavaThread::exit
时唤醒线程,源代码在thread.cpp中:
void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
// Notify waiters on thread object. This has to be done after exit() is called
// on the thread (if the thread is the last thread in a daemon ThreadGroup the
// group should have the destroyed bit set before waiters are notified).
ensure_join(this);
}
通过JVM的注释也能看出这个方法做了什么,不过我们还是一起来看ensure_join
方法,在thread.cpp中:
static void ensure_join(JavaThread* thread) {
ObjectLocker lock(threadObj, thread);
thread->clear_pending_exception();
java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
java_lang_Thread::set_thread(threadObj(), NULL);
lock.notify_all(thread);
}
代码最后一行,调用了notify_all
唤醒了所有线程,也就是说,此刻所有调用Object.wait
的方法都会被唤醒。另外也可以看到,线程状态被标记为TERMINATED也是在这个方法中完成的。
到此为止,Thread.join
的原理也已经说完了,它的本质就是调用Object.wait
实现阻塞,因此Java的注释中也会建议不要使用wait/notify/notifyAll:
It is recommended that applications not use wait, notify, or notifyAll on Thread instances.
Thread.interrupt
从关键代码开始:
public void interrupt() {
interrupt0();
}
如果没猜错的话,interrupt0
依旧是JNI方法:
private native void interrupt0();
往下追之前,来看注释:
Interrupts this thread.
简明扼要,中断线程。这时候相信你已经能够熟练的点开jvm.cpp了查看源码了:
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
ThreadsListHandle tlh(thread);
JavaThread* receiver = NULL;
bool is_alive = tlh.cv_internal_thread_to_JavaThread(jthread, &receiver, NULL);
if (is_alive) {
Thread::interrupt(receiver);
}
JVM_END
跳过thread.cpp,直接来到os::is_interrupted
方法,在os_posix.cpp中:
void os::interrupt(Thread* thread) {
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
// 标记线程为中断状态
osthread->set_interrupted(true);
OrderAccess::fence();
// 唤醒_SleepEvent上的线程
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL)
slp->unpark() ;
}
// 唤醒Parker上的线程
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
// 唤醒_ParkEvent上的线程
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL)
ev->unpark() ;
}
调用Thread.interrupt在JVM层面并没有立即停止线程,仅标记了中断状态,随后尝试唤醒处于sleep/wait/park的线程,真正的中断是从操作系统获取该线程的中断状态开始的。
结语
今天我们一起了解了Thread类中的6个方法,另外也学习了Object.wait
,Object.notify
,LockSupport.park
和Unsafe.park
,虽然没有提及Object.notifyAll
,但它的原理和Object.notify
完全一样,只不过多了一层循环。
最后我们再通过一张表格,来对比下线程“暂停”方法:
当然了,“暂停”的方式不仅仅有这些,还有一些会在JUC中涉及。
好了,今天就到这里了,Bye~~