非阻塞同步算法实战(二):BoundlessCyclicBarrier

简介: 相比上一篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。

前言

相比上一篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。


需求介绍

我需要编写一个同步工具,它需要提供这样几个方法:await、pass、cancel。某个线程调用await时,会被阻塞;当调用pass方法时,之前因为await而阻塞的线程将全部被解除阻塞,之后调用await的线程继续被阻塞,直到下一次调用pass。


该工具同时还维护一个版本号,await方法可以带一个目标版本号,如果当前的版本号比目标版本号新或相同,则直接通过,否则,阻塞本线程,直到到达或超过目标版本。调用pass的时候,更新版本号。


如果停止了版本更新,可使用cancel方法来解除所有因await而阻塞的线程,包括指定版本号的。此方法用于避免无谓地等待。若await发生在cancel之后,则仍将被阻塞。


因为CountDownLatch不允许重复使用,CyclicBarrier只支持固定个数的线程,并且都没有维护一个版本号,所以没有已有的类能实现上面的需求,需要自己实现。


问题分析

简单分析可知,应该维护一个队列,来保存当前被阻塞的线程,用于在pass时对它们一一解除阻塞,pass时应该使用一个新的队列,否则不方便正确处理pass前和pass后调用await的线程。


至此,问题的关键就明了了:如何将队列的替换和版本号的更新这两个操作做成原子的。


解决方案

以前在《JAVA并发编程实践》曾看到过这样一个小技巧,如果要原子地更新两个变量,那么可以创建一个新的类将它们封装起来,将这两个变量当定义成类成员变量,更新时,用CAS更新这个类的引用即可。

因为较为复杂,下面先给出完整的代码,再讲解其中的关键。

注意:上面所说pass,在代码中的具体实现为nextCycle,有两个版本,一个自动维护版本号,一个由调用者维护版本号。

/**

* @author trytocatch@163.com

* @time 2013-1-31

*/

