小李今天又遇到了烦心事。
他写了一个数据处理脚本,要调用外部API获取一万个用户的信息。每个请求大概要等2秒。他不想干等着,所以用了asyncio,并发发出去50个请求。
跑了三分钟,脚本卡住了。
不是死锁,就是单纯的慢——有几个API服务不稳定,响应要半分钟。他想:“能不能设定一个超时时间,比如5秒,超过5秒就不等了,直接跳过?”
这个需求太常见了。不管是网络请求、数据库查询还是复杂的计算任务,你总不希望它无限期地卡下去。就像点外卖,等了45分钟还没到,你肯定想取消订单,换一家点。
问题在于,Python的异步超时机制,在3.11之前有一个不大不小的“坑”,如果你不注意,任务可能根本取消不掉。
一个“取消失败”的真实案例
先来看一段代码,它在Python 3.10上运行:
import asyncio
async def quick_task():
# 这个任务瞬间完成
return "done"
async def wrapper():
# 设置30秒超时,但实际任务1秒就完成
return await asyncio.wait_for(quick_task(), timeout=30)
async def main():
task = asyncio.create_task(wrapper())
await asyncio.sleep(0) # 让任务开始执行
task.cancel() # 手动取消
try:
await task
except asyncio.CancelledError:
print("任务被取消了")
else:
print("任务没有被取消!")
asyncio.run(main())
在Python 3.8上运行,输出的是“任务被取消了”。但在Python 3.9或3.10上运行,输出的却是“任务没有被取消!”
什么情况?明明调用了cancel(),为什么任务没被取消?
真相藏在wait_for的源码里
要理解这个问题,得看看asyncio.wait_for这个函数到底在干什么。
wait_for的作用是:给一个任务设置一个超时时间,如果超时了就取消它。但这里有一个细节——如果任务在超时之前就已经完成了,而在完成的那一瞬间你恰好发起了取消请求,wait_for会怎么处理?
Python 3.8的做法是:不管任务完没完成,只要收到了取消信号,就抛出一个CancelledError。
Python 3.9及之后的做法是:先检查一下任务是不是已经完成了。如果已经完成了,“取消”就没有意义了,直接返回任务的结果,不抛异常。
听起来3.9之后的逻辑更合理对吧?毕竟任务都做完了,还取消什么?
但在并发环境下,问题就出在这个“先检查”上。
想象一下这个时序:
这个“任务完成了吗”的判断,在多线程/多任务的并发环境下,会因为时序问题出现误判。任务实际上还没有真正完成,只是处于“即将完成”的状态,wait_for就可能把它当成“已完成”,从而无视你的取消请求。
这不是bug,而是设计上的一种取舍。但这个取舍导致了一个后果:在Python 3.11以下的版本中,取消操作并不是100%可靠的。
那怎么办?三个靠谱的解决方案
既然wait_for靠不住,那就自己动手。
方案一:自己封装一个“靠谱版的wait_for”
思路很简单:不依赖wait_for的取消机制,而是自己用asyncio.create_task()加一个超时的“看门狗”。
import asyncio
async def cancellable_wait_for(coro, timeout):
"""
一个更可靠的wait_for版本,确保取消信号不会被吞掉
"""
task = asyncio.create_task(coro)
try:
# 等待任务完成,或者超时
return await asyncio.wait_for(task, timeout=timeout)
except asyncio.TimeoutError:
# 超时了,取消任务
task.cancel()
try:
await task
except asyncio.CancelledError:
# 确保取消信号被传播出去
pass
raise # 重新抛出TimeoutError
except asyncio.CancelledError:
# 外部取消了,把取消信号传递给内部任务
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
raise # 重新抛出CancelledError
async def my_task():
try:
await asyncio.sleep(10)
return "完成"
except asyncio.CancelledError:
print("内部任务被取消了")
raise
async def main():
task = asyncio.create_task(cancellable_wait_for(my_task(), timeout=5))
await asyncio.sleep(2)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("外部:确实被取消了")
asyncio.run(main())
这个方案的要点是:不管什么情况,只要外部取消或者超时,都强制取消内部任务,然后把取消信号往上抛。不会被“任务已完成”这种假象迷惑。
方案二:用第三方库quattro的CancelScope
Python 3.11之后,官方推出了asyncio.timeout(),终于有了靠谱的超时机制。那3.11以下怎么办?
有一个第三方库叫quattro,它在3.11以下版本中实现了类似的功能。
pip install quattro
import asyncio
from quattro import move_on_after
async def main():
with move_on_after(5) as scope: # 5秒超时
result = await some_slow_operation()
print(result)
if scope.cancelled_caught:
print("超时被取消了,但程序继续运行")
asyncio.run(main())
quattro的CancelScope比官方的更灵活:它是普通的上下文管理器(不需要async with),可以手动调用scope.cancel()提前取消,还可以查询是否被取消了。
如果你在项目中需要同时支持Python 3.9、3.10和3.11,quattro是个不错的选择。代码写一遍,在所有版本上行为一致。
方案三:最底层的做法——自己用Event和超时循环
如果不想引入第三方库,也嫌自己封装太麻烦,还有一个最朴素的办法:不用wait_for,自己用asyncio.Event加上循环检查。
import asyncio
async def cancellable_operation(timeout):
"""
在操作内部主动检查超时和取消信号
"""
start = asyncio.get_event_loop().time()
# 假设这是一系列的小步骤
for step in range(10):
# 检查是否超时
if asyncio.get_event_loop().time() - start > timeout:
raise asyncio.TimeoutError()
# 检查是否被取消(通过捕获取消信号)
try:
await asyncio.sleep(0.5) # 模拟一个小步骤
except asyncio.CancelledError:
# 做清理工作
print("收到取消信号,正在清理...")
raise # 重新抛出,让上层知道被取消了
print(f"完成步骤 {step}")
return "全部完成"
async def main():
task = asyncio.create_task(cancellable_operation(timeout=3))
await asyncio.sleep(2)
task.cancel()
try:
result = await task
print(result)
except asyncio.TimeoutError:
print("超时了")
except asyncio.CancelledError:
print("被取消了")
asyncio.run(main())
这个方案的优点是:你完全掌控取消逻辑。缺点是:你得在操作内部主动插入检查点。如果你的操作是一大块无法分割的同步代码,这个方法就不太适用了。
那同步代码怎么办?线程池里怎么取消?
上面聊的都是异步代码(async/await)。但小李的脚本里,requests.get()是同步的,它不响应asyncio的取消信号。
这种情况下,task.cancel()根本没用。因为取消信号只在await的地方才能被处理,而同步代码里没有await。
解决方案是:用loop.run_in_executor把同步代码扔到线程池里,然后用一个threading.Event来手动控制停止。
import asyncio
import threading
import time
def blocking_task(stop_event):
"""
这是一个会阻塞的同步函数
它定期检查stop_event,如果被设置了就主动退出
"""
print("同步任务开始")
for i in range(30):
if stop_event.is_set():
print("收到停止信号,主动退出")
return "被停止了"
print(f"工作中... {i}")
time.sleep(1) # 模拟耗时操作
return "正常完成"
async def main():
stop_event = threading.Event()
loop = asyncio.get_running_loop()
# 把同步任务扔到线程池里执行
task = loop.run_in_executor(None, blocking_task, stop_event)
# 5秒后发送停止信号
await asyncio.sleep(5)
stop_event.set()
# 等待任务结束
result = await task
print(f"结果: {result}")
asyncio.run(main())
关键点:同步任务本身必须主动检查stop_event,不能指望Python帮你“强行”取消。强行杀线程在Python里是不安全的,也不被推荐。
那如果用多进程呢?
如果你面对的是CPU密集型的任务,线程也帮不了你——Python的GIL锁会让多线程在计算任务上毫无优势。这时候考虑用ProcessPoolExecutor:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
def cpu_intensive_task():
"""模拟一个费CPU的大活儿"""
total = 0
for i in range(100000000):
total += i
# 每1000万次检查一次,但进程间通信复杂,先忽略细节
if i % 10000000 == 0:
print(f"计算到 {i}")
return total
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor(max_workers=1) as pool:
# 在子进程中执行
task = loop.run_in_executor(pool, cpu_intensive_task)
# 等待3秒
await asyncio.sleep(3)
print("3秒到了,但子进程不会自己停...")
# 注意:ProcessPoolExecutor没有优雅的取消方法
# 只能shutdown(wait=False)但会有warning
# 或者terminate,但不推荐
多进程的取消更麻烦。进程不像线程,你不能优雅地通知它“停下来”。常见的做法是:在子进程里也放一个检查循环,通过进程间通信(比如multiprocessing.Event或队列)来传递停止信号。
总结一下,到底选哪个方案?
给你一张决策表:
你的场景 推荐方案
纯异步代码,Python 3.11+ 直接用官方asyncio.timeout()
纯异步代码,Python 3.10及以下 用quattro的CancelScope,或自己封装cancellable_wait_for
同步阻塞代码(如requests) 线程池 + threading.Event手动轮询检查
CPU密集型代码 多进程 + 进程间通信信号,或用concurrent.futures的超时(有限支持)
不想改现有代码,只想加个保护 用asyncio.wait_for,但要接受它可能偶尔失效
小李最后选择了方案一:自己封装了一个cancellable_wait_for,用了半小时写完,以后所有异步任务都走这个函数,再也没出现过“取消不掉”的情况。
他说了一句大实话:“官方的不靠谱,就自己写一个。反正常用的功能就那么几个,封装一次到处用,不亏。”