面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?

简介: 面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?

从队列容器选择,到算法一步步迭代,这是一篇从零开始逐步推演出“非阻塞线程安全队列”思想方法的文章。(有剧情反转~)


队列容器设计


若用数组作为队列的容器,就必须得加锁,因为数组是一块连续内存地址,多线程场景下,读写同一块内存地址不得不互斥地访问。


链式结构


链式结构就没有这个烦恼。链的每个结点都对应不同的内存地址,在多线程场景下,取头结点和插尾结点就不存在并发问题。(至少是降低了并发问题产生的概率)


通用的队列应该可存放任何类型的元素。


综上,就得声明一个带泛型的链结点:


// 结点
private static class Node<E> {
    E item; // 结点数据字段
    Node<E> next; // 结点后续指针
}


易变的字段


会不会存在多线程同时操纵同一个 Node 结点的 item 和 next 字段的情况?


当然有可能!为了让所有线程都能看到最新的 item 和 next,就得保证其 “可见性”


private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
}


volatile的字面意思是“易变的”,它用来形容共享变量高速缓存中的副本


image.png


为提高线程运行效率,每个线程在得到 CPU 运行时间片后,都会将主存中变量拷贝到 CPU 高速缓存中(图中 a1,a2 即是 a 的副本),之后对共享变量的运算都在缓存中进行。


若 a 初始值为0,启动线程1和2,a 就新增了两个副本 a1, a2。随后俩线程并发地对 a 执行自增。


线程1执行a1++后 a1 = 1,线程2对线程1的操作一无所知,当它也执行a2++后 a2 = 1。而共享变量正确的值应该是 2。


造成这个错误的原因在于:缓存是相互隔离的,导致线程无法感知对方对共享变量的操作,即 “当前线程对共享变量的操作对其他线程不可见。”


解决方案自然是让其变得 “可见”


java 的volatile关键词,用于告诉编译器该共享变量在缓存中的副本是“易变的”,线程不该相信它,于是乎:


  1. 线程写共享变量后,总是应该立刻将最新值同步到主存中。


  1. 线程读共享变量时,总是应该去主存拿最新值。


就这样通过读写主存,线程间建立了一种通信机制,让它们对共享变量的操作变得“可见”。


为了方便队头出队,队尾入队,得安排两个指针,分别指向一头一尾。


public class MyQueue<E> {
    volatile Node<E> head; // 当前链头
    volatile Node<E> tail; // 当前链尾
}


同样的道理,链头尾指针在多线程场景下是共享变量,也得使用 volatile 保持它们的可见性。


阶段性总结:


  • 每个线程都有自己独立的内存空间(本地内存)


  • 多线程能同时访问的变量称为“共享变量”,它在主存中。出于效率的考量,线程会将主存中的共享变量拷贝到本地内存,后续对共享变量的操作都发生在本地内存。


  • 线程本地内存相互独立,导致它们无法感知别人对共享变量的操作,即当前线程对共享变量的操作对其他线程不可见。


  • volatile 关键词是 java 解决共享变量可见性问题的方案。


  • 被声明为 volatile 的变量,就是在告诉编译器该共享变量在线程本地内存中的副本是“易变的”,线程不该相信它。线程写共享变量后,总是应该立刻将最新值同步到主存中。线程读共享变量时,总是应该去主存拿最新值。


  • 多线程场景下,队列的头尾指针,即结点的数据域和指针域都需要保证可见性。


出队


基础版


单链表取头结点的算法很简单,特别是已知头结点 head 时:


// 出队版本一
public E poll() {
    Node<E> p = head;
    E item  = p.item; // 暂存头结点数据
    head = head.next; // 更新头结点指针
    p.item = null; // 老头结点内容置空
    p.next = null; // 老头结点后续指针置空
    return item;
}


在单线程情况下没什么毛病,但多线程就会出问题。


非阻塞线程安全版


某线程正在读头结点的 item 字段,另一个线程可能已经将其置空。


当然可以使用synchronize将对 head 的操作包裹起来,但这样处理是“有锁的”、“阻塞的”,即同一时间只有一个线程能获取锁,其余竞争者都被挂起等待锁被释放,然后再唤醒。“挂起-唤醒”是耗时的。


性能更好的“非阻塞”方式是CAS,全称为compare and swap,即“比较再交换”。


当要更新变量值时,需要提供三个参数:1.当前值,2.期望值,3.新值。只有当前值和期望值相等时,才用新值替换当前值。


