ReentranLock源码学习

简介: 线程的三大特性:原子性、可见性、有序性。也就是说满足这个三个特性的操作都是可以保证安全的,如Atomic包、volatile、通过happensBefore原则可以进行线程的安全的判断,这个依据通常是为了避免jvm指令重排。比如通常我们知道的配置信息,如果有多个线程去进行配置信息的修改,则需要进行上锁。或者多个线程修改一个变量时,此时就需要进行上锁了,或者读写分离时,可以考虑ReentrantReadWriteLock等。其本质是解决并行中的问题,将并行转成串行问题进行解决。那怎么上锁才有用呢?锁的状态大部分情况下是互斥的。当然也有特例:ReentrantReadWriteLock的读读是不会

首先回答一个问题?线程的三大特性?什么时候我们需要锁?java中已经提供了synchronized,为什么还要使用ReentrantLock?AQS原理。

线程的三大特性:原子性、可见性、有序性。也就是说满足这个三个特性的操作都是可以保证安全的,如Atomic包、volatile、通过happensBefore原则可以进行线程的安全的判断,这个依据通常是为了避免jvm指令重排。比如通常我们知道的配置信息,如果有多个线程去进行配置信息的修改,则需要进行上锁。或者多个线程修改一个变量时,此时就需要进行上锁了,或者读写分离时,可以考虑ReentrantReadWriteLock等。其本质是解决并行中的问题,将并行转成串行问题进行解决。那怎么上锁才有用呢?锁的状态大部分情况下是互斥的。当然也有特例:ReentrantReadWriteLock的读读是不会互斥的,其读写,写写实互斥的,当然可重入锁执行一个线程调用另外一个线程也不会互斥。之所以使用RenntranLock,是因为它适用于并发场景较为激烈的情况,同时其是经过优化了的。当然synchronized自JDK1.6之后也进行了优化,将其分为了偏向锁、轻量级锁、重量级锁。

同时ReentrantLock是基于AQS(AbstractQueuedSynchronizer)实现的,其目前也是唯一实现lock接口的可重入锁。其优点在于将锁进行细化,将锁分为两种锁,公平锁和非公平锁,也即独占锁与抢占锁。当进入公平锁时,是直接返回获取锁成功的,而没有获取锁时,首先会将其封装成node,放入到addWaiter中,进行阻塞,等待上一个线程完成,在进行请求,如果上一个线程完成了,则进行状态的waitStatus的变化,将其变成可执行状态,进行操作。再进行锁的获取。同时Condition采用await和singnal的方式,当然也是将其封装到队列中,进行唤醒队列。调用 Condition 的 await()方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()方法返回时,当前线程一定获取了 Condition 相关联的锁。调用 Condition 的 signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。


一、AQS数据结构及变量

//Node数据结构//FIFO的双向链表,每个数据结构都有两个指针,//分别指向后继节点和前驱节点。//每个node都是由线程封装的,当线程抢占锁失败//后会封装成node加入到AQS队列中去,当获取锁的线程释放锁以后,会从//队列中唤醒一个阻塞的节点(线程)staticfinalclassNode {
//waitStatus的5种状态:CANCELLED=1、//SIGNAL=-1、CONDITION=-2、//PROPAGATE=-3、0:默认状态//CANCELLED=1,结束状态,进入该状态后的节点将不会再变化staticfinalintCANCELLED=1;
//SIGNAL=-1,只要前置节点释放锁,就会通知标识为SIGNAL状态的后续节点的线程staticfinalintSIGNAL=-1;
/** waitStatus value to indicate thread is waiting on condition *///一个线程通信工具类似于synchronized的wait/notify//可以使某些线程一起等待某个条件(condition),//只有满足条件时,线程才会被唤醒//主要有两个值得关注的:await、signalstaticfinalintCONDITION=-2;
/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*///共享模式下,PROPAGATE状态的线程处于可运行状态staticfinalintPROPAGATE=-3;
volatileintwaitStatus;
volatileNodeprev; //前驱节点volatileNodenext;  //后驱节点volatileThreadthread;  //当前线程NodenextWaiter; //存储在condition队列中的后继节点//是否为共享锁finalbooleanisShared() {
returnnextWaiter==SHARED;
   }