publicclassBoundlessCyclicBarrier {

   protectedfinal AtomicReference waitQueueRef;

   publicBoundlessCyclicBarrier() {

       this(0);

   }

   publicBoundlessCyclicBarrier(int startVersion) {

       waitQueueRef = newAtomicReference(newVersionQueue(startVersion));

   }

   publicfinalvoidawaitWithAssignedVersion(int myVersion)

           throws InterruptedException {

       awaitImpl(true, myVersion, 0);

   }

   /**

    *

    * @param myVersion

    * @param nanosTimeout

    * @return if timeout, or be canceled and doesn't reach myVersion, returns false

    * @throws InterruptedException

    */

   publicfinalbooleanawaitWithAssignedVersion(int myVersion, long nanosTimeout)throws InterruptedException {

       return awaitImpl(true, myVersion, nanosTimeout);

   }

   publicfinalvoidawait()throws InterruptedException {

       awaitImpl(false, 0, 0);

   }

   /**

    *

    * @param nanosTimeout

    * @return if and only if timeout, returns false

    * @throws InterruptedException

    */

   publicfinalbooleanawait(long nanosTimeout)

           throws InterruptedException {

       return awaitImpl(false, 0, nanosTimeout);

   }

   /**

    * pass and version++(some threads may not be unparked when awaitImpl is in process, but it's OK in this Barrier)

    * @return old queue version

    */

   publicintnextCycle() {

       VersionQueueoldQueue= waitQueueRef.get();

       VersionQueuenewQueue=newVersionQueue(oldQueue.version + 1);

       for(;;){

           if (waitQueueRef.compareAndSet(oldQueue, newQueue)) {

               for (Thread t : oldQueue.queue)

                   LockSupport.unpark(t);

               break;

           }

           oldQueue = waitQueueRef.get();

           newQueue.version = oldQueue.version + 1;

       }

       return oldQueue.version;

   }

   /**

    * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right)

    * @param newAssignVersion

    */

   publicvoidnextCycle(int newAssignVersion) {

       VersionQueueoldQueue= waitQueueRef.getAndSet(newVersionQueue(newAssignVersion));

       for (Thread t : oldQueue.queue)

           LockSupport.unpark(t);

   }

   /**

    * if version update has stopped, invoke this to awake all threads

    */

   publicvoidcancel(){

       VersionQueueoldQueue= waitQueueRef.get();

       if (waitQueueRef.compareAndSet(oldQueue, newVersionQueue(oldQueue.version, true))) {

           for (Thread t : oldQueue.queue)

               LockSupport.unpark(t);

   }

   publicfinalintgetVersion() {

       return waitQueueRef.get().version;

   }

   privatestaticfinalclassVersionQueue {

       finalprivate ConcurrentLinkedQueue queue;

       int version;

       finalboolean isCancelQueue;

       VersionQueue(int curVersion){

           this(curVersion, false);

       }

       VersionQueue(int curVersion, boolean isCancelQueue) {

           this.version = curVersion;

           this.isCancelQueue = isCancelQueue;

           queue = newConcurrentLinkedQueue();

       }

   }

   /**

    *

    * @param assignVersion is myVersion available

    * @param myVersion wait for this version

    * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid)      * @return if timeout, or be canceled and doesn't reach myVersion, returns false      * @throws InterruptedException      */     protectedbooleanawaitImpl(boolean assignVersion, int myVersion,             long nanosTimeout)throws InterruptedException {         booleantimeOutEnable= nanosTimeout > 0;

       longlastTime= System.nanoTime();

       VersionQueuenewQueue= waitQueueRef.get();//A

       if (assignVersion && newQueue.version - myVersion >= 0)

           returntrue;

       while (true) {

           VersionQueuesubmitQueue= newQueue;//B

           submitQueue.queue.add(Thread.currentThread());//C

           while (true) {

               newQueue = waitQueueRef.get();//D

               if (newQueue != submitQueue){//E: it's a new cycle

                   if(assignVersion == false)

                       returntrue;

                   elseif(newQueue.version - myVersion >= 0)

                       returntrue;

                   elseif (newQueue.isCancelQueue)//F: be canceled

                       returnfalse;

                   else//just like invoking awaitImpl again

                       break;

               }

               if (timeOutEnable) {

                   if (nanosTimeout <= 0)

                       returnfalse;

                   LockSupport.parkNanos(this, nanosTimeout);

                   longnow= System.nanoTime();

                   nanosTimeout -= now - lastTime;

                   lastTime = now;

               } else

                   LockSupport.park(this);

               if (Thread.interrupted())

                   thrownewInterruptedException();

           }

       }

   }

}


代码分析

先分析一下awaitImpl方法,A和D是该方法的关键点,决定着它属于哪一个批次,对应哪一个版本。这里有个小细节,在nexeCycle,cancel解除阻塞时,该线程可能并不在队列中,因为插入队列发生在C处,这在A和D之后(虽然看起来C在D之前,但D取到的queue要在下一次循环时才被当作submitQueue),所以,在E处再进行了一次判断,开始解除阻塞时,旧队列肯定被新队列所替换,newQueue != submitQueue一定为真,就会不调用park进行阻塞了,也就不需要解除阻塞,所以即使解除阻塞时,该线程不在队列中也是没问题的。


再看E处,当进入一个新的cycle时(当前队列与提交的队列不同),a)如果没指定版本,或者到达或超过了指定版本,则返回true;b)如果当前调用了cancel,则当前队列的isCancelQueue将为true,则不继续傻等,返回false;c)或者还未到达指定版本,break,插入到当前队列中,继续等待指定版本的到达。


如果没有进入E处的IF内,则当前线程会被阻塞,直到超时,然后返回false;或被中断,然后抛出InterruptedException;或被解除阻塞,重新进行E处的判定。


这里还有个小细节,既然cancel时,把当前的队列设置了isCancelQueue,那么之后指定版本的await会不会也直接返回了呢?其实不会的,因为它若要执行F处的判断,则先必需通过E处的判定,这意味着,当前队列已经不是提交时的那个设置了isCancelQueue的队列了。


代码中对于cancel的处理,其实并不保证cancel后,之前的await都会被解除阻塞并返回,如果cancel后,紧接着又调用了nextCycle,那么可能某线程感知不到cancel的调用,唤醒后又继续等待指定的版本。cancel的目的是在于不让线程傻等,既然恢复版本更新了,那就继续等待吧。


如果自己维护版本号,则应该保证递增。另外,版本号的设计,考虑到了int溢出的情况,版本的前后判断,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,这样,版本号就相当于循环使用了,只要两个比较的版本号的差不超过int的最大值,那么都是正确的,int的最大值可是20多亿,几乎不可能出现跨度这么大的两个版本号的比较,所以,认为它是正确的。


小结

本文讲到了一个非阻塞同步算法设计时的小技巧,如果多个变量之间要维护某种特定关系,那么可以将它们封装到一个类中,再用CAS更新这个类的引用,这样就达到了:要么都被更新,要么都没被更新,保持了多个变量之间的一致性。同时需要注意的是,每次更新都必需创建新的包装对象,假如有其它更好的办法,应该避免使用该方法。


