并发编程基础ReentrantLock源码分析

简介: 并发编程基础ReentrantLock源码分析

并发基础ReentrantLock分析

基础知识

ReentrantLock分为公平锁和非公平锁。

通过lock获取锁,unLock释放锁。

Note1:(双向链表。初始的时候head和tail都指向Null,之后添加新节点的时候会创建一个空Node,head和tail都指向这个空Node代表初始化完成。

尾插法入队通过移动尾结点tail往前移动,出队通过head节点往前移动。当Head节点和tail指向同一个Node时代表没有阻塞队列中没有等待的线程)

之后唤醒的时候也会通过tail尾结点一直像后遍历拿到真正处于等待状态的Node线程,然后通过LockSupport提供的Unpark指定唤醒Node线程;

阻塞的时候如果被唤醒也是会看前一个节点是不是Head节点(Head节点一直是Null)如果是才会进行tryAcquire(),这样做大概是因为如果都唤醒的话也可以按照阻塞队列的顺序唤醒。

Fair 进入AQS的acquire中. state用于记录锁的状态:(0代表没有线程拿着锁 , 1代表有线程拿着锁 2代表拿着线程的锁又一次获取了锁也就是重入了这个锁)

NonFair 会首先进行一次cas修改state状态直接拿锁。,失败后才会进入到AQS中acquire的逻辑中

NonFair的lock

 final void lock() {
  if (compareAndSetState(0, 1)) //直接修改state,失败才会进入到AQS的acquire逻辑中
        setExclusiveOwnerThread(Thread.currentThread()); //修改state成功代表当前没有线程持有锁,可以直接获取;
      //并且代表设置记录当前线程(之后重入做准备)
  else{
      acquire(1);
    }

Fair的lock

final void lock() {
  acquire(1);//不尝试直接拿锁,直接进去AQS的acquire逻辑中
}

AQS的acquire

AQS的acquire,acquireInterruptibly(区别是 是否会立即响应中断还是拿到锁运行的时候在响应);

AQS的acquire源码

   public final void acquire(int arg) {
       if (!tryAcquire(arg) &&   //teyAcquire在NonFair和Fair中有不同的逻辑。代表尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ //addWaiter用来将当前线程封装为Node添加到AQS的阻塞队列中。之后acquireQueued是阻塞线程当线程唤醒并且拿到锁后检查阻塞过程中是否中断过,
         //如果发生过中断acquireQueue返回true,没有发生过中断返回false。selfInterrupt中进行补偿处理
        selfInterrupt()
 }

selfInterrupt源码

 static void selfInterrupt() {
    Thread.currentThread().interrupt(); //进行补偿
  }

代码解释:

一,.首先会尝试进行tryAcquire,tryAcquire根据Faire和NonFair获取锁的逻辑也是不一样的。

1.1 NonFair

首先会检查锁是否已经被线程获取过(state>0):如果获取过检查获取锁的这个线程是不是当前线程:如果是cas修改state值+1返回true(可重入);如果不是当前线程代表获取失败返回false;

如果这个锁还没有被其他线程获取cas修改状态为1代表这个线程获取过一次锁了,接着将获取锁的这个线程保存起来(后面冲入的时候要判断线程是不是已经获取过了)

1.2 Fair

首先会检查锁是否已经被线程获取过(state>0),如果获取过检查获取锁的这个线程是不是当前线程:如果是cas修改state值+1返回true;如果不是当前线程代表获取失败返回false。

如果这个锁还没有被其他线程获取,还要检查一下当前是不是在AQS中阻塞队列的头部。如果不是在头部也是会返回false的

只有在阻塞队列头部的线程才会cas修改状态为1代表这个线程获取过一次锁了,接着将获取锁的这个线程保存起来(后面冲入的时候要判断线程是不是已经获取过了)

NonFair的tryAcquire源码:

final boolean nonFairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
       int c = getState();
        if (c == 0) {
          if (compareAndSetState(0, acquires)) {//如果没有线程拿着锁,直接尝试修改state
                setExclusiveOwnerThread(current);//修改成功代表获取锁成功,并且记录保存当前线程
             return true;
             }
          } else if (current == getExclusiveOwnerThread()) {//代表锁已经被线程拿着了,在判断是不是当前线程拿过如果拿过代表可以重入state加一。
            int nextc = c + acquires; //修改state+1 代表重入此时为state2
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                  setState(nextc);//设置state状态。
                  return true; //返回true代表获取锁成功
            }
             //代表被其他线程拿着锁返回false代表获取锁失败
            return false;
}

FairTyrAcquire源码:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (!hasQueuedPredecessors() &&
         compareAndSetState(0, acquires)) {  //这个条件要比NonFair更加严格,如果没有线程拿着锁,还要判断当前线程是不是在AQS的阻塞队列头部。如果在队列头部才可以进行修改state值,否则不能修改。注意:这也是公平锁和非公平锁的不同
          setExclusiveOwnerThread(current);//体现公平,按照阻塞队列顺序
           return true;
        }
     }else if (current == getExclusiveOwnerThread()) {//如果是已经获取过锁的线程直接可以进行重入修改状态
           int nextc = c + acquires;
           if (nextc < 0)
                  throw new Error("Maximum lock count exceeded");
             setState(nextc);
             return true;
            }
    return false;
}