finalNodepredecessor() throwsNullPointerException {
Nodep=prev;
if (p==null)
thrownewNullPointerException();
elsereturnp;
    }
Node() {    // Used to establish initial head or SHARED marker    }
//addWaiter中的信息Node(Threadthread, Nodemode) {     // Used by addWaiterthis.nextWaiter=mode;
this.thread=thread;
    }
//通常condition中包含的信息Node(Threadthread, intwaitStatus) { // Used by Conditionthis.waitStatus=waitStatus;
this.thread=thread;
    }
}
//头结点privatetransientvolatileNodehead;
//尾节点privatetransientvolatileNodetail;
//CAS中的属性,取0或者大于0,其中0表示无锁状态,//>0表示已经有线程获得锁,state可以递增,也即重入的次数privatevolatileintstate;


二、方法

获取锁

staticfinalclassNonfairSyncextendsSync {
//锁分为公平锁fairSync和非公平锁NonFairSync,我们可以//知道synchronized是公平锁,也就是fairSync//而公平锁是独占锁,因此可以知道synchronized是独占锁//而非公平锁为抢占锁。不管有没有线程排队,上来cas去抢占一下锁//cas成功,则表示成功获取锁,进行成功返回//否者cas失败,调用acquire(1)走锁竞争逻辑//其中cas调用底层的unsafe.compareAndSwapInt(this,stateOffset, expect, update);//进行更新操作,同时由于操作是原子性操作,因此不会出现线程安全问题//state=0,表示无锁状态//state>0时,也就是为1时,说明有线程获得了锁。//由于ReentrantLock允许重入,因此同一个线程多次获取同步锁的时候,state会递增,比如重入5次//namstate为5,同时需要释放5次,其他线程才可以获取锁。finalvoidlock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else//进入非公平锁逻辑,重点关注acquire(1);
    }
//尝试获取锁,如果成功返回true,// 不成功返回false,它是重写AQS队列//类中的tryAcquire方法protectedfinalbooleantryAcquire(intacquires) {
returnnonfairTryAcquire(acquires);
    }
}
//执行cas操作,调用unsafe下的compareAndSwapInt:当前的值,偏移量、期望值、更新值//同时注意偏移量是2的次幂protectedfinalbooleancompareAndSetState(intexpect, intupdate) {
returnunsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
publicfinalnativebooleancompareAndSwapInt(Objectvar1, longvar2, intvar4, intvar5);
//进入非公平所,抢占锁逻辑//传入1是为了通过tryAcquire获取抢占锁,//如果成功返回true,否则返回false//如果tryAcquire失败,则会通过//addWaiter方法将当前线程封装成Node添加 到AQS队列队尾//acquireQueued,将node作为参数,通过自旋去尝试获取锁publicfinalvoidacquire(intarg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protectedbooleantryAcquire(intarg) {
thrownewUnsupportedOperationException();
}
//通过addWriter方法会把线程添加到链表中,//接着会把node作为参数传递给acquireQueued方法,去竞争锁//将node作为参数,通过自旋去尝试获取锁finalbooleanacquireQueued(finalNodenode, intarg) {
booleanfailed=true;
try {
//不进行中断booleaninterrupted=false;
//进行自旋for (;;) {
//获取当前节点的prev节点finalNodep=node.predecessor();
//如果是head节点,说明有资格去争抢锁if (p==head&&tryAcquire(arg)) {
//获取锁成功,说明前一个线程已经释放锁,//然后设置head为当前线程执行权限setHead(node);
//把原来head节点从链表中移除p.next=null; // help GCfailed=false;
returninterrupted;
                }
//前一个线程还没有释放锁,// 使得当前线程在执行tryAcquire时返回falseif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted=true;
            }
        } finally {
//通过 cancelAcquire 取消获得锁的操作if (failed)
cancelAcquire(node);
        }
    }
finalNodepredecessor() throwsNullPointerException {
Nodep=prev;
if (p==null)
thrownewNullPointerException();
elsereturnp;
        }