其中期望值就是线程被打断执行时暂存的值,当线程恢复执行后,用当前值和当时暂存值比较,如果相等,则表示被打断过程中其他线程没有修改过它,那此时更新它的值是安全的。


CAS 操作,被封装在sun.misc.Unsafe中,结合当前场景再简单封装如下:


private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
static <E> boolean casItem(Node<E> node, E cmp, E val) {
    // 将 node 结点 item 字段值通过和期望值 cmp 比较,安全地置为 val。
    return U.compareAndSwapObject(node, ITEM, cmp, val);
}


casItem()在多线程下将 node.item 通过和预期值cmp字段比较安全地赋予新值val

除此之外,还需要一个更新头结点的 CAS 操作:


// 头结点脱链并更新头指针
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p) { 
        Node<E> h1 = h; // 暂存当前头指针
        casHead(h, p); // 更新头指针到 p 的 CAS 竞争
        h1.next = null; // 老头指针 next 置空
    }
}
private boolean casHead(Node<E> cmp, Node<E> val) {
    // 将头指针 head 通过和预期值 cmp 比较安全地赋予新值 val
    return U.compareAndSwapObject(this, HEAD, cmp, val);
}


CAS 通常需配合轮询使用,因为多线程同时修改一个共享变量时,只有一个线程会成功(返回 true),其余失败(返回 false),失败线程通过轮询再次尝试 CAS,直到成功为止。


用这个思路,写一个线程安全的取链头结点算法:


// 出队版本二
public E poll() {
    // 轮询
    for (;;) {
        E item = head.item;// 暂存 item
        Node<E> next = head.next; // 暂存次头结点
        // 将头结点 item 字段置空的 CAS 竞争
        if (casItem(head, item, null)) {
            for(;;) {
                // 将头结点指向次头结点的 CAS 竞争
                if (updateHead(head, next)) return item;
            }
        }
    }
}


死循环修正版


这下修改头结点的 item 字段算是线程安全了,但这个算法还差点东西。若某线程在竞争中失败,另一个线程成功地将头结点 item 字段置空了,因为轮询的存在,失败线程会不停重试,但此时 head.item 的当前值已经变为 null 而不是暂存的 item。所以 CAS 轮询会一直失败,程序陷入死循环。


为了解决死循环,修改逻辑如下:


// 出队版本三
public E poll() {
    // 轮询
    for (Node<E> h = head, p = h, q;;) {
        E item = p.item;// 暂存 item
        // 若头结点 item 为空,则不进行 CAS 竞争
        if (item != null && casItem(p, item, null)) {
            for(;;) {
                // 出队结点为 p,p 后续结点是新的头结点
                // 但若 p 是尾结点,则头指针还是停留在 p
                Node<E> newHead = ((q = p.next) != null) ? q : p;
                // 更新头结点的 CAS 竞争
                if (updateHead(head, newHead)) return item;
            }
        } 
        // q 是 p 后续结点
        else if ((q = p.next) == null) { 
            // 链被掏空,更新头结点指针为最后一个非空结点,返回 null
            updateHead(h, p);
            return null;
        } 
        // 还有后续结点
        else {
            p = q; // 工作结点 p 向后寻找第一个 item 非空的结点
        }
    }
}


新增了三个工作指针:


  1. p:表示第一个 item 字段不空的结点。


  1. q:p 的后续结点,用于向后遍历。


  1. h:暂存进入循环时的链头指针 head。


当某线程在将 item 置空的竞争中失败时,另一个线程成功地将头结点 item 置空。因为轮询的存在,失败线程继续尝试,当下一轮 for 循环执行到条件item != null时会停止竞争,因为当前 p 指向结点的 item 字段已经被其他线程置空。此时 p 借助于 q 的帮助往后遍历寻找第一个 item 不空的结点。若找到则再次发起竞争,若遍历到链尾仍未找到,则表示链已经被掏空,直接返回 null。


ps:空链的表达方式不是 head = null,而是 head 指向一个结点,head.next = null,head.item = null,如下图所示:


image.png


(之所以叫空链版本一,是因为这不是最终形态,这样设计会有问题,详解见下一节)


巧妙脱链哨兵版


offer() 是非阻塞的,它在执行任何一条语句都有被打断的可能,这就不太妙了。


考虑下面这个场景:“线程A正执行出队,刚进入 for 循环,即刚把当前头指针暂存在 h,p 中,此时它被中断了,另一个出队线程得到执行机会,并抢先把头结点出队了。终于又轮到线程A继续执行。。。”


