那些“卡死”的任务:Python 3.11 以下如何优雅地实现自动取消?

简介: 本文揭秘Python异步超时取消的“隐形陷阱”:3.11前`asyncio.wait_for`在并发下可能忽略取消信号。通过真实案例、源码剖析,给出三大可靠方案——自封装健壮版`wait_for`、第三方库`quattro.CancelScope`、线程/进程级手动信号控制,助你彻底告别“取消失效”。

小李今天又遇到了烦心事。

他写了一个数据处理脚本,要调用外部API获取一万个用户的信息。每个请求大概要等2秒。他不想干等着,所以用了asyncio,并发发出去50个请求。

跑了三分钟,脚本卡住了。
代理 IP 使用小技巧 让你的数据抓取效率翻倍 (51).png

不是死锁,就是单纯的慢——有几个API服务不稳定,响应要半分钟。他想:“能不能设定一个超时时间,比如5秒,超过5秒就不等了,直接跳过?”

这个需求太常见了。不管是网络请求、数据库查询还是复杂的计算任务,你总不希望它无限期地卡下去。就像点外卖,等了45分钟还没到,你肯定想取消订单,换一家点。

问题在于,Python的异步超时机制,在3.11之前有一个不大不小的“坑”,如果你不注意,任务可能根本取消不掉。

一个“取消失败”的真实案例
先来看一段代码,它在Python 3.10上运行:

import asyncio

async def quick_task():

# 这个任务瞬间完成
return "done"

async def wrapper():

# 设置30秒超时,但实际任务1秒就完成
return await asyncio.wait_for(quick_task(), timeout=30)

async def main():
task = asyncio.create_task(wrapper())
await asyncio.sleep(0) # 让任务开始执行
task.cancel() # 手动取消

try:
    await task
except asyncio.CancelledError:
    print("任务被取消了")
else:
    print("任务没有被取消!")

asyncio.run(main())

在Python 3.8上运行,输出的是“任务被取消了”。但在Python 3.9或3.10上运行,输出的却是“任务没有被取消!”

什么情况?明明调用了cancel(),为什么任务没被取消?

真相藏在wait_for的源码里
要理解这个问题,得看看asyncio.wait_for这个函数到底在干什么。

wait_for的作用是:给一个任务设置一个超时时间,如果超时了就取消它。但这里有一个细节——如果任务在超时之前就已经完成了,而在完成的那一瞬间你恰好发起了取消请求,wait_for会怎么处理?

Python 3.8的做法是:不管任务完没完成,只要收到了取消信号,就抛出一个CancelledError。

Python 3.9及之后的做法是:先检查一下任务是不是已经完成了。如果已经完成了,“取消”就没有意义了,直接返回任务的结果,不抛异常。

听起来3.9之后的逻辑更合理对吧?毕竟任务都做完了,还取消什么?

但在并发环境下,问题就出在这个“先检查”上。

想象一下这个时序:

这个“任务完成了吗”的判断,在多线程/多任务的并发环境下,会因为时序问题出现误判。任务实际上还没有真正完成,只是处于“即将完成”的状态,wait_for就可能把它当成“已完成”,从而无视你的取消请求。

这不是bug,而是设计上的一种取舍。但这个取舍导致了一个后果:在Python 3.11以下的版本中,取消操作并不是100%可靠的。

那怎么办?三个靠谱的解决方案
既然wait_for靠不住,那就自己动手。

方案一:自己封装一个“靠谱版的wait_for”
思路很简单:不依赖wait_for的取消机制,而是自己用asyncio.create_task()加一个超时的“看门狗”。

import asyncio

async def cancellable_wait_for(coro, timeout):
"""
一个更可靠的wait_for版本,确保取消信号不会被吞掉
"""
task = asyncio.create_task(coro)

try:
    # 等待任务完成,或者超时
    return await asyncio.wait_for(task, timeout=timeout)
except asyncio.TimeoutError:
    # 超时了,取消任务
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # 确保取消信号被传播出去
        pass
    raise  # 重新抛出TimeoutError