//如果前一个线程还没有释放,此时当前线程和下一个线程都会来争抢锁会失败//那么失败以后会调用shouldParkAfterFailedAcquire方法//node中waitStatus有5种状态CANCELLEDprivatestaticbooleanshouldParkAfterFailedAcquire(Nodepred, Nodenode) {
//拿到前置节点的等待状态intws=pred.waitStatus;
//如果状态等于SIGNAL,// 只要前置节点释放锁,//就会通知标识为SIGNAL状态的后续节点的线程if (ws==Node.SIGNAL)
/** This node has already set status asking a release* to signal it, so it can safely park.*/returntrue;
//如果ws>0,则说明处于CANCELLED(1)的状态,说明CANCELLLED//在同步队列中等待的线程等待超时或被中断,//需要从同步队列中取消该Node的节点,处于结束状态if (ws>0) {
/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do { //采用循环从队列中移除CANCELLED的节点node.prev=pred=pred.prev;
            } while (pred.waitStatus>0);
pred.next=node;
        } else {  //否者只有两种状态默认状态0或者PROPAGATE//也即初始化状态或者处于可执行状态,利用 cas// 设置 prev 节点的状态为 SIGNAL(-1)/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
returnfalse;
    }
privatefinalbooleanparkAndCheckInterrupt() {
//挂起当前线程变成WATING状态//park方法等待许可,unpark方法为线程提供许可LockSupport.park(this);
//返回当前线程是否被其它线程触发// 过中断请求,如果有触发过中断请求,// 则返回当前的中断标识true//并且对中断标识进行复位标识已经响应过了中断请求//如果返回true,则意味着在acquire方法中会执行selfInterrput()//因为线程在调用acquireQueued方法的时候是不会响应中断请求的returnThread.interrupted();
    }
publicstaticvoidpark(Objectblocker) {
Threadt=Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
    }
finalbooleanacquireQueued(finalNodenode, intarg) {
  booleanfailed=true;
  try {
    booleaninterrupted=false;
    for (;;) {
      finalNodep=node.predecessor();//获取当前节点的 prev 节点      if (p==head&&tryAcquire(arg)) {//如果是 head 节点,说明有资格去争抢锁        setHead(node);//获取锁成功,也就是ThreadA 已经释放了锁,然后设置 head 为 ThreadB 获得执行权限        p.next=null; //把原 head 节点从链表中移除        failed=false;
        returninterrupted;
      }
      //ThreadA 可能还没释放锁,使得 ThreadB 在执行 tryAcquire 时会返回 false      if (shouldParkAfterFailedAcquire(p,node) &&parkAndCheckInterrupt())
        interrupted=true; //并且返回当前线程在等待过程中有没有中断过。    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

释放锁

publicvoidunlock() {
sync.release(1);
}
//进行unlock会调用release方法释放锁publicfinalbooleanrelease(intarg) {
//如果释放锁成功if (tryRelease(arg)) {
//拿到AQS的head节点Nodeh=head;
//如果头结点不为空,同时等待状态不为0//调用unparkSUcessor方法唤醒后续节点if (h!=null&&h.waitStatus!=0)
unparkSuccessor(h);
returntrue;
        }
returnfalse;
    }
protectedbooleantryRelease(intarg) {
thrownewUnsupportedOperationException();
    }
//唤醒节点的后继节点(如果存在)。privatevoidunparkSuccessor(Nodenode) {
/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*///获取head节点的状态intws=node.waitStatus;
//如果等待状态<0,则设置head节点的状态为0if (ws<0)
compareAndSetWaitStatus(node, ws, 0);
/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*///得到head节点的下一个节点Nodes=node.next;
//如果下一个节点为null或者status>0表示canacelled状态//通过从尾部节点开始扫描,找到距离head最近的一个waitStatus<=0的节点if (s==null||s.waitStatus>0) {
s=null;
for (Nodet=tail; t!=null&&t!=node; t=t.prev)
if (t.waitStatus<=0)
s=t;
        }
//next节点不为空,直接唤醒这个线程即可if (s!=null)
LockSupport.unpark(s.thread);
    }
privatestaticfinalbooleancompareAndSetWaitStatus(Nodenode,
intexpect,
intupdate) {
returnunsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
    }

Condition

阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁

释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程

await:

