非阻塞同步算法实战(三)-LatestResultsProvider

简介:

感谢trytocatch投递本文。

前言

阅读本文前,需要读者对happens-before比较熟悉,了解非阻塞同步的一些基本概念。本文主要为happens-before法则的灵活运用,和一些解决问题的小技巧,分析问题的方式。

背景介绍

原始需求为:本人当时在编写一个正则替换工具,里面会动态地显示所有的匹配结果(包括替换预览),文本、正则表达式、参数,这些数据的其中一项发生了变化,结果就应该被更新,为了提供友好的交互体验,数据变化时,应该是发起一个异步请求,由另一个独立的线程来完成运算,完成后通知UI更新结果。由于是动态显示,所以提交会非常频繁。

需求描述

需要这样一个工具类,允许用户频繁地提交数据(本文之后以“submit”表示该操作)和更新结果(本文之后以“update”表示该操作),submit时,如果当前有进行中的运算,则应该取消,使用新参数执行新的运算;update时,如果当前没有进行中的运算(处于阻塞状态),并且当前结果不是最新的,则唤醒该线程,使用当前的新数据,执行新的运算。此处之所以分为submit和update两个方法,是为了支持手动更新,即点击更新按钮时,才更新结果。

此外,出于练手的原因,也出于编写一个功能全面,更实用的工具的目的,我还加入了一些额外的需求:

1、引入多线程场景,update和submit均可由多个线程同时发起,该工具类应设计成线程安全的。

2、允许延迟执行运算,如果延时内执行submit,仅重新计算延时。如果运算不方便取消,在短时间频繁submit的场景下,延时会是一个很好的应对办法。

3、允许设置一个最大延迟时间,作为延迟开启运算的补充。当长时间频繁submit时,会形成这样的局面,一直未进入运算环节,新结果计算不出来,上一次计算结果却是很早以前的。如果需要显示一个较新但不是最新的结果,最大延迟时间将会很有用。

4、提供主动取消方法,主动取消正在进行的运算。

5、update时,允许等待运算完成,同时也可设置超时时间。当主动取消、超时、完成了当前或更(更加的意思)新的数据对应的运算时,结束等待。

需求交待完了,有兴趣有精力的读者,可以先试着思考下怎么实现。

问题分析

该工具应该维护一个状态字段,这样才能在发起某个操作时,根据所处的状态作出正确的动作,如:如果当前不处于停止状态(或者主动取消状态,原因见下文),执行update就不需要唤醒运算线程。简单分析可知,至少应该有这样几种状态:

1、停止状态:当前没有运算任务,线程进入阻塞状态,主动取消和运算完成后,进入该状态

2、延迟状态:设置了延迟开启运算时,进入运算前,处于该状态

3、运算状态:正在执行运算

4、主动取消状态:当发起主动取消时,进入该状态

5、新任务状态:当时有新的运算任务时,进入该状态,然后重新进入运算状态

延迟

再来看一下延迟,如果延迟500毫秒,就每次sleep(500),那么期间再submit怎么办?将它唤醒然后重新sleep(500)吗?显然不行,成本太大了。

我有一个小技巧:将500分成多个合适的等份,使用一个计数器,每次sleep一个等份,计数器加1,如果发起submit,仅把计数器置0即可,虽然看起来线程的状态切换变多了,但应对频繁重置时,它更稳定。虽然时间上会上下波动一个等份,但此处并不需要多么精确。

现在还面临这样一个问题,如何知道当前是处于延迟状态并计数器置0?取出状态值进行判断,然后置0,这方法显然不行,因为置0的时候,可能状态已经变了,所以你无法知道该操作是否生效了。

我想到的办法是,再引入一个延迟重置状态。如果处于该状态,则下一次计数器加1时,将计数器重置,状态变更是可以知道成功与否的。

状态变更

有些状态的变更是有条件的,比如说当前处于取消状态,就不能把它转为运算状态,运算状态只能由新任务状态、延迟状态(延迟完成后执行运算)或延迟重置状态转入。这种场景正好跟CAS一致,所以,使用一个AtomicInteger来表示状态。

