文章首发于公众号【看点代码再上班】,欢迎围观,第一时间获取最新文章。
目录
大家好,我是tin,这是我的第13篇原创文章
文章首发于公众号【看点代码再上班】,欢迎围观,第一时间获取最新文章。
今天把ReentrantLock和AQS一起翻一翻,通过源码说一说我们的Java锁。
初识ReentrantLock
首先,我们同时启用5个线程对一个公共变量counter从0开始累加,每个线程只对变量+1,理想的情况是公共变量最后值等于5,类似如下结果:
thread(0) @【看点代码再上班】, thread state:RUNNABLE
thread(1) @【看点代码再上班】, thread state:WAITING
thread(2) @【看点代码再上班】, thread state:WAITING
thread(3) @【看点代码再上班】, thread state:WAITING
thread(4) @【看点代码再上班】, thread state:WAITING
counter:2
thread(0) @【看点代码再上班】, thread state:TERMINATED
thread(1) @【看点代码再上班】, thread state:RUNNABLE
thread(2) @【看点代码再上班】, thread state:WAITING
thread(3) @【看点代码再上班】, thread state:WAITING
thread(4) @【看点代码再上班】, thread state:WAITING
counter:3
thread(0) @【看点代码再上班】, thread state:TERMINATED
thread(1) @【看点代码再上班】, thread state:TERMINATED
thread(2) @【看点代码再上班】, thread state:WAITING
thread(3) @【看点代码再上班】, thread state:RUNNABLE
thread(4) @【看点代码再上班】, thread state:WAITING
counter:4
thread(0) @【看点代码再上班】, thread state:TERMINATED
thread(1) @【看点代码再上班】, thread state:TERMINATED
thread(2) @【看点代码再上班】, thread state:RUNNABLE
thread(3) @【看点代码再上班】, thread state:TERMINATED
thread(4) @【看点代码再上班】, thread state:WAITING
counter:5
thread(0) @【看点代码再上班】, thread state:TERMINATED
thread(1) @【看点代码再上班】, thread state:TERMINATED
thread(2) @【看点代码再上班】, thread state:TERMINATED
thread(3) @【看点代码再上班】, thread state:TERMINATED
thread(4) @【看点代码再上班】, thread state:RUNNABLE
我把线程的状态一起打印出来了,从图而知,对counter的累加是线程安全的,每次只有一个线程处于RUNNABLE状态,其他的线程要么处于WAITING,要么处于TERMINATED状态。
我把测试代码也截图发出来:
(ps:如要测试源码,可到此下载:https://github.com/iam-tin/tin-example/tree/master/tin-basis/src/main/java/com/tin/example/lock)
我们对ReentrantLock的初步认识大多是从lock()和unlock()方法开始的,我们用得比较多的也是此两方法(除此之外还有tryLock()等方法)。
当我们调用lock()方法的时候,ReentrantLock是如何实现共享变量单线程锁定的呢?当调用unlock()方法的时候,ReentrantLock又是如何把锁释放而通知其他WAITING的线程去获取锁的呢?
以上问题就涉及到了我们Java中大名鼎鼎的AQS了。通过源码可以看到ReentrantLock#lock()实际是调用的抽象内部类Sync的lock()方法,FairSync(公平锁)和NonfairSync(非公平锁)分别继承了Sync类:
顺藤摸瓜,找到Sync类其实是继承AbstractQueuedSynchronizer(AQS),
不管是FairSync还是NonFairSync,底层都是用AQS的能力实现。
什么是AQS
说到Java中的并发编程,一定绕不开AQS(AbstractQueuedSynchronizer),它是我们伟大的Java作者Doug Lea(我特喜欢这个老爷子)的又一大杰作!
看看官方文档是怎么描述AQS的:
AQS又称抽象队列同步器,是并发包(java.util.concurrent)的基础框架,很多我们熟悉的锁和同步组件都是依赖于AQS,比如ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。AQS底层则依靠CAS(竞争锁)与同步队列,如下图:
AQS中的同步状态state
不同线程要获取锁,AQS实际上是通过一个volatile变量来控制的,这个变量就是state,表示同步状态。和state相关的方法主要有三个方法getState(),setState(),compareAndSetState()
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
很明显,线程可以通过compareAndSetState方法(实际是CAS)获取锁,compareAndSetState被调用的地方有哪些呢?见下图:
compareAndSetState(0, 1)可以理解为线程尝试首次获取锁,compareAndSetState(c, c + acquires)可以理解为多次获取同一把锁。
CLH变体队列
CLH是一个双向链表队列,其关键数据结构为:Node。Node即为CLH变体队列中的节点,它把线程封装起来,一个节点可以理解为一个要准备竞争锁的线程。
static final class Node {
//共享模式节点标记
static final Node SHARED = new Node();
//独占模式节点标记
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
//上面四种状态值,初始值为0
volatile int waitStatus;
//前驱指针
volatile Node prev;
//后继指针
volatile Node next;
//和节点绑定的线程
volatile Thread thread;
//等待条件的下一个节点
Node nextWaiter;
}
独占模式
独占模式表示同一时间只能有一个线程占有同一把锁(和重入不一样,重入表示同一个线程可以多次获取锁),ReentrantLock就是一个独占锁(同时它也是可重入的)。我们看看独占模式获取锁的逻辑acquire()方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg)尝试获取锁,如果获取不到则把当前线程包装为一个独占模式的节点放到队列的末尾,然后acquireQueued方法把线程挂起阻塞,直到获取锁。
(acquire()对应锁释放方法是release(int arg)。)
共享模式
共享模式表示同一把锁可以被多个线程同时拥有。我们经常看到的ReadWriteLock、CountdownLatch、Semaphere都是共享模式。
和独占模式acquire()方法相对应,我们看下共享模式下acquireShared(int arg)方法是如何获取锁的。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared(arg)只是AQS定义的一个模板,具体实现都在各种锁的实现类下,tryAcquireShared(arg)返回值只要大于等于0都认为是获取锁成功。我们简单看下信号量Semaphore的tryAcquireShared实现是怎么样的:
Semaphore同样分为公平锁和非公平锁,默认下是非公平锁。Semaphore比较简单,它只认信号量剩余量,只要没有超出都可以获取锁,所以也就是同一时间同一个共享资源可以多线程共享的(信号量大于1的情况下)。
读写锁ReadWriteLock相对复杂一些,因为分为了读锁和写锁,但我们只要记住读锁是共享模式、写锁是独占模式、对于同一把锁的CLH队列不同节点既可以是共享模式也可以是独占模式,这样读写锁也就非常好理解了。
再回到获取共享锁最开始的方法上:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
如果tryAcquireShared(arg) < 0,就会执行doAcquireShared(arg),这个方法主要逻辑是把线程包装为一个共享模式的节点Node放到同步队列的末端直至获取到锁。但是和独占模式不同的是,doAcquireShared如果获取到锁会马上通知后继节点去获取锁。
公平锁&非公平锁
公平与非公平一般由具体锁实现类实现,AQS本身没有这个概念。就拿ReentranLock来说,它内部的公平锁与非公平锁的区别在于获取锁是否严格遵循排队顺序:
- 如果锁被其他线程持有,那么再申请锁的其他线程会被挂起等待,加入到等待队列的末端,并遵循先入先出原则排队获取锁,这就是公平锁。
- 非公平锁则是让当前正在请求的线程优先插队第一个获取锁(不管等待队列是否有其他线程等待获取锁),如果获取到了直接返回,如果获取不到才加入到等待队列的末端。
我们看下ReentranLock独占锁源码便知,公平与非公平下获取锁的不同:
结语
我是tin,一个在努力让自己变得更优秀的普通工程师。自己阅历有限、学识浅薄,如有发现文章不妥之处,非常欢迎加我提出,我一定细心推敲并加以修改。
坚持创作不容易,你的正反馈是我坚持输出的最强大动力,谢谢!
文章首发于公众号【看点代码再上班】