锁介绍
锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源。
Lock接口
在Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要 显式 地获取和释放锁。 虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的 可操作性、可中断的获取锁 以及 超时获取锁 等多种synchronized关键字所不具备的同步特性。
使用synchronized关键字将会 隐式 地获取锁,但是它将锁的获取和释放固化了,也就是先获取再释放。
Lock lock = new ReentrantLock(); lock.lock(); try { } finally { lock.unlock(); }
注意 :
- 在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。
- 不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。
Lock接口提供的synchronized关键字所不具备的主要特性:
- 尝试非阻塞性获取锁: 当前线程尝试获取锁,如果此时没有其他线程占用此锁,则成功获取到锁。
- 能被中断的获取锁: 当获取到锁的线程被中断时,中断异常会抛出并且会释放锁。
- 超时获取锁: 在指定时间内获取锁,如果超过时间还没获取,则返回。
Lock 相关的API:
- void lock();:获取锁,获取之后返回
- void lockInterruptibly() throws InterruptedException;:可中断的获取锁
- boolean tryLock();:尝试非阻塞的获取锁
- boolean tryLock(long time, TimeUnit unit) throws InterruptedException;: 超时获取锁。 超时时间结束,未获得锁,返回false.
- void unlock();:释放锁
- Condition newCondition();:获取等待通知组件,改组件和锁绑定,当前线程获取到锁才能调用wait()方法,调用之后则会释放锁。
队列同步器
队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int 成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者Doug Lea期望它能够成为实现大部分同步需求的基础。
同步器的主要使用方式是继承AbstractQueuedSynchronizer,通过同步器提供的3个方法getState()、setState(int newState)和compareAndSetState(int expect,int update)来进行线程安全的状态同步。
同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。 可以这样理解二者之间的关系:
- 锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;
- 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的 方法
队列同步器的接口与示例:AbstractQueuedSynchronizer可重写的方法
- boolean tryAcquire(int arg):独占式获取同步状态。
- boolean tryRelease(int arg):独占式释放同步状态。
- int tryAcquireShared(int arg):共享式获取同步状态。
- boolean tryReleaseShared(int arg):共享释放取同步状态。
- boolean isHeldExclusively():当前同步器是否在独占式模式下被线程占用。
实现自定义同步组件时,将会调用同步器提供 独占式获取与释放同步状态、共享式获取与释放同步状态 和 查询同步队列中的等待线程情况 三类模板方法。
独占锁的示例代码:
public class TestLock implements Lock { private TestQueuedSync sync; /** * 获取锁 */ @Override public void lock() { sync.acquire(1); } /** * 可中断的获取锁 */ @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 尝试非阻塞式获取锁 */ @Override public boolean tryLock() { return sync.tryAcquire(1); } /** * 尝试非阻塞式获取锁 */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquire(1); } /** * 释放锁 */ @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } /** * 是否有同步队列线程 */ public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * 锁是否被占用 */ public boolean isLock() { return sync.isHeldExclusively(); } private static class TestQueuedSync extends AbstractQueuedSynchronizer { /** * 独占式获取同步状态 */ @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 独占式释放同步状态 */ @Override protected boolean tryRelease(int arg) { if (getState() == 0) { throw new IllegalStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } /** * 同步状态是否被占用 */ @Override protected boolean isHeldExclusively() { return getState() == 1; } /** * 返回一个Condition,每个condition都包含了一个condition队列 */ Condition newCondition() { return new ConditionObject(); } } }
上述示例代码中,独占锁TestLock是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。TestLock中定义了一个静态内部类TestQueuedSync继承了同步器,在tryAcquire(int acquires)方法中,如果经过compareAndSetState设置成功,则代表获取了同步状态1,而在tryRelease(int releases)方法中只是将同步状态重置为0。
用户使用TestLock时并不会直接和内部同步器的实现TestQueuedSync打交道,而是调用TestLock提供的方法,在TestLock的实现中,以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛。
队列同步器的实现分析
接下来将从实现角度分析同步器是如何完成线程同步的:
同步队列 : 一个FIFO双向队列。 当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点Node并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。 Node 保存 获取同步状态失败的线程引用、等待状态 以及 前驱和后继节点,节点的属性类型 与 名称 以及 描述 如下:
来看下存放的Node
/** * 等待状态: * CANCELLED : 1 在同步队列中等待超时或被中断,需要从队列中取消等待,在该状态将不会变化 * SIGNAL : -1 后继节点地线程处于等待状态,当前节点释放获取取消同步状态,后继节点地线程即开始运行 * CONDITION : -2 在等待队列中, * PROPAGATE : -3 下一次共享式同步状态获取将会无条件地被传播下去 * INITAL : 0 初始状态 */ volatile int waitStatus; volatile Node prev;//前驱节点 volatile Node next;//后继节点 volatile Thread thread;//获取同步状态的线程 Node nextWaiter;//等待队列中的后继节点。 如果节点是共享的的,这个字段将是一个SHARED常量
如上图所示,同步器包含了两个节点类型的引用,一个指向 头节点,而另一个指向 尾节点。 试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。acquire(int arg)代码如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上述代码主要完成了 同步状态获取、节点构造、加入同步队列 以及 在同步队列中自旋等待。
- 首先调用自定义同步器实现的tryAcquire(int arg)方法保证线程安全的获取同步状态 如果获取同步状态失败,构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部。
- 最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态 如果获取不到则阻塞节点中的线程,而 被阻塞线程的唤醒 主要依靠 前驱节点的出队或 阻塞线程被中断 来实现。
我们来看下节点的构造以及加入同步队列的addWaiter(Node mode)和initializeSyncQueue()方法:
private Node addWaiter(Node mode) { Node node = new Node(mode); for (;;) { Node oldTail = tail; if (oldTail != null) { U.putObject(node, Node.PREV, oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } } private final void initializeSyncQueue() { Node h; if (U.compareAndSwapObject(this, HEAD, null, (h = new Node()))) tail = h; }
上述代码通过在“死循环”中使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。 如果没有尾节点的话,则构建一个新的同步队列。
接下来看下acquireQueued(final Node node, int arg)方法:
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (Throwable t) { cancelAcquire(node); throw t; } }
在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?原因有两个:
- 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
- 维护同步队列的FIFO原则。
自定义同步组件
- 在同一时刻,只允许至多两个线程同时访问,超过两个线程的访问将被阻塞。
- 能够在同一时刻支持多个线程的访问(共享式访问)。
package com.atguigu.ct.producer.Test.BB; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class CustomLock implements Lock { private CustomSyncQueue customSyncQueue = new CustomSyncQueue(2); public static void main(String[] args) { final Lock lock = new CustomLock(); class Worker extends Thread { @Override public void run() { while (true) { lock.lock(); try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(1); } catch (Exception e ){ } finally { lock.unlock(); } } } } // 启动10个线程 for (int i = 0; i < 10; i++) { Worker w = new Worker(); w.setDaemon(true); w.start(); } // 每隔1秒换行 for (int i = 0; i < 10; i++) { System.out.println(); } } @Override public void lock() { customSyncQueue.tryAcquireShared(1); } @Override public void unlock() { customSyncQueue.tryReleaseShared(1); } public static class CustomSyncQueue extends AbstractQueuedSynchronizer { public CustomSyncQueue(int count) { if (count <= 0) { throw new IllegalStateException("count must >= 0"); } setState(count); } @Override protected int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override protected boolean tryReleaseShared(int returnCount) { for (; ; ) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
在上述示例中,CustomLock实现了Lock接口,提供了面向使用者的接口,使用者调用lock() 方法获取锁,随后调用unlock()方法释放锁,而同一时刻只能有两个线程同时获取到锁。 TwinsLock同时包含了一个自定义同步器Sync,而该同步器面向线程访问和同步状态控制。以 共享式获取同步状态为例:同步器会先计算出获取后的同步状态,然后通过CAS确保状态的正 确设置,当tryAcquireShared(int reduceCount)方法返回值大于等于0时,当前线程才获取同步状 态,对于上层的TwinsLock而言,则表示当前线程获得了锁。
同步器作为一个桥梁,连接线程访问以及同步状态控制等底层技术与不同并发组件(比如 Lock、CountDownLatch等)的接口语义
重入锁
重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对 资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。 我们回顾下TestLock的lock方法,在 tryAcquire(int acquires)方法时没有考虑占有锁的线程再次获取锁的场景,而在调用tryAcquire(int acquires)方法时返回了false,导致该线程被阻塞。
在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。 事实上,公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以TPS作为唯一的指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。
下面我们来分析下ReentrantLock 的实现:
- 实现重进入
- 重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题:
- 线程再次获取锁
- 锁的最终释放
下面是ReentrantLock通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实现:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
此方法通过判断 当前线程是否为获取锁的线程 来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
公平与非公平获取锁的区别
公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是 FIFO。
读写锁
之前提到锁(如TestLock和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。 读写锁维护了一对锁,一个 读锁 和一个 写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
一般情况下,读写锁 的性能都会比 排它锁 好,因为大多数场景 读是多于写 的。在读多于写的情况下,读写锁 能够提供比 排它锁 更好的 并发性 和 吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock
- 公平性选择 :支持公平和非公平的方式获取锁,吞吐量非公平优于公平。
- 重进入 : 读锁在获取锁之后再获取读锁,写锁在获取锁之后再获取读锁和写锁。
- 锁降级 :遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级为读锁。
读写锁的接口与示例 ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方法,而其实现类ReentrantReadWriteLock,除了接口方法之外,还提供了一些便于外界监控其内部工作状态的方法,这些方法如下:
- getReadLockCount():返回当前读锁获取的次数
- getReadHoldCount():返回当前线程获取读锁的次数
- isWriteLocked():判断写锁是否被获取
- getWriteHoldCount():返回当前写锁被获取的次数
通过读写锁保证 非线程安全的HashMap的读写是线程安全的。
static Map<String, Object> map = new HashMap<>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); /** * 获取一个key对应的value */ public static final Object get(String key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } /** * 设置key对应的value,并返回旧的value */ public static final Object put(String key, Object value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } /** * 清空所有的内容 */ public static final void clear() { w.lock(); try { map.clear(); } finally { w.unlock(); } }
锁降级
锁降级指的是 写锁降级成为读锁。 如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
//当数据发生变更后,update变量(布尔类型且volatile修饰)被设置为false public void processData() { readLock.lock(); if (!update) { // 必须先释放读锁 readLock.unlock(); // 锁降级从写锁获取到开始 writeLock.lock(); try { if (!update) { // 准备数据的流程(略) update = true; } readLock.lock(); } finally { writeLock.unlock(); } // 锁降级完成,写锁降级为读锁 } try { // 使用数据的流程(略) } finally { readLock.unlock(); } }
上述示例中,当数据发生变更后,布尔类型且volatile修饰update变量被设置为false,此时所有访问processData()方法的线程都能够感知到变化,但只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的lock()方法上。当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。
锁降级中读锁的获取是否必要呢?
答案是必要的。主要是为了 保证数据的可见性。 如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么 当前线程无法感知线程T的数据更新。 如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
RentrantReadWriteLock不支持锁升级。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
LockSupport工具
当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。 LockSupport定义了一组的公共静态方法,这些方法提供了最基本的 线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具。
LockSupport提供的 阻塞和唤醒的方法 如下:
- park():阻塞当前线程,只有调用 unpark(Thread thread)或者被中断之后才能从park()返回。
- parkNanos(long nanos):再park()的基础上增加了超时返回。
- parkUntil(long deadline):阻塞线程知道 deadline 对应的时间点。
- park(Object blocker):Java 6时增加,blocker为当前线程在等待的对象。
- parkNanos(Object blocker, long nanos):Java 6时增加,blocker为当前线程在等待的对象。
- parkUntil(Object blocker, long deadline):Java 6时增加,blocker为当前线程在等待的对象。
- unpark(Thread thread):唤醒处于阻塞状态的线程 thread。 有对象参数的阻塞方法在线程dump时,会有更多的现场信息
Condition接口
任意一个Java对象,都拥有一组监视器方法,定义在java.lang.Object),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。
Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现 等待/通知 模式,但是这两者在使用方式以及功能特性上还是有差别的。
以下是Object的监视器方法与Condition接口的对比:
Condition的使用方式比较简单,需要注意在调用方法前获取锁,如下:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException { lock.lock(); try { condition.await(); } finally { lock.unlock(); } } public void conditionSignal() throws InterruptedException { lock.lock(); try { condition.signal(); } finally { lock.unlock(); } }
Condition 接口方法介绍:
- void await() throws InterruptedException : 当前线程进入等待状态直到被通知或中断
- void awaitUninterruptibly() :当前线程进入等待状态直到被通知,对中断不敏感
- long awaitNanos(long var1) throws InterruptedException :当前线程进入等待状态直到被通知、中断或超时
- boolean await(long var1, TimeUnit var3) throws InterruptedException :当前线程进入等待状态直到被通知、中断或超时
- boolean awaitUntil(Date var1) throws InterruptedException :当前线程进入等待状态直到被通知、中断或到某一时间
- void signal() :唤醒Condition上一个在等待的线程
- void signalAll() :唤醒Condition上全部在等待的线程
获取一个Condition必须通过Lock的newCondition()方法。
Condition的实现分析
主要包括 等待队列、等待和通知。
- 等待队列
- 等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。
- 线程调用Condition.await(),即以当前线程构造节点,并加入等待队列的尾部。
锁的知识点
- Lock接口提供的方法lock()、unlock()等获取和释放锁的介绍
- 队列同步器的使用 以及 自定义队列同步器
- 重入锁 的使用和实现介绍
- 读写锁 的 读锁 和 写锁
- LockSupport工具实现 阻塞和唤醒线程
- Condition接口实现 等待/通知模式
结尾
本章介绍了Java并发包中与锁相关的API和组件,通过示例讲述了这些API和组件的使用 方式以及需要注意的地方,并在此基础上详细地剖析了队列同步器、重入锁、读写锁以及 Condition等API和组件的实现细节,只有理解这些API和组件的实现细节才能够更加准确地运 用它们