except asyncio.CancelledError:
    # 外部取消了,把取消信号传递给内部任务
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    raise  # 重新抛出CancelledError

async def my_task():
try:
await asyncio.sleep(10)
return "完成"
except asyncio.CancelledError:
print("内部任务被取消了")
raise

async def main():
task = asyncio.create_task(cancellable_wait_for(my_task(), timeout=5))
await asyncio.sleep(2)
task.cancel()

try:
    await task
except asyncio.CancelledError:
    print("外部:确实被取消了")

asyncio.run(main())

这个方案的要点是:不管什么情况,只要外部取消或者超时,都强制取消内部任务,然后把取消信号往上抛。不会被“任务已完成”这种假象迷惑。

方案二:用第三方库quattro的CancelScope
Python 3.11之后,官方推出了asyncio.timeout(),终于有了靠谱的超时机制。那3.11以下怎么办?

有一个第三方库叫quattro,它在3.11以下版本中实现了类似的功能。

pip install quattro

import asyncio
from quattro import move_on_after

async def main():
with move_on_after(5) as scope: # 5秒超时
result = await some_slow_operation()
print(result)

if scope.cancelled_caught:
    print("超时被取消了,但程序继续运行")

asyncio.run(main())

quattro的CancelScope比官方的更灵活:它是普通的上下文管理器(不需要async with),可以手动调用scope.cancel()提前取消,还可以查询是否被取消了。

如果你在项目中需要同时支持Python 3.9、3.10和3.11,quattro是个不错的选择。代码写一遍,在所有版本上行为一致。

方案三:最底层的做法——自己用Event和超时循环
如果不想引入第三方库,也嫌自己封装太麻烦,还有一个最朴素的办法:不用wait_for,自己用asyncio.Event加上循环检查。

import asyncio

async def cancellable_operation(timeout):
"""
在操作内部主动检查超时和取消信号
"""
start = asyncio.get_event_loop().time()

# 假设这是一系列的小步骤
for step in range(10):
    # 检查是否超时
    if asyncio.get_event_loop().time() - start > timeout:
        raise asyncio.TimeoutError()

    # 检查是否被取消(通过捕获取消信号)
    try:
        await asyncio.sleep(0.5)  # 模拟一个小步骤
    except asyncio.CancelledError:
        # 做清理工作
        print("收到取消信号,正在清理...")
        raise  # 重新抛出,让上层知道被取消了

    print(f"完成步骤 {step}")

return "全部完成"

async def main():
task = asyncio.create_task(cancellable_operation(timeout=3))
await asyncio.sleep(2)
task.cancel()

try:
    result = await task
    print(result)
except asyncio.TimeoutError:
    print("超时了")
except asyncio.CancelledError:
    print("被取消了")

asyncio.run(main())

这个方案的优点是:你完全掌控取消逻辑。缺点是:你得在操作内部主动插入检查点。如果你的操作是一大块无法分割的同步代码,这个方法就不太适用了。

那同步代码怎么办?线程池里怎么取消?
上面聊的都是异步代码(async/await)。但小李的脚本里,requests.get()是同步的,它不响应asyncio的取消信号。

这种情况下,task.cancel()根本没用。因为取消信号只在await的地方才能被处理,而同步代码里没有await。

解决方案是:用loop.run_in_executor把同步代码扔到线程池里,然后用一个threading.Event来手动控制停止。

import asyncio
import threading
import time

def blocking_task(stop_event):
"""
这是一个会阻塞的同步函数
它定期检查stop_event,如果被设置了就主动退出
"""
print("同步任务开始")
for i in range(30):
if stop_event.is_set():
print("收到停止信号,主动退出")
return "被停止了"
print(f"工作中... {i}")
time.sleep(1) # 模拟耗时操作
return "正常完成"

async def main():
stop_event = threading.Event()
loop = asyncio.get_running_loop()

# 把同步任务扔到线程池里执行
task = loop.run_in_executor(None, blocking_task, stop_event)