二,如果尝试获取锁成功直接返回,线程可以继续运行

(对于NonFair来说第一种情况是当前没有线程拿着锁,第二种情况是冲入。对于Fair来说第一种情况是冲入第二种情况是没有线程拿着所并且当前线程在阻塞队列头部)

2.1

如果尝试获取锁失败则会调用acquireQuened(addWaiter())。addWaiter()首先将当前线程封装成一个Node,之后通过cas操作来添加到AQS中的阻塞队列里面(查看Note1)

2.2

addWaiter()用于将线程添加到AQS的阻塞队列中,接下来的acquireQueued就是对线程进行阻塞了

这里的阻塞和唤醒是指用了UnSafe类的park和unPark,unPark可以唤醒指定线程,而不是用Object的wait和notiify来实现。

addWaiter源码:(返回添加后的Node)

/**
     * Creates and enqueues node for current thread and given mode. *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
      */
        //将当前线程封装为Node不断使用cas来将tail节点指向当前Node。也就是入队操作
 private Node addWaiter(Node mode) {
     Node node = new Node(mode);
       for (;;) {
           Node oldTail = tail;
           if (oldTail != null) {
               U.putObject(node, Node.PREV, oldTail);
              if (compareAndSetTail(oldTail, node)) {
                      oldTail.next = node;
                       return node;
                  }
           } else {
             initializeSyncQueue();
           }
      }
}
/**
 * Initializes head and tail fields on first contention.
  */初始化尾结点tail
private final void initializeSyncQueue() {
   Node h;
  if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
      tail = h;
}
/**
* CASes tail field.
*///不断设置尾结点tail指向当前Node
 private final boolean compareAndSetTail(Node expect, Node update) {
  return U.compareAndSwapObject(this, TAIL, expect, update);
}

2.3

在进行阻塞之前还是会进行尝试一次获取锁的逻辑,如果没有获取到锁那么调用park自己阻塞自己。

注意:由于park的阻塞除了unPark会唤醒外,interrupt也会唤醒。这个时候就会根据是否会立马响应中断还是来进行处理。

acquireQuened源码

//上一步addWaiter代表添加到阻塞队列中,这一步该阻塞线程了。
  final boolean acquireQueued(final Node node, int arg) {
        try {
        //              中断标志位由于acquire在阻塞过程中是不会立即响应interrupt的,所以需要标识记录。当线程获取到锁的时候在进行补偿处理
          boolean interrupted = false;
        //              死循环目的:由于park是会被interrupt唤醒的,所以需要检查如果不是unPark唤醒的需要继续阻塞。
            for (;;) {
        //                取出前一个节点观察是不是头结点(由于Head指向的是一个空节点Node。所以前一个节点是Head就代表当前Node就是第一个要出队的Node),如果是头结点代表可以进行获取锁的逻辑。
       final Node p = node.predecessor();
             if (p == head && tryAcquire(arg)) {//当前位于AQS阻塞队列头部,并且获取锁成功(tryAcquire上面讲解过)。设置当前为头结点接着返回中断标志位进行补偿处理中断操作
                    setHead(node);
                    p.next = null; // help GC
                     return interrupted;
                }
        //                  //如果当前节点不是头部,
                 if (shouldParkAfterFailedAcquire(p, node) &&
                           parkAndCheckInterrupt())  //阻塞自己
                      interrupted = true; //如果满足这个条件代表是interrupt唤醒的,需要记录标志位。接着再次进入到死循环中
                   }
             } catch (Throwable t) {
                   cancelAcquire(node);
                    throw t;
              }
  }
  private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this);  //park自己阻塞自己,park只有两种情况会被唤醒   第一种由于unPark如果是被这种方式唤醒的那么就该进入到获取锁的逻辑中;还有一种情况是由于interrupt导致的唤醒这种情况下根据是否立即响应还是拿到锁之后再响应分为两种方法  acquire和acquireInterruptibly
      return Thread.interrupted();
   }

2.4

默认是不会立马响应中断的,当park被唤醒后,将会检查线程的中断标志位并保存起来,然后在进行调用park阻塞自己。直到线程是真正由于unPark唤醒的时候再取出之前的标志位看阻塞期间是否有中断然后在抛出异常响应中断。

2.5

但是如果是立马响应中断的话,当park被唤醒后检查中断标志位如果是中断导致的直接抛出异常,并且退出阻塞的队列不在尝试获取锁;如果不是中断导致的则再次尝试获取锁的逻辑也就是acquireQueue的逻辑

