LockSupport 以及 park、unpark 方法源码分析(下)

简介: LockSupport 是 jsr 166 中新增的 juc 工具类。 LockSupport 类主要用于创建锁和其他同步类来实现线程阻塞。 这个类与他使用的每个线程进行关联, 如果可用就立即 park , 我们可以通过 unpack 方法进行唤醒。
void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().
  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return; // _counter > 0, 使用 xchg 指令修改为 0 返回
  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;
  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
  if (Thread::is_interrupted(thread, false)) { // 如果线程处于中断状态,直接返回
    return;
  }
  // Next, demultiplex/decode time arguments
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }
  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt); // 构造当前线程的 ThreadBlockInVM, 为了防止死锁等特殊场景
  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }
  int status ;
  if (_counter > 0)  { // no wait needed
    _counter = 0; // 重置
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }
#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    // 调用 safe_cond_timedwait 进入线程阻塞
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");
#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
  _counter = 0 ; // 返回后 _counter 状态位重置
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();
  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}


unpark 方法源码分析


我们再来看看 unpark (源码文件 os_linux.cpp)。 主要是西安的流程如下:


  1. pthread_mutex_lock 获取锁


  1. _counter 设置为 1


  1. 判断 _counter 的旧值:


  • 小于 1 时,调用 pthread_cond_signal 唤醒在 park 阻塞的线程;


  • 等于 1 时,直接返回


void Parker::unpark() {
  int s, status ;
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  s = _counter;
  _counter = 1;
  if (s < 1) {
    // thread might be parked
    if (_cur_index != -1) {
      // thread is definitely parked
      if (WorkAroundNPTLTimedWaitHang) {
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
      }
    } else {
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  } else {
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}


Parker 和 ParkEvent 的区别


ParkEvent 其实和 Parker 的功能类似,同样也可以提供线程阻塞和唤醒的功能。 意思是说ParkEvent是用于java级别的synchronize关键字,Parker是JSR166来的并发工具集合,后面会统一使用ParkEvent。ParkerEvent 继承了PlatformEvent。基类PlatformEvent是特定于平台的,而ParkEvent则是平台无关的。Parker 继承自PlatformParker。


  1. ParkerEvent中的park,unpark方法用于实现Java的object.wait()方法和object.notify()方法;


  1. Parker中的park,unpark方法用于实现Java的Locksupprt.park()方法和Locksupprt.unpark()方法;


参考 park.hpp 的源码注释


// The base-class, PlatformEvent, is platform-specific while the ParkEvent is
// platform-independent.  PlatformEvent provides park(), unpark(), etc., and
// is abstract -- that is, a PlatformEvent should never be instantiated except
// as part of a ParkEvent.
// Equivalently we could have defined a platform-independent base-class that
// exported Allocate(), Release(), etc.  The platform-specific class would extend
// that base-class, adding park(), unpark(), etc.
//
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.
//
// We'll want to eventually merge these redundant facilities and use ParkEvent.


使用案例


下面是一个先进先出的非重入方法的使用。


class FIFOMutex {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters
        = new ConcurrentLinkedQueue<Thread>();
    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);
        // Block while not first in queue or cannot acquire lock
        while (waiters.peek() != current ||
               !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // ignore interrupts while waiting
                wasInterrupted = true;
        }
        waiters.remove();
        if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
    }
    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }
}}


参考文档









相关文章
|
安全 Java 程序员
如何理解并使用 park 与 unpark
park和unpark是Java中的两个线程同步工具,用于线程的阻塞和唤醒操作。
389 0
如何理解并使用 park 与 unpark
【多线程:Park&Unpark】
【多线程:Park&Unpark】
95 0
|
分布式计算 Spark Java
Spark2.4.0 SparkSession 源码分析
创建SparkContext new SparkSession
3277 0
|
Java 调度
LockSupport 以及 park、unpark 方法源码分析(上)
LockSupport 是 jsr 166 中新增的 juc 工具类。 LockSupport 类主要用于创建锁和其他同步类来实现线程阻塞。 这个类与他使用的每个线程进行关联, 如果可用就立即 park , 我们可以通过 unpack 方法进行唤醒。
401 0
|
存储 分布式计算 大数据
Spark中几种ShuffleWriter的区别你都知道吗?
一.前言 在Spark中有三种shuffle写,分别是BypassMergeSortShuffleWriter、UnsafeShuffleWriter、SortShuffleWriter。分别对应三种不同的shuffleHandle。
1345 0
|
分布式计算 Spark 索引
Spark2.4.0源码分析之WorldCount ShuffleMapTask处理(八)
- 理解Executor中是如何调用Task的过程 - 理解ShuffleMapTask是处理过程
1620 0
|
调度 算法
Spark2.4.0源码分析之WorldCount 任务调度器(七)
- 理解TaskSet是如何提交到任务调度器池,任务集如何被调度 - 理解Worker可用资源算法,Worker可用资源分配任务调度池中的任务 - 任务发送给executor去执行
897 0
|
分布式计算 Spark
Spark2.4.0 SparkEnv 源码分析
SparkEnv对象构建 SparkEnv类中做如下操作 ).new SecurityManager() ).new NettyRpcEnvFactory() ).创建NettyRpcEnv ).
1754 0
|
分布式计算 资源调度 调度
Spark2.4.0 SparkContext 源码分析
createSparkEnv Started SparkUI 注册端点HeartbeatReceiver createTaskScheduler 启动任务调度器,指定默认任务调度模式FIFO,构建调度池 new DAGScheduler 注册DriverEndpoint端点:Coarse...
2953 0
|
分布式计算 Spark Hadoop
Spark MapOutputTracker源码分析
## 技能标签 - Spark ShuffleMapTask处理完成后,把MapStatus数据(BlockManagerId,[compressSize])发送给MapOutputTrackerMaster.
1693 0