void Parker::park(bool isAbsolute, jlong time) { // Ideally we'd do something useful while spinning, such // as calling unpackTime(). // Optional fast-path check: // Return immediately if a permit is available. // We depend on Atomic::xchg() having full barrier semantics // since we are doing a lock-free update to _counter. if (Atomic::xchg(0, &_counter) > 0) return; // _counter > 0, 使用 xchg 指令修改为 0 返回 Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; // Optional optimization -- avoid state transitions if there's an interrupt pending. // Check interrupt before trying to wait if (Thread::is_interrupted(thread, false)) { // 如果线程处于中断状态,直接返回 return; } // Next, demultiplex/decode time arguments timespec absTime; if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } if (time > 0) { unpackTime(&absTime, isAbsolute, time); } // Enter safepoint region // Beware of deadlocks such as 6317397. // The per-thread Parker:: mutex is a classic leaf-lock. // In particular a thread must never block on the Threads_lock while // holding the Parker:: mutex. If safepoints are pending both the // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock. ThreadBlockInVM tbivm(jt); // 构造当前线程的 ThreadBlockInVM, 为了防止死锁等特殊场景 // Don't wait if cannot get lock since interference arises from // unblocking. Also. check interrupt before trying wait if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } int status ; if (_counter > 0) { // no wait needed _counter = 0; // 重置 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); return; } #ifdef ASSERT // Don't catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) 
sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; // 调用 safe_cond_timedwait 进入线程阻塞 status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif _counter = 0 ; // 返回后 _counter 状态位重置 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } }
unpark 方法源码分析
我们再来看看 unpark (源码文件 os_linux.cpp)。 主要的实现流程如下:
- pthread_mutex_lock 获取锁
- _counter 设置为 1
- 判断 _counter 的旧值:
- 小于 1 时,调用 pthread_cond_signal 唤醒在 park 阻塞的线程;
- 等于 1 时,直接返回
void Parker::unpark() { int s, status ; status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; s = _counter; _counter = 1; if (s < 1) { // thread might be parked if (_cur_index != -1) { // thread is definitely parked if (WorkAroundNPTLTimedWaitHang) { status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } }
Parker 和 ParkEvent 的区别
ParkEvent 其实和 Parker 的功能类似,同样也可以提供线程阻塞和唤醒的功能。按照 park.hpp 中源码注释的说法,ParkEvent 用于 Java 级别的 synchronized 关键字(monitor 同步),Parker 用于 JSR166 的并发工具集合(JUC 的 park-unpark),后续计划统一使用 ParkEvent。ParkEvent 继承了 PlatformEvent。基类 PlatformEvent 是特定于平台的,而 ParkEvent 则是平台无关的。Parker 继承自 PlatformParker。
- ParkEvent 中的 park、unpark 方法用于实现 Java 的 Object.wait() 方法和 Object.notify() 方法;
- Parker 中的 park、unpark 方法用于实现 Java 的 LockSupport.park() 方法和 LockSupport.unpark() 方法;
参考 park.hpp 的源码注释
// The base-class, PlatformEvent, is platform-specific while the ParkEvent is // platform-independent. PlatformEvent provides park(), unpark(), etc., and // is abstract -- that is, a PlatformEvent should never be instantiated except // as part of a ParkEvent. // Equivalently we could have defined a platform-independent base-class that // exported Allocate(), Release(), etc. The platform-specific class would extend // that base-class, adding park(), unpark(), etc. // // A word of caution: The JVM uses 2 very similar constructs: // 1. ParkEvent are used for Java-level "monitor" synchronization. // 2. Parkers are used by JSR166-JUC park-unpark. // // We'll want to eventually merge these redundant facilities and use ParkEvent.
使用案例
下面是一个先进先出(FIFO)的非重入锁的使用示例。
/**
 * A first-in-first-out, non-reentrant lock built on
 * {@link LockSupport#park(Object)} and {@link LockSupport#unpark(Thread)}.
 *
 * <p>Threads queue up in arrival order; only the thread at the head of the
 * queue may attempt to take the lock.
 */
class FIFOMutex {
    /** True while some thread holds the lock. */
    private final AtomicBoolean locked = new AtomicBoolean(false);
    /** Threads waiting for the lock, in arrival order. */
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

    /**
     * Acquires the lock, parking the calling thread until it is first in the
     * queue and the lock is free. Interrupts received while waiting do not
     * abort the wait; the interrupt status is restored before returning.
     */
    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // Block while not first in queue or cannot acquire lock
        while (waiters.peek() != current ||
               !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // ignore interrupts while waiting
                wasInterrupted = true;
        }

        // The current thread is at the head of the queue here; dequeue it.
        waiters.remove();
        if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
    }

    /**
     * Releases the lock and unparks the thread at the head of the queue,
     * if there is one.
     */
    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }
}