# 5秒后发送停止信号
await asyncio.sleep(5)
stop_event.set()

# 等待任务结束
result = await task
print(f"结果: {result}")

asyncio.run(main())

关键点:同步任务本身必须主动检查stop_event,不能指望Python帮你“强行”取消。强行杀线程在Python里是不安全的,也不被推荐。

那如果用多进程呢?
如果你面对的是CPU密集型的任务,线程也帮不了你——Python的GIL锁会让多线程在计算任务上毫无优势。这时候考虑用ProcessPoolExecutor:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import time

def cpu_intensive_task():
"""模拟一个费CPU的大活儿"""
total = 0
for i in range(100000000):
total += i

    # 每1000万次检查一次,但进程间通信复杂,先忽略细节
    if i % 10000000 == 0:
        print(f"计算到 {i}")
return total

async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor(max_workers=1) as pool:

    # 在子进程中执行
    task = loop.run_in_executor(pool, cpu_intensive_task)

    # 等待3秒
    await asyncio.sleep(3)
    print("3秒到了,但子进程不会自己停...")
    # 注意:ProcessPoolExecutor没有优雅的取消方法
    # 只能shutdown(wait=False)但会有warning
    # 或者terminate,但不推荐

多进程的取消更麻烦。进程不像线程,你不能优雅地通知它“停下来”。常见的做法是:在子进程里也放一个检查循环,通过进程间通信(比如multiprocessing.Event或队列)来传递停止信号。

总结一下,到底选哪个方案?
给你一张决策表:

你的场景 推荐方案
纯异步代码,Python 3.11+ 直接用官方asyncio.timeout()
纯异步代码,Python 3.10及以下 用quattro的CancelScope,或自己封装cancellable_wait_for
同步阻塞代码(如requests) 线程池 + threading.Event手动轮询检查
CPU密集型代码 多进程 + 进程间通信信号,或用concurrent.futures的超时(有限支持)
不想改现有代码,只想加个保护 用asyncio.wait_for,但要接受它可能偶尔失效
小李最后选择了方案一:自己封装了一个cancellable_wait_for,用了半小时写完,以后所有异步任务都走这个函数,再也没出现过“取消不掉”的情况。

他说了一句大实话:“官方的不靠谱,就自己写一个。反正常用的功能就那么几个,封装一次到处用,不亏。”

目录
相关文章
|
4天前
|
缓存 人工智能 自然语言处理
我对比了8个Claude API中转站,踩了不少坑,总结给你
本文是个人开发者耗时1周实测的8大Claude中转平台横向评测,聚焦Claude Code真实体验:以加权均价(¥/M token)、内部汇率、缓存支持、模型真实性及稳定性为核心指标。
|
22天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
34915 57
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
16天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
15014 44
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
11天前
|
人工智能 JavaScript Ubuntu
低成本搭建AIP自动化写作系统:Hermes保姆级使用教程,长文和逐步实操贴图
我带着怀疑的态度,深度使用了几天,聚焦微信公众号AIP自动化写作场景,写出来的几篇文章,几乎没有什么修改,至少合乎我本人的意愿,而且排版风格,也越来越完善,同样是起码过得了我自己这一关。 这个其实OpenClaw早可以实现了,但是目前我觉得最大的区别是,Hermes会自主总结提炼,并更新你的写作技能。 相信就冲这一点,就值得一试。 这篇帖子主要就Hermes部署使用,作一个非常详细的介绍,几乎一步一贴图。 关于Hermes,无论你赞成哪种声音,我希望都是你自己动手行动过,发自内心的选择!
2929 28
|
23小时前
|
云安全 人工智能 安全
|
1月前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
45865 160
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
7天前
|
弹性计算 人工智能 自然语言处理
阿里云Qwen3.6全新开源,三步完成专有版部署!
Qwen3.6是阿里云全新MoE架构大模型系列,稀疏激活显著降低推理成本,兼顾顶尖性能与高性价比;支持多规格、FP8量化、原生Agent及100+语言,开箱即用。

热门文章

最新文章

下一篇
开通oss服务