image.png


上图左侧为线程A正准备执行出队操作的初始状态,但它被中断了,另一个线程抢先把头结点 Node1 出队脱链了,并将 next 置空。当线程A恢复执行时,老母鸡变鸭的场景如上图右侧所示。


按照版本三的算法,之前暂存的头结点 p 的 item 字段已被置空,所以会往后遍历以定位到第一个 item 非空的结点。但尴尬的事情来了,因为在将头结点脱链时会将其 next 也置空。当 p 指针往后移动一个结点后就发现是空指针,版本三的算法判定这种情况即是链表被掏空,所以返回了 null。但明明链上还有一个 Node2 结点。。。


所以 “脱链结点”“链尾结点” 的判定得区分开来。修改一下脱链方法:


final void updateHead(Node<E> h, Node<E> p) {
    // 若修改头结点 CAS 竞争成功,则将头结点自旋(next域指向自己)
    if (h != p && casHead(h, p))
        lazySetNext(h, h);
}
// 将 head.next 指向自己
static <E> void lazySetNext(Node<E> node, Node<E> val) {
    U.putOrderedObject(node, NEXT, val);
}


头结点脱链操作分为两步:


  1. 将头指针安全地指向新结点。


  1. 将头结点自旋,即 next 指针指向自己。 这两步操作的效果如下图所示:


image.png


脱链结点 和 链尾结点 的判断条件区分开之后,脱链结点就有 “哨兵” 的效果:


  • 脱链结点的判断条件为p == p.next


  • 链尾结点的判断条件为p.next == null相应的修改出队算法:


// 出队版本四
public E poll() {
    restartFromHead:
    // 第一层轮询
    for(;;) {
        // 第二层轮询
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // 找到真正的非空链头
            if (item != null && casItem(p, item, null)) {
                for(;;) {
                    Node<E> newHead = ((q = p.next) != null) ? q : p;
                    if (updateHead(head, newHead)) return item;
                }
            } 
            // 空链
            else if ((q = p.next) == null) { 
                updateHead(h, p);
                return null;
            } 
            // 哨兵:p 指向结点已经脱链,从新 head 开始重新遍历
            else if (p == q)
                continue restartFromHead;
            // 当还有后续结点
            else {
                p = q; 
            }
        }
    }
}


新增了一个逻辑分支else if (p == q)用于表示当前遍历结点已脱链,脱链即表示头结点 head 已被更新到新位置。所以“从头遍历以寻找第一个 item 非空的结点”这项工作应该从新的 head 出发,即 p 应该被新 head 赋值,为了实现该效果,就不得不用上 goto 这个骚操作,在 java 中用 continue 实现。


性能优化最终版


版本四已经可以正确地在多线程场景下实现出队,但还有一个地方可以被优化。


虽然 CAS 是非阻塞的竞争,但竞争就要耗费 CPU 资源。所以能不竞争就不竞争,在当前算法中“更新链头指针”就是可以缓一缓的竞争。


// 出队版本五
public E poll() {
    restartFromHead:
    for(;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // 找到真正的非空头结点
            if (item != null && casItem(p, item, null)) {
                // 有条件地更新头指针
                if(p!=h) {
                    Node<E> newHead = ((q = p.next) != null) ? q : p;
                    if (updateHead(head, newHead)) return item;
                }
                return item;
            } 
            else if ((q = p.next) == null) { 
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else {
                p = q; 
            }
        }
    }
}


为更新头指针增加了条件p!=h,即只有当 head 指针滞后时才更新它。


因为 head 滞后必然造成 p 和 h 指向不同结点。p 的使命是从 head 出发往后定位到第一个 item 不空的结点。若 head 没有滞后,则 p 无需往后遍历,此时 p 必然等于 h。


不及时更新链头指针会导致算法出错吗?不会。因为该算法其实并不相信 head,所以安排了工作指针 p 通过遍历来找到真正的头结点。但不及时更新让竞争的次数直接减半,提高了性能。


阶段性总结:


