java.util.concurrent解析——AbstractQueuedSynchronizer队列管理

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 上一篇博客中,我们提到`AQS`的队列管理是基于CLH锁队列实现的,所以首先我们来看下`CLH锁队列`。

上一篇博客中,我们提到AQS的队列管理是基于CLH锁队列实现的,所以首先我们来看下CLH锁队列

1 CLH锁队列

CLH锁队列本质上是一个基于链表的FIFO自旋锁队列,队列中的每一个节点实质上是一个自旋锁:在阻塞时不断循环读取状态变量,当前驱节点释放同步对象使用权后,跳出循环,执行同步代码。其基本结构如下:
这里写图片描述

队列中每一个节点有两个成员:

  • 节点状态变量
  • 前驱指针:pred

head,tail并不是实际节点,只是为了表示队列的首尾,被称为dumb node。

在如此结构之下,其enqueue操作逻辑如下:

do { pred = tail;
} while(!tail.compareAndSet(pred, node));

其lock操作如下:

public void lock() {
  final Node node = new Node();
  node.locked = true;
  // 一个CAS操作即可将当前线程对应的节点加入到队列中,
  // 并且同时获得了前继节点的引用,然后就是等待前继释放锁
  Node pred = this.tail.getAndSet(node);
  this.prev.set(pred);
  while (pred.locked) {// 进入自旋
  }
}

可以看到其自旋逻辑。

而其dequeue操做更加简单:

head = node;

从面的操作,可以看到CLH锁队列有如下优势:

  • 队列的入列、出列操作原子性完成,无需加锁,高效
  • 判断当前队列等待是否为空同样简单,只需检查head是否为tail即可
  • 每个节点独立维护其状态变量,避免了集中状态管理的内存竞争

2 AQS进程队列

AQS进程队列相比于CLH锁队列主要做了两处修改:

  • 每个节点新增一个next指针。由于AQS队列中的进程不仅有自旋等待,还包括阻塞等待的情况。阻塞等待的队列需要其他队列主动唤醒。这就要求队列中某个节点出列时需要显式告知其后继节点,因而需要加入next指针
  • 节点状态变量status由一个bit替换成一个int。这主要是由于AQS下的状态更加复杂

首先来看下AQS队列节点的基本结构:

static final class Node {
     // 表明节点是否以共享模式等待的标记
    static final Node SHARED = new Node();

    // 表明节点是否以独占模式等待的标记
    static final Node EXCLUSIVE = null;

    // 表明线程已被取消
    static final int CANCELLED =  1;

    // 表明后续节点的线程需要unparking
    static final int SIGNAL    = -1;

    // 表明线程正在等待一个条件
    static final int CONDITION = -2;

    // 表明下一次acquireShared应该无条件传播
    static final int PROPAGATE = -3;

    /*
     * 状态字段,只能取下面的值:
     * SIGNAL(-1):    这个结点的后继是(或很快是)阻塞的(通过park),所以当前结点
     *              必须unpark它的后继,当它释放或取消时。为了避免竞争,acquire方法必须
     *              首先表明它们需要一个信号,然后再次尝试原子性acquire,如果失败了就阻塞。
     *               
     * CANCELLED(1):  这个结点由于超时或中断已被取消。结点从不离开这种状态。尤其是,
     *                 这种状态的线程从不再次阻塞。
     *
     * CONDITION(-2): 这个结点当前在一个条件队列上。它将不会用于sync队列的结点,
     *               直到被转移,在那时,结点的状态将被设为0.
     *              这个值在这里的使用与其他字段的使用没有关系,仅仅是简化结构。
     *               
     * PROPAGATE(-3): releaseShared应该传递给其他结点。这是在doReleaseShared里设置
     *                 (仅仅是头结点)以确保传递继续,即使其他操作有干涉。
     *
     * 0:             非以上任何值。
     *
     * 值是组织为数字的用以简化使用。非负值表示结点不需要信号。这样,大部分代码不需要
     * 检查特定的值,只需要(检查)符号。
     *
     * 对于普通同步结点,字段初始化为0;对于条件结点初始化为CONDITION(-2)。
     * 通过CAS操作修改(或者,当允许时,用无条件volatile写。)
     */
    volatile int waitStatus;

