用例:
我正在编写一个主动的会话更新程序;用例适用于并行API调用或顺序API调用拦截API调用并检查令牌是否有效。
如果令牌有效,则继续调用。如果令牌无效,则首先更新令牌,然后继续进行API调用。
这对于顺序调用来说是一条小路,但是对于并行调用,我想避免DDOS对重复的令牌调用使用服务器。
相反,我有以下流程:
假设我的令牌更新程序拦截了五个并行调用的突发事件,第一个调用(线程)去更新令牌,所有其他四个将等待,直到第一个调用更新了令牌。令牌更新后,其他线程将继续正常执行,因此跳过令牌更新。
这里的另一点是,请求可能会在T = 1的5个API调用中突发出现,而在T = 2的10个API调用中突发出现,因此我也对重置值感兴趣(例如在executeLatch中)
现在,我为实现此目的而编写的代码已加载了框架和行话,为了简单起见,我想出了下面的代码框架:
class SingleUpdater {
private val lock = ReentrantLock()
private val executionLatch = AtomicBoolean(false)
// Say valueToBeUpdated is my session token
private val valueToBeUpdated = AtomicInteger(0)
private val awaitingThreadsCounter = AtomicInteger(0)
// Method to emulate bursts of threads
fun start() {
for (i in 0 until 100) {
Thread(this::execute).start()
Thread(this::execute).start()
Thread(this::execute).start()
}
// Toggle burst emulator
Thread.sleep(3000)
for (i in 0 until 1) {
Thread(this::execute).start()
Thread(this::execute).start()
Thread(this::execute).start()
}
}
private fun execute() {
println("Trying to acquire lock ${Thread.currentThread().name}")
try {
awaitingThreadsCounter.set(awaitingThreadsCounter.get() + 1)
println("============================= awaitingThreadsCounter = ${awaitingThreadsCounter.get()} ============================= ")
lock.lock()
println("I am Thread ${Thread.currentThread().name} I have acquired lock")
if (executionLatch.get()) {
continueWithRestOfExecution()
} else {
doUpdate()
continueWithRestOfExecution()
}
} catch (e: Exception) {
e.printStackTrace()
} finally {
println("Released by ${Thread.currentThread().name}")
lock.unlock()
}
}
private fun continueWithRestOfExecution() {
if (awaitingThreadsCounter.get() >= 0) awaitingThreadsCounter.set(awaitingThreadsCounter.get() - 1)
println("I am Thread: ${Thread.currentThread().name} and I am executing continueWithRestOfExecution")
if (awaitingThreadsCounter.get() == 0) {
println(" ============================= BALANCED ============================= ")
executionLatch.set(false)
}
}
private fun doUpdate() {
try {
Thread.sleep(3000)
valueToBeUpdated.set(valueToBeUpdated.get() + 1)
println("=============================== Value is: $valueToBeUpdated ${Thread.currentThread().name} ===============================")
executionLatch.set(true)
} catch (ex: Exception) {
ex.printStackTrace()
}
}
} 如果您有更好的实施方法,请在此处帮助分析边缘情况。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
您的代码实现了一个线程同步机制,以确保在并行API调用中仅更新一次令牌。您使用了ReentrantLock
来控制并发访问,并通过AtomicBoolean
和AtomicInteger
来协调等待的线程和执行状态。这是一个很好的尝试,但有几个地方可以优化或改进:
减少锁的范围:尽量减小锁的持有时间,以提高并发性能。在您的代码中,lock.lock()
到lock.unlock()
之间的代码块可以进一步缩小。
简化计数逻辑:awaitingThreadsCounter
的递增和递减操作可以在更安全的方式下进行,比如使用getAndIncrement
和getAndDecrement
方法,这样可以避免竞态条件。
明确信号机制:当前的executionLatch
设计用于标记是否已经执行过更新,但是它的重置时机可能不够清晰。考虑使用CountDownLatch
替代,它能更直观地表示等待特定数量的操作完成。
处理突发情况的优化:对于T=1时5个调用,T=2时10个调用的场景,当前方案能够应对,但可以通过更精细的信号量(Semaphore)来限制并发更新的数量,从而更好地管理资源。
基于以上分析,这里提供一个简化的示例代码框架,使用CountDownLatch
和Semaphore
来优化您的实现:
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger
class TokenUpdater {
private val updateLock = ReentrantLock()
private val updateSignal = CountDownLatch(1)
private val semaphore = Semaphore(1)
private var tokenIsValid = false
private var tokenValue = AtomicInteger(0)
fun start() {
// Simulate bursts of threads
for (i in 0 until 100) simulateBurst(3)
Thread.sleep(3000)
simulateBurst(1)
}
private fun simulateBurst(threadCount: Int) {
repeat(threadCount) {
Thread { executeApiCall() }.start()
}
}
private fun executeApiCall() {
try {
semaphore.acquire()
updateLock.lock()
if (!tokenIsValid) {
updateToken()
}
// Perform the API call with the updated token
performApiCallWithToken(tokenValue.get())
} catch (e: Exception) {
e.printStackTrace()
} finally {
updateLock.unlock()
semaphore.release()
}
}
private fun updateToken() {
tokenIsValid = true
tokenValue.incrementAndGet()
println("Token updated by ${Thread.currentThread().name}, new value: ${tokenValue.get()}")
updateSignal.countDown() // Signal that the update is done
}
private fun performApiCallWithToken(token: Int) {
println("Executing API call with token $token by ${Thread.currentThread().name}")
}
}
在这个版本中: - 使用semaphore
限制同时尝试更新令牌的线程数。 - updateSignal
作为一次性信号,当令牌首次更新后,后续线程无需等待,因为它们将直接看到tokenIsValid
为true
。 - 减少了锁的范围,只在实际需要原子性操作的地方使用锁。 - 简化了线程计数和同步逻辑,利用现有并发工具更清晰地表达意图。
请注意,这只是一个基础示例,根据实际应用场景,可能还需要进一步调整和完善。