分析下各状态之间的转换,可以得出下面的状态变更图:

蓝色的a(bcd)|(e)f线路为停止状态下,发起一次update,运算完重新回到停止的过程,开启延迟时是bcd,否则是e。

红色的线j表示超过了最大延迟时间,退出延迟,进入运算状态(也可以是d)。

绿色的线ghi(包括a)表示:如果发起了submit或update,状态应该怎么改变。如果处于延迟重置、新任务则不需要进行任何操作;如果处于延迟状态,则转为延迟重置即可;如果处于运算状态,则可能使用了旧参数,应该转为新任务;如果为主动取消或停止状态,并且是调用update方法,则转为新任务,并且可能处于阻塞状态,应该唤醒该线程。

黑色的线l表示,可在任意状态下发起主动取消,进入该状态。然后通知等待线程后,转入停止状态,对应紫色的k,如果在停止状态下发起主动取消,则仅转为主动取消状态,不会通知等待线程。所以当线程阻塞时,可能处于停止状态或者主动取消状态。

顺序问题

上面已经分析到,当submit时,应该把延迟转为延迟重置、或运算转为新任务,这两个尝试的顺序是不是也有讲究呢?

是的,因为正常执行流程a(bcd)|(e)f中,运算状态在延迟状态之后,假如先尝试运算转为新任务,可能此时为延迟状态,故失败,再尝试延迟转为延迟重置时,状态在这期间从刚才的延迟转为了运算,故两次尝试都失败了,本应该重置延迟的,却什么也没干,这是错误的。而将两次尝试顺序调换一下,只要状态为延迟或运算,那么两次状态转换尝试中,一定有一次会成功。

之后的代码中还有多处类似的顺序细节。

解决方案

下面给出完整的代码,除去等待运算完成那部分,其它地方均为wait-free级别的实现。

calculateResult是具体执行运算的方法;上文中的submit对应代码里的updateParametersVersion方法,上文中的update对应剩余几个update方法。

updateAndWait方法中,使用了上一篇中讲到的BoundlessCyclicBarrier,其维护的版本号就是参数的版本号ParametersVersion。