    /*
     * 连接到当前结点/线程依赖的用来检查等待状态的前驱结点。
     * 在进入队列时赋值,只在出队列时置为空(为了GC考虑)。
     * 根据前驱结点的取消,我们使查找一个非取消结点的while循环短路,这个总是会退出,
     * 因为头结点从不会是取消了的:一个结点成为头只能是一次成功的acquire操作结果。
     *
     * 一个取消了的线程从不会在获取操作成功,线程只能取消自己,不能是其他结点。
     */
    volatile Node prev;

    /*
     * 连接到当前结点/线程释放时解除阻塞的后续结点。
     * 在入队列时赋值,在绕过已取消前驱节点时调整,出队列时置为空(for GC)。
     * 入队操作不会给前驱结点的next字段赋值,直到附件后(把新节点赋值给队列的tail属性?),
     * 所以看到next字段为空不一定表示它就是队列的尾结点。然而,如果next字段看起来是空,
     * 我们可以从tail向前遍历进行双重检查。
     * 被取消了的结点的next字段被设置为指向它自己而不是空,这让isOnSyncQueue变得容易。
     */
    volatile Node next;

    /*
     * 列队在这个结点的线程,在构造时初始化,用完后置空。
     */
    volatile Thread thread;

    /*
     * 连接到下一个在条件上等待的结点或是特殊的值SHARED。
     * 因为条件队列只在独占模式下持有时访问,我们只需要一个简单的链表队列来持有在条件上等待的结点。
     * 他们然后被转移到队列去re-acquire。
     * 因为条件只能是独占的,我们通过用一个特殊的值来表明共享模式 来节省一个字段。
     */
    Node nextWaiter;

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

接下来我们就来看下其主要操作的主要逻辑。

3 enqueue

由于AQS队列节点包括pred和next两个指针,无法通过一次原子操作更新两个指针。所以添加结点到队列的操作最重要的是要保证:即使添加的CAS操作失败了,也不能影响队列结点现有的连接关系。

对于新加结点:

  • 在CAS之前指向它的预期前驱
  • CAS成功之后再更新预期前驱的后继指针。

在步骤1成功之后、步骤2完成之前,其他线程通过结点的 “next” 连接可能看到“尾结点”(即代码里的 pred)的 “next” 为空,但其实队列里已经加入新的结点,这也是为什么通过 “next” 连接遍历队列时碰到后继为空的,必须从原子地更新的 “tail” 结点向后遍历。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
   // 尝试enq的快速路径;失败后回退到完整的enq。
    Node pred = tail;
    if (pred != null) {
      // 把新结点的前驱指向pred,必须在下面的CAS完成之前设置,
      // 这样确保一旦CAS成功后,从tail向后遍历是ok的。
        node.prev = pred;// 步骤 1
        if (compareAndSetTail(pred, node)) {  //CAS
             // 新节点成功进入队列
             // 前驱结点的next字段指向新结点,建立完整的连接。
            pred.next = node; // 步骤 2
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列是空,必须初始化。
            if (compareAndSetHead(new Node())) // 原子地设置头结点
                tail = head; // 尾结点也指向头结点
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {  // 步骤 1
                t.next = node; // 步骤 2
 // 在把新结点设置为tail后才能更新前驱的next字段,这样,即使CAS失败了也不会影响原来的连接关系。
                return t;
            }
        }
    }
}

4 acquire

acquire方法不提供绝对公平的保证,因为现在在加入队列之前先进行tryAcquire操作,如果这个线程先于头结点锁定,那么头结点就只能继续等待了。这种情形称为闯入。

这个acquire之所以先尝试获取是为了在无竞争的情况下提高性能,并可以实现非公平的获取。如果要保证绝对的公平性,则可以在子类实现的tryAcquire方法里判断当前线程是否是头结点,是则尝试获取,不是则直接返回false。

// 以独占模式获取
public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 首先尝试获取
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      // 失败后加入等待队列,再从队列里再次尝试获取;成功获取后才返回,
      // 返回的boolean表示线程是否曾经被中断。

      // 在acquireQueued方法里,线程可能被反复park、unpark,直到获取锁。
      selfInterrupt(); // 重新设置中断状态位,是否执行取决于acquireQueued的返回值
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; // 线程是否曾被中断是由这个变量记录的。
        for (;;) { // 死循环,用于acquire失败后重试
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {// 前驱是头结点,继续尝试获取
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 检测是否需要等待,如果需要,则park当前线程
            // 只有前驱在等待时才进入等待,否则继续重试
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 线程进入等待,需要其他线程来唤醒这个线程以继续执行
                interrupted = true;   // 只要线程在等待过程中被中断过一次就会记录下来
        }
    } finally {
        if (failed)
             // acquire失败,取消acquire
            cancelAcquire(node);
    }
}