相关文章
|
2月前
|
存储 缓存 算法
前端算法:优化与实战技巧的深度探索
【10月更文挑战第21天】前端算法:优化与实战技巧的深度探索
21 1
|
3月前
|
大数据 UED 开发者
实战演练:利用Python的Trie树优化搜索算法,性能飙升不是梦!
在数据密集型应用中,高效搜索算法至关重要。Trie树(前缀树/字典树)通过优化字符串处理和搜索效率成为理想选择。本文通过Python实战演示Trie树构建与应用,显著提升搜索性能。Trie树利用公共前缀减少查询时间,支持快速插入、删除和搜索。以下为简单示例代码,展示如何构建及使用Trie树进行搜索与前缀匹配,适用于自动补全、拼写检查等场景,助力提升应用性能与用户体验。
58 2
|
3月前
|
算法 搜索推荐 开发者
别再让复杂度拖你后腿!Python 算法设计与分析实战,教你如何精准评估与优化!
在 Python 编程中,算法的性能至关重要。本文将带您深入了解算法复杂度的概念,包括时间复杂度和空间复杂度。通过具体的例子,如冒泡排序算法 (`O(n^2)` 时间复杂度,`O(1)` 空间复杂度),我们将展示如何评估算法的性能。同时,我们还会介绍如何优化算法,例如使用 Python 的内置函数 `max` 来提高查找最大值的效率,或利用哈希表将查找时间从 `O(n)` 降至 `O(1)`。此外,还将介绍使用 `timeit` 模块等工具来评估算法性能的方法。通过不断实践,您将能更高效地优化 Python 程序。
62 4
|
4月前
|
算法 安全 数据安全/隐私保护
Android经典实战之常见的移动端加密算法和用kotlin进行AES-256加密和解密
本文介绍了移动端开发中常用的数据加密算法,包括对称加密(如 AES 和 DES)、非对称加密(如 RSA)、散列算法(如 SHA-256 和 MD5)及消息认证码(如 HMAC)。重点讲解了如何使用 Kotlin 实现 AES-256 的加密和解密,并提供了详细的代码示例。通过生成密钥、加密和解密数据等步骤,展示了如何在 Kotlin 项目中实现数据的安全加密。
140 1
|
4月前
|
机器学习/深度学习 存储 算法
强化学习实战:基于 PyTorch 的环境搭建与算法实现
【8月更文第29天】强化学习是机器学习的一个重要分支,它让智能体通过与环境交互来学习策略,以最大化长期奖励。本文将介绍如何使用PyTorch实现两种经典的强化学习算法——Deep Q-Network (DQN) 和 Actor-Critic Algorithm with Asynchronous Advantage (A3C)。我们将从环境搭建开始,逐步实现算法的核心部分,并给出完整的代码示例。
272 1
|
4月前
|
算法 安全 数据安全/隐私保护
Android经典实战之常见的移动端加密算法和用kotlin进行AES-256加密和解密
本文介绍了移动端开发中常用的数据加密算法,包括对称加密(如 AES 和 DES)、非对称加密(如 RSA)、散列算法(如 SHA-256 和 MD5)及消息认证码(如 HMAC)。重点展示了如何使用 Kotlin 实现 AES-256 的加密和解密,提供了详细的代码示例。
77 2
|
4月前
|
机器学习/深度学习 算法 数据挖掘
【白话机器学习】算法理论+实战之决策树
【白话机器学习】算法理论+实战之决策树
|
4月前
|
消息中间件 存储 算法
这些年背过的面试题——实战算法篇
本文是技术人面试系列实战算法篇,面试中关于实战算法都需要了解哪些内容?一文带你详细了解,欢迎收藏!
|
4月前
|
算法 搜索推荐 Java
算法实战:手写归并排序,让复杂排序变简单!
归并排序是一种基于“分治法”的经典算法,通过递归分割和合并数组,实现O(n log n)的高效排序。本文将通过Java手写代码,详细讲解归并排序的原理及实现,帮助你快速掌握这一实用算法。
42 0
|
4月前
|
数据采集 搜索推荐 算法
【高手进阶】Java排序算法:从零到精通——揭秘冒泡、快速、归并排序的原理与实战应用,让你的代码效率飙升!
【8月更文挑战第21天】Java排序算法是编程基础的重要部分,在算法设计与分析及实际开发中不可或缺。本文介绍内部排序算法,包括简单的冒泡排序及其逐步优化至高效的快速排序和稳定的归并排序,并提供了每种算法的Java实现示例。此外,还探讨了排序算法在电子商务、搜索引擎和数据分析等领域的广泛应用,帮助读者更好地理解和应用这些算法。
45 0
下一篇
无影云桌面