非阻塞同步算法实战(一)

简介:

感谢trytocache投递本文。

前言

本文写给对ConcurrentLinkedQueue的实现和非阻塞同步算法的实现原理有一定了解,但缺少实践经验的朋友,文中包括了实战中的尝试、所走的弯路,经验和教训。

背景介绍

上个月,我被安排独自负责一个聊天系统的服务端,因为一些原因,我没使用现成的开源框架,网络那块直接使用AIO,收数据时,因为只会从channel里过来,所以不需要考虑同步问题;但是发送数据时,因为有聊天消息的转发,所以必需处理这个同步问题。AIO中,是处理完一个注册的操作后,再执行我们定义的方法,此时,如果还有数据需要写,则继续注册写操作,如果没有则不注册;提交数据时,如果当前没有注册写操作,则注册一个,否则仅提交(此时再注册则会报异常)。这样,需要同步的点就是:如何知道当前还有没有数据需要发送(因为其它线程也可以提交数据到此处),和如何知道此次提交时,有没有注册写操作。总之,要完成:有数据要发送时,必需有写操作被注册,并且只能注册一次;没有数据时,不能有写操作被注册。

问题分析

经过分析,上面的问题,可以抽象成:我需要知道当往队列里插入一条数据之前,该队列是否为空,如果为空则应该注册新的写操作。当从队列里取出一条数据后,该队列是否为非空,如果非空则应该继续注册写操作。(本文之后以“关注的操作”来表示这种场景下的插入或取出操作)

目前的问题是,我使用的队列是ConcurrentLinkedQueue,但是它的取出数据的方法,没有返回值告诉我们从队列里取出数据之后队列是否为空,如果是使用size或peek方法来执行判断,那就必需加锁了,否则在拿到队列大小时,可能队列大小已经变化了。所以我首先想到的是,如何对该队列进行改造,让它提供该信息。

注意:这里指的不是当次而是之后,所以如果我们使用队列的peek()方法返回null,就知道队列是否为空,但是不知道之后是否为空 ,并且,当关注的操作发生时,在插入或取出操作的返回值里告知此信息,来指导是否继续注册写操作。

用锁实现

如果使用锁的话很容易处理,用锁同步插入和取出方法,下面是锁实现的参考:


01 public E poll() {
02     synchronized (this) {
03         E re = q.poll();
04         // 获取元素后,队列是空,表示是我关注的操作
05         if (q.peek() == null) {
06  
07         }
08         return re;
09     }
10 }
11  
12 public void offer(E e) {
13     synchronized (this) {
14         // 插入元素前,队列是空,表示是我关注的操作
15         if (q.peek() == null) {
16  
17         }
18         q.offer(e);
19     }
20 }

但因为是服务端,我想用非阻塞同步算法来实现。

尝试方案一

我第一次想到的改造办法是,将head占位节点改成固定的,头节点移动时,只将head节点的next指向新的节点,在插入数据时,如果是在head节点上成功执行的该操作,那么该插入就是关注的的操作;在取出时,如果将head节点的next置为了null,那么该取出就是关注的操作( 因为之前的占位节点是变化的,所以没法比较,必需用同步,现在是固定的了,所以可以直接与head节点进行比较 )。如此一来,问题好像被解决了。改造完之后,出于严谨,我仔细读了一遍代码,发现引入了新的问题,我的取出操作是这样写的