/*
 * 这个方法是信号控制的核心。检查和更新没有成功获取的结点的状态。
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
      // 前驱结点也在等待,说明这是一个稳定的等待状态。
        return true ;
    if (ws > 0) {
      // 前驱结点已取消,向前遍历直到找到一个非取消结点。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 把找到的结点的后继指向node,那么当前pred与node之间的已取消结点就不再被引用了,可以被垃圾回收。
        pred.next = node;
    } else {
      // 前驱的状态必是 0 或 PROPAGATE之一。表明需要一个信号,但不park先。
      // 调用者需要重试来确保它在park之前没法获取。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
       // park当前执行线程, 其他线程unpark这个线程后继续执行
    LockSupport.park( this);
    return Thread.interrupted();
}

5 release

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

private void unparkSuccessor(Node node) {
    /*
     * 如果status是负的(比如,可能需要信号)尝试清除预期的信号。
     * 如果这失败了或status被其他等待线程修改也是没关系的。
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 准备unpark的线程在后继里持有,一般就是下一个结点。
     * 但如果被取消或是空,从tail向后遍历来找到实际的非取消后继。
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
      // 没有直接后继或直接后继不需要通知
        s = null;

        // 从tail向后遍历,查找需要通知的结点
        for (Node t = tail; t != null && t != node; t = t.prev)
             // 找到一个后不跳出循环是为了找到最老的需要通知的结点。
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // 结点不为null,唤醒后继的等待线程
        LockSupport.unpark(s.thread);
}
相关文章
|
11天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
39 2
|
14天前
|
Java
轻松上手Java字节码编辑:IDEA插件VisualClassBytes全方位解析
本插件VisualClassBytes可修改class字节码,包括class信息、字段信息、内部类,常量池和方法等。
66 6
|
2天前
|
数据采集 存储 Web App开发
Java爬虫:深入解析商品详情的利器
在数字化时代,信息处理能力成为企业竞争的关键。本文探讨如何利用Java编写高效、准确的商品详情爬虫,涵盖爬虫技术概述、Java爬虫优势、开发步骤、法律法规遵守及数据处理分析等内容,助力电商领域市场趋势把握与决策支持。
|
6天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
7天前
|
Java 测试技术 API
Java 反射机制:深入解析与应用实践
《Java反射机制:深入解析与应用实践》全面解析Java反射API,探讨其内部运作原理、应用场景及最佳实践,帮助开发者掌握利用反射增强程序灵活性与可扩展性的技巧。
|
12天前
|
存储 算法 Java
Java Set深度解析:为何它能成为“无重复”的代名词?
Java的集合框架中,Set接口以其“无重复”特性著称。本文解析了Set的实现原理,包括HashSet和TreeSet的不同数据结构和算法,以及如何通过示例代码实现最佳实践。选择合适的Set实现类和正确实现自定义对象的hashCode()和equals()方法是关键。
23 4
|
15天前
|
Java 编译器 数据库连接
Java中的异常处理机制深度解析####
本文深入探讨了Java编程语言中异常处理机制的核心原理、类型及其最佳实践,旨在帮助开发者更好地理解和应用这一关键特性。通过实例分析,揭示了try-catch-finally结构的重要性,以及如何利用自定义异常提升代码的健壮性和可读性。文章还讨论了异常处理在大型项目中的最佳实践,为提高软件质量提供指导。 ####
|
18天前
|
存储 Java 开发者
Java中的集合框架深入解析
【10月更文挑战第32天】本文旨在为读者揭开Java集合框架的神秘面纱,通过深入浅出的方式介绍其内部结构与运作机制。我们将从集合框架的设计哲学出发,探讨其如何影响我们的编程实践,并配以代码示例,展示如何在真实场景中应用这些知识。无论你是Java新手还是资深开发者,这篇文章都将为你提供新的视角和实用技巧。
18 0
|
8天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
7天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
下一篇
无影云桌面