publicfinalvoidawait() throwsInterruptedException {
    if (Thread.interrupted())
      thrownewInterruptedException();
    Nodenode=addConditionWaiter(); //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表    intsavedState=fullyRelease(node); //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程    intinterruptMode=0;
    //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞    while (!isOnSyncQueue(node)) {//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了      LockSupport.park(this); // 第一次总是 park 自己,开始阻塞等待      // 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在 isOnSyncQueue 中判断自己是否在队列上.      // isOnSyncQueue 判断当前 node 状态,如果是 CONDITION 状态,或者不在队列上了,就继续阻塞.      // isOnSyncQueue 判断当前 node 还在队列上且不是 CONDITION 状态了,就结束循环和阻塞.      if ((interruptMode=checkInterruptWhileWaiting(node)) !=0)
        break;
    }
    // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.    // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.    // 将这个变量设置成 REINTERRUPT.    if (acquireQueued(node, savedState) &&interruptMode!=THROW_IE)
      interruptMode=REINTERRUPT;
    // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.    // 如果是 null ,就没有什么好清理的了.    if (node.nextWaiter!=null) // clean up if cancelled      unlinkCancelledWaiters();
    // 如果线程被中断了,需要抛出异常.或者什么都不做    if (interruptMode!=0)
      reportInterruptAfterWait(interruptMode);
}

signal

publicfinalvoidsignal() {
  if (!isHeldExclusively()) //先判断当前线程是否获得了锁    thrownewIllegalMonitorStateException();
  Nodefirst=firstWaiter; // 拿到 Condition 队列上第一个节点  if (first!=null)
    doSignal(first);
}
privatevoiddoSignal(Nodefirst) {
  do {
    if ( (firstWaiter=first.nextWaiter) ==null)// 如果第一个节点的下一个节点是 null, 那么, 最后一个节点也是 null.      lastWaiter=null; // 将 next 节点设置成 null      first.nextWaiter=null;
  } while (!transferForSignal(first) && (first=firstWaiter) !=null);
  }
finalbooleantransferForSignal(Nodenode) {
  /*  * If cannot change waitStatus, the node has been cancelled.  */  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    returnfalse;
  Nodep=enq(node);
  intws=p.waitStatus;
  // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞),  if (ws>0||!compareAndSetWaitStatus(p, ws,Node.SIGNAL))
    LockSupport.unpark(node.thread); // 唤醒输入节点上的线程.  returntrue;
}
finalbooleantransferForSignal(Nodenode) {
  /*  * If cannot change waitStatus, the node has been cancelled.*/  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    returnfalse;
  Nodep=enq(node);
  intws=p.waitStatus;
  // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞),  if (ws>0||!compareAndSetWaitStatus(p, ws,Node.SIGNAL))
    LockSupport.unpark(node.thread); // 唤醒输入节点上的线程.  returntrue;
}
目录
相关文章
|
3月前
|
NoSQL Java 应用服务中间件
关于阅读源码
【1月更文挑战第12天】关于阅读源码
|
6月前
|
程序员 开发工具 C++
|
8月前
|
消息中间件 网络协议 Java
eventMesh源码学习
eventMesh源码学习
134 0
|
设计模式 分布式计算 资源调度
如何阅读源码
如何阅读源码
166 0
|
存储 人工智能 安全
C++学习必备——文章中含有源码
C++学习必备——文章中含有源码
99 0
C++学习必备——文章中含有源码
|
算法 Java 中间件
阅读优秀项目源码很重要,分享一个读源码的方法,小白都能学会
作为一个程序员,经常需要读一些开源项目的源码。同时呢,读源码对我们也有很多好处: 1.提升自己 2.修复 Bug 3.增加新功能
479 0
阅读优秀项目源码很重要,分享一个读源码的方法,小白都能学会
|
算法 NoSQL 前端开发
为什么要看源码、如何看源码,高手进阶必看
为什么要看源码、如何看源码,高手进阶必看
212 0
openFrameworks下的肤色检测源码
openFrameworks下的肤色检测源码
142 0
|
分布式计算 搜索推荐 前端开发
学会阅读源码后,我觉得自己better了
学会阅读源码后,我觉得自己better了
151 0
|
数据采集 JavaScript Go
colly源码学习
colly源码学习   colly是一个golang写的网络爬虫。它使用起来非常顺手。看了一下它的源码,质量也是非常好的。本文就阅读一下它的源码。 使用示例 func main() { c := colly.
1601 0

相关实验场景

更多