01 /**
02  * @author trytocatch@163.com
03  */
04 public E poll(){
05     for(;;){
06         Node n = head.nextRef.get();//head指向固定的head节点,为final
07         if(n == null)
08             return null;
09         Node m = n.nextRef.get();
10         if(head.next.compareAndSet(n,m){
11             if(m==null)
12                 ;//此时为关注的操作(为了简化代码显示,不将该信息当作返回值返回了,仅注释)
13             return n.itemRef.get();
14         }
15     }
16 }

这里有一个致命的问题:如果m为null,在CAS期间,插入了新节点,n的next由null变成了非null,紧接着又把head的next更新为了null,那么链就断了,该方法还存在一些其它的问题,如当队列为空的时候,尾节点指向了错误的位置,本应该是head的。我认为最根本的原因在于,head不能设为固定的,否则会引发一堆问题。第一次尝试宣告失败。

尝试方案二

这次我尝试将head跟tail两个引用包装成一个对象,然后对这个对象进行CAS更新(这仅为一处理论上的尝试,因为从性能上面来讲,已经大打折扣了,创建了很多额外的对象),如果head跟tail指向了同一个节点,则认为队列是空的,根据此信息来判断一个操作是不是关注的操作。但该尝试仅停留在了理论分析阶段,期间发现了一些小问题,没法解决,后来我发现,我把ConcurrentLinkedQueue原本可以分成两步执行的插入和取出操作(更新节点的next或item引用,然后再更新head或tail引用),变成了必需一步完成,ConcurrentLinkedQueue尚不能一步完成,我何德何能,可将它们一步完成?所以直接放弃了。

解决方案一

经过两次的失败尝试,我几乎绝望了,我怀疑这是不是不能判断出是否为关注的操作。

因为是在做项目,周末已经过去了,不能再搞这些“研究”了,所以我退而求其次,想了个不太漂亮的办法,在队列外部维护一个变量,记录队列有多大,在插入或取出后,更新该变量,使用的是AtomicInteger,如果更新时,将变量从1变成0,或从0变成了1,就认为该插入或取出为关注的操作。


01 private AtomicInteger size = new AtomicInteger(0);
02 public E poll() {
03     E re = q.poll();
04     if (re == null)
05         return null;
06     for(int old;;){
07         old = size.get();
08         if(size.compareAndSet(old,old-1)){
09             // 获取元素后,队列是空,表示是我关注的操作
10             if(old == 1){
11  
12             }
13             break;
14         }
15     }
16     return re;
17 }
18  
19 public void offer(E e) {
20     q.offer(e);
21     for(int old;;){
22         old = size.get();
23         if(size.compareAndSet(old,old+1)){
24             // 插入元素前,队列是空,表示是我关注的操作
25             if(old == 0){
26  
27             }
28             break;
29         }
30     }
31 }

此时,也许细心的朋友会问,因为没有使用锁,这个变量并不能真实反映队列的大小,也就不能确定它是不是关注的操作。没错,是不能真实反映,但是,我获取关注的操作的目的是用来指导我:该不该注册新的写操作,该变量的值变化就能提供正确的指导,所以,同样是正确的,只不过途径不同而已。理论上的分析和后来的项目正确运行都印证了该方法的正确性。

解决方案二

因为上面的方法额外加了一次lock-free级别的CAS操作,我心里总不太舒服,空余时间总在琢磨,真的就没有办法,在不增加额外lock-free级别CAS开支的情况下,知晓一个操作是不是关注的操作?

后来经分析,如果要知晓是不是关注的操作,跟两个数据有关,真实的头节点跟尾节点(不同于head跟tail,因为它们是滞后的,之前将它们包装成一个对象就是犯了该错误),ConcurrentLinkedQueue的实现中,这两个节点是没有竞争的,一个是更新item,一个是更新next,必需得让他们竞争同一个东西,才能解决该问题,于是我想到了一个办法,取出完成后,如果该节点的next为null,就将其用CAS置为一个特殊的值,若成功则认为是关注的操作;插入成功后,如果next被替换掉的值不是null而是这个特殊值,那么该插入也为关注的操作。这仅增加了一次wait-free级别的CAS操作(取出后的那次CAS),perfect!

因为ConcurrentLinkedQueue的很多变量、内部类都是私有的,没法通过继承来改造,没办法,只得自行实现。对于队列里使用的Node,实现的方式有很多种,可以使用AtomicReference、AtomicReferenceFieldUpdater来实现,如果你愿意的话,甚至是像ConcurrentLinkedQueue一样,用sun.misc.Unsafe来实现(注意:一般来说,sun包下的类是不推荐使用的),各有优缺点吧,所以我就不提供该队列的具体实现了,下面给出在ConcurrentLinkedQueue(版本:1.7.0_10)基础上进行的改造,供参考。注意,如果需要用到size等方法,因为特殊值的引入,影响了之前的判断逻辑,应重新编写。


01 /**
02  * @author trytocatch@163.com
03  */
04  private static final Node MARK_NODE = new Node(null);
05  
06  public boolean offer(E e) {
07      checkNotNull(e);
08      final Node newNode = new Node(e);
09  
10      for (Node t = tail, p = t;;) {
11          Node q = p.next;
12          if (q == null || q == MARK_NODE) {//修改1:加入条件:或q == MARK_NODE
13              // p is last node
14              if (p.casNext(q, newNode)) {//修改2:将null改为q
15                  // Successful CAS is the linearization point
16                  // for e to become an element of this queue,
17                  // and for newNode to become "live".
18                  if (q == MARK_NODE)//修改3:
19                      ;//此时为关注的操作(为了简化代码显示,仅注释)
20                  if (p != t) // hop two nodes at a time
21                      casTail(t, newNode); // Failure is OK.
22                  return true;
23              }
24              // Lost CAS race to another thread; re-read next
25          }
26          else if (p == q)
27              // We have fallen off list. If tail is unchanged, it
28              // will also be off-list, in which case we need to
29              // jump to head, from which all live nodes are always
30              // reachable. Else the new tail is a better bet.
31              p = (t != (t = tail)) ? t : head;
32          else
33              // Check for tail updates after two hops.
34              p = (p != t && t != (t = tail)) ? t : q;
35      }
36  }
37  
38  public E poll() {
39      restartFromHead:
40      for (;;) {
41          for (Node h = head, p = h, q;;) {
42              E item = p.item;
43  
44              if (item != null && p.casItem(item, null)) {
45                  // Successful CAS is the linearization point
46                  // for item to be removed from this queue.
47                  if (p != h) // hop two nodes at a time
48                      updateHead(h, ((q = p.next) != null) ? q : p);
49                  if (p.casNext(null,MARK_NODE))//修改1:
50                      ;//此时为关注的操作
51                  return item;
52              }
53              else if ((q = p.next) == null) {
54                  updateHead(h, p);
55                  return null;
56              }
57              else if (p == q)
58                  continue restartFromHead;
59              else
60                  p = q;
61          }
62      }
63  }

小结

设计非阻塞算法的关键在于,找出竞争点,如果获取的某个信息跟两个操作有关,那么应该让这两个操作竞争同一个东西,这样才能反应出它们的关系。 

目录
相关文章
|
22天前
|
算法 安全 数据安全/隐私保护
Android经典实战之常见的移动端加密算法和用kotlin进行AES-256加密和解密
本文介绍了移动端开发中常用的数据加密算法,包括对称加密(如 AES 和 DES)、非对称加密(如 RSA)、散列算法(如 SHA-256 和 MD5)及消息认证码(如 HMAC)。重点讲解了如何使用 Kotlin 实现 AES-256 的加密和解密,并提供了详细的代码示例。通过生成密钥、加密和解密数据等步骤,展示了如何在 Kotlin 项目中实现数据的安全加密。
56 1
|
22天前
|
机器学习/深度学习 存储 算法
强化学习实战:基于 PyTorch 的环境搭建与算法实现
【8月更文第29天】强化学习是机器学习的一个重要分支,它让智能体通过与环境交互来学习策略,以最大化长期奖励。本文将介绍如何使用PyTorch实现两种经典的强化学习算法——Deep Q-Network (DQN) 和 Actor-Critic Algorithm with Asynchronous Advantage (A3C)。我们将从环境搭建开始,逐步实现算法的核心部分,并给出完整的代码示例。
53 1
|
23天前
|
算法 安全 数据安全/隐私保护
Android经典实战之常见的移动端加密算法和用kotlin进行AES-256加密和解密
本文介绍了移动端开发中常用的数据加密算法,包括对称加密(如 AES 和 DES)、非对称加密(如 RSA)、散列算法(如 SHA-256 和 MD5)及消息认证码(如 HMAC)。重点展示了如何使用 Kotlin 实现 AES-256 的加密和解密,提供了详细的代码示例。
31 2
|
23天前
|
机器学习/深度学习 算法 数据挖掘
【白话机器学习】算法理论+实战之决策树
【白话机器学习】算法理论+实战之决策树
|
30天前
|
算法 搜索推荐 Java
算法实战:手写归并排序,让复杂排序变简单!
归并排序是一种基于“分治法”的经典算法,通过递归分割和合并数组,实现O(n log n)的高效排序。本文将通过Java手写代码,详细讲解归并排序的原理及实现,帮助你快速掌握这一实用算法。
37 0
|
1月前
|
存储 NoSQL 算法
实战算法篇:设计短域名系统,将长URL转化成短的URL.
小米介绍了一种实用的短域名系统设计,用于将冗长的URL转化为简短链接。短链接不仅节省空间,便于分享,还能支持数据分析。系统通过唯一编号结合62进制转换生成短标识,并利用如Redis这样的数据库存储长链接与短标识的映射关系。最后,通过302重定向实现用户访问时的长链接恢复。这一方案适用于多种场景,有效提升用户体验与数据追踪能力。
45 9
|
2月前
|
算法 搜索推荐 开发者
别再让复杂度拖你后腿!Python 算法设计与分析实战,教你如何精准评估与优化!
【7月更文挑战第23天】在Python编程中,掌握算法复杂度—时间与空间消耗,是提升程序效能的关键。算法如冒泡排序($O(n^2)$时间/$O(1)$空间),或使用Python内置函数找最大值($O(n)$时间),需精确诊断与优化。数据结构如哈希表可将查找从$O(n)$降至$O(1)$。运用`timeit`模块评估性能,深入理解数据结构和算法,使Python代码更高效。持续实践与学习,精通复杂度管理。
51 9
|
30天前
|
数据采集 搜索推荐 算法
【高手进阶】Java排序算法:从零到精通——揭秘冒泡、快速、归并排序的原理与实战应用,让你的代码效率飙升!
【8月更文挑战第21天】Java排序算法是编程基础的重要部分,在算法设计与分析及实际开发中不可或缺。本文介绍内部排序算法,包括简单的冒泡排序及其逐步优化至高效的快速排序和稳定的归并排序,并提供了每种算法的Java实现示例。此外,还探讨了排序算法在电子商务、搜索引擎和数据分析等领域的广泛应用,帮助读者更好地理解和应用这些算法。
21 0
|
1月前
|
消息中间件 存储 算法
这些年背过的面试题——实战算法篇
本文是技术人面试系列实战算法篇,面试中关于实战算法都需要了解哪些内容?一文带你详细了解,欢迎收藏!
|
2月前
|
机器学习/深度学习 人工智能 监控
人工智能 - 目标检测算法详解及实战
目标检测需识别目标类别与位置,核心挑战为复杂背景下的多目标精准快速检测。算法分两步:目标提取(滑动窗口或区域提议)和分类(常用CNN)。IoU衡量预测与真实框重叠度,越接近1,检测越准。主流算法包括R-CNN系列(R-CNN, Fast R-CNN, Faster R-CNN),YOLO系列,SSD,各具特色,如Faster R-CNN高效候选区生成与检测,YOLO适用于实时应用。应用场景丰富,如自动驾驶行人车辆检测,安防监控,智能零售商品识别等。实现涉及数据准备、模型训练(示例YOLOv3)、评估(Precision, Recall, mAP)及测试。
83 5