2.4和2.5分别对应acquire,acquireInterruptibly

区别只是在阻塞期间如果是由于中断导致的唤醒,

一个是保存起来标志位等到获取到锁的时候在进行响应中断异常;

一个是立马抛出异常不等到拿锁在进行处理。

unLock

unLock 不区分公平锁和非公平锁 逻辑都一样都是对state减一,然后看下是不是state变为0(意味着该线程不再持有锁了)然后将阻塞队列中的头结点进行唤醒

public void unlock() {
   sync.release(1);
}

release是在AQS中的方法

release源码

public final boolean release(int arg) {
     if (tryRelease(arg)) {  //尝试释放锁,注意:只有已经持有锁的线程才可以进行释放锁;没有持有锁的话释放没用..。该条件成立代表该线程已经不在持有锁了所以需要唤醒其他线程
          Node h = head;  取出头结点
          if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);  //准备唤醒处于AQS阻塞队列中的头结点
               return true;
        }
            return false;
  }

tryRelease源码

protected final boolean tryRelease(int releases) {
        int c = getState() - releases;  //调用一次unlock也就是调用一次Sync的release也就是进行一次锁的递减。因此同一个lock重入多少次就要unlock多少次
        if (Thread.currentThread() != getExclusiveOwnerThread())
             throw new IllegalMonitorStateException();
          boolean free = false;
           if (c == 0) {
                 //当前线程不再持有锁,需要唤醒阻塞队列中的阻塞线程设置free代表该现场已经不在持有锁了,然后清空拥有锁的线程标志位
              free = true;
              setExclusiveOwnerThread(null);
          }
          setState(c);//由于只有持有锁的线程才可以进行tryRelease,又因为是排它锁,所以这里面是线程安全的。
         return free;  //返回free、只有当没有重入state等于0才可以返回true也才可以进行唤醒其他线程。如果一个线程重入了两次,只调用了一次unlock那么是不会唤醒其他线程的
}

unparkSuccessor源码

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
        if (ws < 0)
             node.compareAndSetWaitStatus(ws, 0);
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
          * traverse backwards from tail to find the actual
           * non-cancelled successor.
             */
            Node s = node.next;
          if (s == null || s.waitStatus > 0) {
              s = null;
               for (Node p = tail; p != node && p != null; p = p.prev)
                  if (p.waitStatus <= 0)
                       s = p;
            }
             if (s != null)
                  LockSupport.unpark(s.thread);  //唤醒线程
 }


目录
相关文章
|
6月前
|
安全 Java
Java并发编程:Synchronized及其实现原理
Java并发编程:Synchronized及其实现原理
53 4
|
6月前
|
Java
深入理解Java中的AbstractQueuedSynchronizer(AQS):并发编程的核心组件
深入理解Java中的AbstractQueuedSynchronizer(AQS):并发编程的核心组件
|
2月前
|
Java
JAVA并发编程ReentrantLock核心原理剖析
本文介绍了Java并发编程中ReentrantLock的重要性和优势,详细解析了其原理及源码实现。ReentrantLock作为一种可重入锁,弥补了synchronized的不足,如支持公平锁与非公平锁、响应中断等。文章通过源码分析,展示了ReentrantLock如何基于AQS实现公平锁和非公平锁,并解释了两者的具体实现过程。
|
6月前
|
安全 Java 程序员
Java多线程基础-17:简单介绍一下JUC中的 ReentrantLock
ReentrantLock是Java并发包中的可重入互斥锁,与`synchronized`类似但更灵活。
56 0
|
6月前
|
安全 Java
多线程(进阶三:JUC)
多线程(进阶三:JUC)
71 0
|
存储 安全 算法
Java并发编程之ConcurrentHashMap源码分析
HashMap多线程put后get为null和多线程put的时候可能导致元素丢失 在多线程环境下,使用HashMap进行put操作时存在丢失数据的情况,为了避免这种bug的隐患,强烈建议使用ConcurrentHashMap代替HashMap
250 0
Java并发编程之ConcurrentHashMap源码分析
|
存储 缓存 安全
【Java并发编程 三】Java并发机制的底层实现(一)
【Java并发编程 三】Java并发机制的底层实现(一)
115 0
|
存储 缓存 安全
【Java并发编程 三】Java并发机制的底层实现(二)
【Java并发编程 三】Java并发机制的底层实现(二)
61 0
|
安全 Java
java并发原理实战(9)--手动实现一个可重入锁
java并发原理实战(9)--手动实现一个可重入锁
123 0
java并发原理实战(9)--手动实现一个可重入锁
|
存储 安全 Java
java并发原理实战(5)--线程安全性问题和synchronized原理理解
java并发原理实战(5)--线程安全性问题和synchronized原理理解
java并发原理实战(5)--线程安全性问题和synchronized原理理解