001 /**
002  * @author trytocatch@163.com
003  * @date 2013-2-2
004  */
005 public abstract class LatestResultsProvider {
006     /** update return value */
007     public static final int UPDATE_FAILED = -1;
008     public static final int UPDATE_NO_NEED_TO_UPDATE = 0;
009     public static final int UPDATE_SUCCESS = 1;
010     public static final int UPDATE_COMMITTED = 2;
011     /** update return value */
012  
013     /** work states*/
014     private static final int WS_OFF = 0;
015     private static final int WS_NEW_TASK = 1;
016     private static final int WS_WORKING = 2;
017     private static final int WS_DELAYING = 3;
018     private static final int WS_DELAY_RESET = 4;
019     private static final int WS_CANCELED = 5;
020     /** work states*/
021     private final AtomicInteger workState;
022  
023     private int sleepPeriod = 30;
024  
025     private final AtomicInteger parametersVersion;
026     private volatile int updateDelay;// updateDelay>=0
027     private volatile int delayUpperLimit;
028  
029     private final BoundlessCyclicBarrier barrier;
030     private Thread workThread;
031  
032     /**
033      *
034      * @param updateDelay unit: millisecond
035      * @param delayUpperLimit limit the sum of the delay, disabled
036      * while delayUpperLimit<0, unit: millisecond
037      */
038     public LatestResultsProvider(int updateDelay, int delayUpperLimit) {
039         if (updateDelay < 0)
040             this.updateDelay = 0;
041         else
042             this.updateDelay = updateDelay;
043         this.delayUpperLimit = delayUpperLimit;
044         barrier = new BoundlessCyclicBarrier(0);
045         workState = new AtomicInteger(WS_OFF);
046         parametersVersion = new AtomicInteger(0);
047         initThread();
048     }
049  
050     private void initThread() {
051         workThread = new Thread("trytocatch's worker") {
052             @Override
053             public void run() {
054                 int sleepCount = 0;
055                 for (;;) {
056                     try {
057                         while (!workState.compareAndSet(WS_NEW_TASK,
058                                 updateDelay > 0 ? WS_DELAY_RESET : WS_WORKING)) {
059                             if (workState.compareAndSet(WS_CANCELED, WS_OFF)) {
060                                 barrier.cancel();
061                             }
062                             LockSupport.park();
063                             interrupted();
064                         }
065                         if (workState.get() == WS_DELAY_RESET) {
066                             int delaySum = 0;
067                             for (;;) {
068                                 if (workState.compareAndSet(WS_DELAY_RESET,
069                                         WS_DELAYING)) {
070                                     sleepCount = (updateDelay + sleepPeriod - 1)
071                                             / sleepPeriod;
072                                 }
073                                 sleep(sleepPeriod);
074                                 if (--sleepCount <= 0
075                                         && workState.compareAndSet(WS_DELAYING,
076                                                 WS_WORKING))
077                                     break;
078                                 if (delayUpperLimit >= 0) {
079                                     delaySum += sleepPeriod;
080                                     if (delaySum >= delayUpperLimit) {
081                                         if (!workState.compareAndSet(
082                                                 WS_DELAYING, WS_WORKING))
083                                             workState.compareAndSet(
084                                                     WS_DELAY_RESET, WS_WORKING);
085                                         break;
086                                     }
087                                 }
088                                 if (workState.get() != WS_DELAYING
089                                         && workState.get() != WS_DELAY_RESET)
090                                     break;
091                             }
092                         }
093                         if (isWorking()) {
094                             int workingVersion = parametersVersion.get();
095                             try {
096                                 calculateResult();
097                                 if (workState.compareAndSet(WS_WORKING, WS_OFF))
098                                     barrier.nextCycle(workingVersion);
099                             catch (Throwable t) {
100                                 t.printStackTrace();
101                                 workState.set(WS_CANCELED);
102                             }
103                         }
104                     catch (InterruptedException e) {
105                         workState.compareAndSet(WS_DELAYING, WS_CANCELED);
106                         workState.compareAndSet(WS_DELAY_RESET, WS_CANCELED);
107                     }
108                 }// for(;;)
109             }// run()
110         };
111         workThread.setDaemon(true);
112         workThread.start();
113     }
114  
115     public int getUpdateDelay() {
116         return updateDelay;
117     }
118  
119     /**
120      * @param updateDelay
121      *            delay time. unit: millisecond
122      */
123     public void setUpdateDelay(int updateDelay) {
124         this.updateDelay = updateDelay < 0 0 : updateDelay;
125     }
126  
127     public int getDelayUpperLimit() {
128         return delayUpperLimit;
129     }
130  
131     /**
132      * @param delayUpperLimit limit the sum of the delay, disabled
133      * while delayUpperLimit<0, unit: millisecond
134      */
135     public void setDelayUpperLimit(int delayUpperLimit) {
136         this.delayUpperLimit = delayUpperLimit;
137     }
138  
139     public final void stopCurrentWorking() {
140         workState.set(WS_CANCELED);
141     }
142  
143     /**
144      * @return NO_NEED_TO_UPDATE, COMMITTED
145      */
146     public final int update() {
147         if (isResultUptodate())
148             return UPDATE_NO_NEED_TO_UPDATE;
149         if (workState.compareAndSet(WS_CANCELED, WS_NEW_TASK)
150                 || workState.compareAndSet(WS_OFF, WS_NEW_TASK))
151             LockSupport.unpark(workThread);
152         return UPDATE_COMMITTED;
153     }
154  
155     /**
156      * @param timeout
157      *            unit:nanoseconds
158      * @return FAILED, NO_NEED_TO_UPDATE, SUCCESS
159      * @throws InterruptedException
160      */
161     public final int updateAndWait(long nanosTimeout)
162             throws InterruptedException {
163         int newVersion = parametersVersion.get();
164         if (update() == UPDATE_NO_NEED_TO_UPDATE)
165             return UPDATE_NO_NEED_TO_UPDATE;
166         barrier.awaitWithAssignedVersion(newVersion, nanosTimeout);
167         return barrier.getVersion() - newVersion >= 0 ? UPDATE_SUCCESS
168                 : UPDATE_FAILED;
169     }
170  
171     /**
172      * @return FAILED, NO_NEED_TO_UPDATE, SUCCESS
173      * @throws InterruptedException
174      */
175     public final int updateAndWait() throws InterruptedException {
176         return updateAndWait(0);
177     }
178  
179     public final boolean isResultUptodate() {
180         return parametersVersion.get() == barrier.getVersion();
181     }
182  
183     /**
184      * be used in calculateResult()
185      * @return true: the work state is working, worth to calculate the
186      * result absolutely, otherwise you can cancel the current calculation
187      */
188     protected final boolean isWorking() {
189         return workState.get()==WS_WORKING;
190     }
191  
192     /**
193      * you must call this after update the parameters, and before calling the
194      * update
195      */
196     protected final void updateParametersVersion() {
197         int pVersion = parametersVersion.get();
198         //CAS failed means that another thread do the same work already
199         if (parametersVersion.compareAndSet(pVersion, pVersion + 1))
200             if (!workState.compareAndSet(WS_DELAYING, WS_DELAY_RESET))
201                 workState.compareAndSet(WS_WORKING, WS_NEW_TASK);
202     }
203  
204     /**
205      * implement this to deal with you task
206      */
207     protected abstract void calculateResult();
208 }

代码中,我直接在构造方法里开启了新的线程,一般来说,是不推荐这样做的,但在此处,除非在构造还未完成时就执行update方法,否则不会引发什么问题。

最后,附上该正则替换工具的介绍和下载地址:http://www.cnblogs.com/trytocatch/p/RegexReplacer.html

小结

状态变更非常适合使用非阻塞算法,并且还能够达到wait-free级别。限于篇幅,有些没讲到的细节,请读者借助代码来理解吧,如有疑问,欢迎回复讨论。

系列总结

本实战系列就到此结束了,简单总结下。

非阻塞同步相对于锁同步而言,由代码块,转为了点,是另一种思考方式。

有时,无法做到一步完成,也许可以分成两步完成,同样可以解决问题,ConcurrentLinkedQueue就是这么做的。

如果需要维护多个数据之间的某种一致关系,则可以将它们封装到一个类中,更新时采用更新该类对象的引用的方式。

众所周知,锁同步算法是难以测试的,非阻塞同步算法更加难以测试,我个人认为,其正确性主要靠慎密的推敲和论证。

非阻塞同步算法比锁同步算法要显得更复杂些,如果对性能要求不高,对非阻塞算法掌握得还不太熟练,建议不要使用非阻塞算法,锁同步算法要简洁得多,也更容易维护,如上面所说的,两条看似没有顺序的语句,调换下顺序,可能就会引发BUG。

文章转自 并发编程网-ifeve.com

目录
相关文章
|
13天前
|
存储 缓存 算法
前端算法:优化与实战技巧的深度探索
【10月更文挑战第21天】前端算法:优化与实战技巧的深度探索
13 1
|
2月前
|
大数据 UED 开发者
实战演练:利用Python的Trie树优化搜索算法,性能飙升不是梦!
在数据密集型应用中,高效搜索算法至关重要。Trie树(前缀树/字典树)通过优化字符串处理和搜索效率成为理想选择。本文通过Python实战演示Trie树构建与应用,显著提升搜索性能。Trie树利用公共前缀减少查询时间,支持快速插入、删除和搜索。以下为简单示例代码,展示如何构建及使用Trie树进行搜索与前缀匹配,适用于自动补全、拼写检查等场景,助力提升应用性能与用户体验。
50 2
|
2月前
|
算法 搜索推荐 开发者
别再让复杂度拖你后腿!Python 算法设计与分析实战,教你如何精准评估与优化!
在 Python 编程中,算法的性能至关重要。本文将带您深入了解算法复杂度的概念,包括时间复杂度和空间复杂度。通过具体的例子,如冒泡排序算法 (`O(n^2)` 时间复杂度,`O(1)` 空间复杂度),我们将展示如何评估算法的性能。同时,我们还会介绍如何优化算法,例如使用 Python 的内置函数 `max` 来提高查找最大值的效率,或利用哈希表将查找时间从 `O(n)` 降至 `O(1)`。此外,还将介绍使用 `timeit` 模块等工具来评估算法性能的方法。通过不断实践,您将能更高效地优化 Python 程序。
51 4
|
3月前
|
算法 安全 数据安全/隐私保护
Android经典实战之常见的移动端加密算法和用kotlin进行AES-256加密和解密
本文介绍了移动端开发中常用的数据加密算法,包括对称加密(如 AES 和 DES)、非对称加密(如 RSA)、散列算法(如 SHA-256 和 MD5)及消息认证码(如 HMAC)。重点讲解了如何使用 Kotlin 实现 AES-256 的加密和解密,并提供了详细的代码示例。通过生成密钥、加密和解密数据等步骤,展示了如何在 Kotlin 项目中实现数据的安全加密。
105 1
|
3月前
|
机器学习/深度学习 存储 算法
强化学习实战:基于 PyTorch 的环境搭建与算法实现
【8月更文第29天】强化学习是机器学习的一个重要分支,它让智能体通过与环境交互来学习策略,以最大化长期奖励。本文将介绍如何使用PyTorch实现两种经典的强化学习算法——Deep Q-Network (DQN) 和 Actor-Critic Algorithm with Asynchronous Advantage (A3C)。我们将从环境搭建开始,逐步实现算法的核心部分,并给出完整的代码示例。
217 1
|
3月前
|
算法 安全 数据安全/隐私保护
Android经典实战之常见的移动端加密算法和用kotlin进行AES-256加密和解密
本文介绍了移动端开发中常用的数据加密算法,包括对称加密(如 AES 和 DES)、非对称加密(如 RSA)、散列算法(如 SHA-256 和 MD5)及消息认证码(如 HMAC)。重点展示了如何使用 Kotlin 实现 AES-256 的加密和解密,提供了详细的代码示例。
72 2
|
3月前
|
机器学习/深度学习 算法 数据挖掘
【白话机器学习】算法理论+实战之决策树
【白话机器学习】算法理论+实战之决策树
|
3月前
|
消息中间件 存储 算法
这些年背过的面试题——实战算法篇
本文是技术人面试系列实战算法篇,面试中关于实战算法都需要了解哪些内容?一文带你详细了解,欢迎收藏!
|
3月前
|
算法 搜索推荐 Java
算法实战:手写归并排序,让复杂排序变简单!
归并排序是一种基于“分治法”的经典算法,通过递归分割和合并数组,实现O(n log n)的高效排序。本文将通过Java手写代码,详细讲解归并排序的原理及实现,帮助你快速掌握这一实用算法。
41 0
|
3月前
|
数据采集 搜索推荐 算法
【高手进阶】Java排序算法:从零到精通——揭秘冒泡、快速、归并排序的原理与实战应用,让你的代码效率飙升!
【8月更文挑战第21天】Java排序算法是编程基础的重要部分,在算法设计与分析及实际开发中不可或缺。本文介绍内部排序算法,包括简单的冒泡排序及其逐步优化至高效的快速排序和稳定的归并排序,并提供了每种算法的Java实现示例。此外,还探讨了排序算法在电子商务、搜索引擎和数据分析等领域的广泛应用,帮助读者更好地理解和应用这些算法。
39 0