非阻塞线程安全队列的出队算法总结如下:


  • 总是从头指针出发向后寻找真正的头结点(item 非空),若找到,则将头结点 item 域置 null(CAS保证线程安全),表示结点出队,若未找到则返回 null。


  • 采用 “自旋” 的方式来实现脱链(结点指针域与链脱钩),即 next 域指向自己(CAS保证线程安全)。自旋形成了哨兵的效果,使得往后遍历寻找真正头结点的过程中可感知到结点脱链。


  • 滞后的头指针:“出队”、“脱链”、“更新头指针”不是配对的。理论上每次出队都应该脱链并更新头指针,为了提高性能,两次出队对应一次脱链+更新头指针,造成头结点会间歇性地滞后。


  • 因为是非阻塞的,出队操作可能被其他线程打断,若是被入队打断则基本无害,因为队头队尾是两块不同的内存地址,不会有线程安全问题。若是被另一个出队线程打断,就可能发生想出的结点被其他线程抢先出队的情况。通过检测自旋+从头开始来定位到新得头结点解决。


入队


用同样的思路,一步步徒手写一个入队算法。


基础版


入队在链表结构上,即是链尾插入,这个算法很简单,特别是在已知尾指针tail的情况下:


// 入队版本一
public boolean offer(E e) {
    Node<E> newNode = createNode(e)// 构建新结点
    tail.next = newNode;// 链尾链入新结点
    tail = newNode;// 更新链尾指针
    return true;
}


非阻塞线程安全版


但在多线程情况下就变得有点复杂,比如在向尾结点插入新结点的同时,另一个线程正在删除这个尾结点。


尾结点tail这个变量是多线程共享的,对它的操作需要考虑线程安全问题。和出队操作一样使用 CAS。


对于链尾插入的场景来说,尾结点 next 指针的期望值是null,若不是 null 则表示尾结点被另一个线程修改了,比如被另一个写线程抢先一步在尾部插入了一个新结点。尾结点 next 指针的新值自然是 newNode。


CAS 的封装如下:


// java.util.concurrent.ConcurrentLinkedQueue
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
static <E> boolean casNext(Node<E> node, Node<E> cmp, Node<E> val) {
    return U.compareAndSwapObject(node, NEXT, cmp, val);
}


castNext()即是 CAS 机制的一个实现,它在多线程下将 Node 结点的 next 指针安全地赋予新值。


除此之外,还需要一个casTail()


private boolean casTail(Node<E> cmp, Node<E> val) {
    return U.compareAndSwapObject(this, TAIL, cmp, val);
}


它在多线程下将链尾指针安全地赋予新值。


用这个思路,写一个线程安全的入队算法:


// 入队版本二
public boolean offer(E e) {
    Node<E> newNode = createNode(e)
    // 轮询
    for (;;) {
        // 尾插入 CAS 竞争
        if (casNext(tail, null, newNode)) { 
            // 若竞争成功,则更新尾指针 CAS 竞争
            for(;;){
                if (casTail(tail, newNode)) return true;
            }
        }
    }
}


目录
相关文章
|
1月前
|
存储 缓存 算法
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
本文介绍了多线程环境下的几个关键概念,包括时间片、超线程、上下文切换及其影响因素,以及线程调度的两种方式——抢占式调度和协同式调度。文章还讨论了减少上下文切换次数以提高多线程程序效率的方法,如无锁并发编程、使用CAS算法等,并提出了合理的线程数量配置策略,以平衡CPU利用率和线程切换开销。
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
|
1月前
|
开发框架 Java .NET
.net core 非阻塞的异步编程 及 线程调度过程
【11月更文挑战第12天】本文介绍了.NET Core中的非阻塞异步编程,包括其基本概念、实现方式及应用示例。通过`async`和`await`关键字,程序可在等待I/O操作时保持线程不被阻塞,提高性能。文章还详细说明了异步方法的基础示例、线程调度过程、延续任务机制、同步上下文的作用以及如何使用`Task.WhenAll`和`Task.WhenAny`处理多个异步任务的并发执行。
|
1月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
47 7
|
1月前
|
消息中间件 存储 安全
|
3月前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
3月前
|
消息中间件 前端开发 NoSQL
面试官:线程池遇到未处理的异常会崩溃吗?
面试官:线程池遇到未处理的异常会崩溃吗?
83 3
面试官:线程池遇到未处理的异常会崩溃吗?
|
2月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
41 1
|
2月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
37 1
|
3月前
|
消息中间件 存储 前端开发
面试官:说说停止线程池的执行流程?
面试官:说说停止线程池的执行流程?
56 2
面试官:说说停止线程池的执行流程?
|
3月前
|
消息中间件 前端开发 NoSQL
面试官:如何实现线程池任务编排?
面试官:如何实现线程池任务编排?
43 1
面试官:如何实现线程池任务编排?