使用这些方法让你的 Python 并发任务执行得更好

简介: 使用这些方法让你的 Python 并发任务执行得更好

动动发财的小手,点个赞吧!

问题

一直以来,Python的多线程性能因为GIL而一直没有达到预期。

所以从 3.4 版本开始,Python 引入了 asyncio 包,通过并发的方式并发执行 IO-bound 任务。经过多次迭代,asyncio API 的效果非常好,并发任务的性能相比多线程版本有了很大的提升。

但是,程序员在使用asyncio时还是会犯很多错误:

一个错误如下图所示,直接使用await协程方法,将对并发任务的调用从异步变为同步,最终失去并发特性。

async def main():
    result_1 = await some_coro("name-1")
    result_2 = await some_coro("name-2")
AI 代码解读

另一个错误如下图所示,虽然程序员意识到他需要使用create_task创建一个任务在后台执行。而下面这种一个一个等待任务的方式,将不同时序的任务变成了有序的等待。

async def main():
    task_1 = asyncio.create_task(some_coro("name-1"))
    task_2 = asyncio.create_task(some_coro("name-2"))

    result_1 = await task_1
    result_2 = await task_2
AI 代码解读

此代码将等待 task_1 先完成,而不管 task_2 是否先完成。

什么是并发任务执行?

那么,什么是真正的并发任务呢?我们用一张图来说明:

如图所示,一个并发流程应该由两部分组成:启动后台任务,将后台任务重新加入主函数,并获取结果。

大多数读者已经知道如何使用 create_task 启动后台任务。今天,我将介绍几种等待后台任务完成的方法以及每种方法的最佳实践。

开始

在开始介绍今天的主角之前,我们需要准备一个示例async方法来模拟IO绑定的方法调用,以及一个自定义的AsyncException,可以用来在测试抛出异常时友好地提示异常信息:

from random import random, randint
import asyncio


class AsyncException(Exception):
    def __init__(self, message, *args, **kwargs):
        self.message = message
        super(*args, **kwargs)

    def __str__(self):
        return self.message


async def some_coro(name):
    print(f"Coroutine {name} begin to run")
    value = random()

    delay = randint(1, 4)
    await asyncio.sleep(delay)
    if value > 0.5:
        raise AsyncException(f"Something bad happen after delay {delay} second(s)")
    print(f"Coro {name} is Done. with delay {delay} second(s)")
    return value
AI 代码解读

并发执行方法比较

1. asyncio.gather

asyncio.gather 可用于启动一组后台任务,等待它们完成执行,并获取结果列表:

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    results = await asyncio.gather(*aws)  # need to unpack the list
    for result in results:
        print(f">got : {result}")

asyncio.run(main())
AI 代码解读

asyncio.gather 虽然组成了一组后台任务,但不能直接接受一个列表或集合作为参数。如果需要传入包含后台任务的列表,请解包。

asyncio.gather 接受一个 return_exceptions 参数。当return_exception的值为False时,任何后台任务抛出异常,都会抛给gather方法的调用者。而 gather 方法的结果列表是空的。

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    try:
        results = await asyncio.gather(*aws, return_exceptions=False)  # need to unpack the list
    except AsyncException as e:
        print(e)
    for result in results:
        print(f">got : {result}")

asyncio.run(main())
AI 代码解读

当return_exception的值为True时,后台任务抛出的异常不会影响其他任务的执行,最终会合并到结果列表中一起返回。

results = await asyncio.gather(*aws, return_exceptions=True)
AI 代码解读

接下来我们看看为什么gather方法不能直接接受一个列表,而是要对列表进行解包。因为当一个列表被填满并执行时,我们很难在等待任务完成时向列表中添加新任务。但是 gather 方法可以使用嵌套组将现有任务与新任务混合,解决了中间无法添加新任务的问题:

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))
    group_1 = asyncio.gather(*aws)  # note we don't use await now
    # when some situation happen, we may add a new task
    group_2 = asyncio.gather(group_1, asyncio.create_task(some_coro("a new task")))
    results = await group_2
    for result in results:
        print(f">got : {result}")

asyncio.run(main())
AI 代码解读

但是gather不能直接设置timeout参数。如果需要为所有正在运行的任务设置超时时间,就用这个姿势,不够优雅。

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    results = await asyncio.wait_for(asyncio.gather(*aws), timeout=2)
    for result in results:
        print(f">got : {result}")

asyncio.run(main())
AI 代码解读

2. asyncio.as_completed

有时,我们必须在完成一个后台任务后立即开始下面的动作。比如我们爬取一些数据,马上调用机器学习模型进行计算,gather方法不能满足我们的需求,但是我们可以使用as_completed方法。

在使用 asyncio.as_completed 方法之前,我们先看一下这个方法的源码。

# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
  # ...
  for f in todo:
      f.add_done_callback(_on_completion)
  if todo and timeout is not None:
      timeout_handle = loop.call_later(timeout, _on_timeout)
  for _ in range(len(todo)):
      yield _wait_for_one()
AI 代码解读

源码显示as_completed不是并发方法,返回一个带有yield语句的迭代器。所以我们可以直接遍历每个完成的后台任务,我们可以对每个任务单独处理异常,而不影响其他任务的执行:

async def main():
    aws = []
    for i in range(5):
        aws.append(asyncio.create_task(some_coro(f"name-{i}")))

    for done in asyncio.as_completed(aws):  # we don't need to unpack the list
        try:
            result = await done
            print(f">got : {result}")
        except AsyncException as e:
            print(e)

asyncio.run(main())
AI 代码解读

as_completed 接受超时参数,超时后当前迭代的任务会抛出asyncio.TimeoutError:

async def main():
    aws = []
    for i in range(5):
        aws.append(asyncio.create_task(some_coro(f"name-{i}")))

    for done in asyncio.as_completed(aws, timeout=2):  # we don't need to unpack the list
        try:
            result = await done
            print(f">got : {result}")
        except AsyncException as e:
            print(e)
        except asyncio.TimeoutError: # we need to handle the TimeoutError
            print("time out.")

asyncio.run(main())
AI 代码解读

as_complete在处理任务执行的结果方面比gather灵活很多,但是在等待的时候很难往原来的任务列表中添加新的任务。

3. asyncio.wait

asyncio.wait 的调用方式与 as_completed 相同,但返回一个包含两个集合的元组:done 和 pending。 done 保存已完成执行的任务,而 pending 保存仍在运行的任务。

asyncio.wait 接受一个 return_when 参数,它可以取三个枚举值:

  • 当return_when为asyncio.ALL_COMPLETED时,done存放所有完成的任务,pending为空。
  • 当 return_when 为 asyncio.FIRST_COMPLETED 时,done 持有所有已完成的任务,而 pending 持有仍在运行的任务。
async def main():
    aws = set()
    for i in range(5):
        aws.add(asyncio.create_task(some_coro(f"name-{i}")))

    done, pending = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        try:
            result = await task
            print(f">got : {result}")
        except AsyncException as e:
            print(e)
    print(f"the length of pending is {len(pending)}")

asyncio.run(main())
AI 代码解读

  • 当return_when为asyncio.FIRST_EXCEPTION时,done存放抛出异常并执行完毕的任务,pending存放仍在运行的任务。

当 return_when 为 asyncio.FIRST_COMPLETED 或 asyncio.FIRST_EXECEPTION 时,我们可以递归调用 asyncio.wait,这样我们就可以添加新的任务,并根据情况一直等待所有任务完成。

async def main():
    pending = set()
    for i in range(5):
        pending.add(asyncio.create_task(some_coro(f"name-{i}")))  # note the type and name of the task list

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in done:
            try:
                result = await task
                print(f">got : {result}")
            except AsyncException as e:
                print(e)
                pending.add(asyncio.create_task(some_coro("a new task")))
    print(f"the length of pending is {len(pending)}")

asyncio.run(main())
AI 代码解读

4. asyncio.TaskGroup

在 Python 3.11 中,asyncio 引入了新的 TaskGroup API,正式让 Python 支持结构化并发。此功能允许您以更 Pythonic 的方式管理并发任务的生命周期。

总结

本文介绍了 asyncio.gather、asyncio.as_completed 和 asyncio.wait API,还回顾了 Python 3.11 中引入的新 asyncio.TaskGroup 特性。

根据实际需要使用这些后台任务管理方式可以让我们的asyncio并发编程更加灵活。

目录
打赏
0
0
0
0
331
分享
相关文章
Python 中调用 DeepSeek-R1 API的方法介绍,图文教程
本教程详细介绍了如何使用 Python 调用 DeepSeek 的 R1 大模型 API,适合编程新手。首先登录 DeepSeek 控制台获取 API Key,安装 Python 和 requests 库后,编写基础调用代码并运行。文末包含常见问题解答和更简单的可视化调用方法,建议收藏备用。 原文链接:[如何使用 Python 调用 DeepSeek-R1 API?](https://apifox.com/apiskills/how-to-call-the-deepseek-r1-api-using-python/)
堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能
本文深入探讨了堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能。文章详细介绍了堆叠的实现步骤,包括数据准备、基础模型训练、新训练集构建及元学习器训练,并讨论了其优缺点。
178 3
随机的暴力美学蒙特卡洛方法 | python小知识
蒙特卡洛方法是一种基于随机采样的计算算法,广泛应用于物理学、金融、工程等领域。它通过重复随机采样来解决复杂问题,尤其适用于难以用解析方法求解的情况。该方法起源于二战期间的曼哈顿计划,由斯坦尼斯拉夫·乌拉姆等人提出。核心思想是通过大量随机样本来近似真实结果,如估算π值的经典示例。蒙特卡洛树搜索(MCTS)是其高级应用,常用于游戏AI和决策优化。Python中可通过简单代码实现蒙特卡洛方法,展示其在文本生成等领域的潜力。随着计算能力提升,蒙特卡洛方法的应用范围不断扩大,成为处理不确定性和复杂系统的重要工具。
72 21
Python3 自定义排序详解:方法与示例
Python的排序功能强大且灵活,主要通过`sorted()`函数和列表的`sort()`方法实现。两者均支持`key`参数自定义排序规则。本文详细介绍了基础排序、按字符串长度或元组元素排序、降序排序、多条件排序及使用`lambda`表达式和`functools.cmp_to_key`进行复杂排序。通过示例展示了如何对简单数据类型、字典、类对象及复杂数据结构(如列车信息)进行排序。掌握这些技巧可以显著提升数据处理能力,为编程提供更强大的支持。
36 10
Python中使用MySQL模糊查询的方法
本文介绍了两种使用Python进行MySQL模糊查询的方法:一是使用`pymysql`库,二是使用`mysql-connector-python`库。通过这两种方法,可以连接MySQL数据库并执行模糊查询。具体步骤包括安装库、配置数据库连接参数、编写SQL查询语句以及处理查询结果。文中详细展示了代码示例,并提供了注意事项,如替换数据库连接信息、正确使用通配符和关闭数据库连接等。确保在实际应用中注意SQL注入风险,使用参数化查询以保障安全性。
用Python实现简单的任务自动化
本文介绍如何使用Python实现任务自动化,提高效率和准确性。通过三个实用案例展示:1. 使用`smtplib`和`schedule`库自动发送邮件提醒;2. 利用`shutil`和`os`库自动备份文件;3. 借助`requests`库自动下载网页内容。每个案例包含详细代码和解释,并附带注意事项。掌握这些技能有助于个人和企业优化流程、节约成本。
73 3
21个Python脚本自动执行日常任务(2)
21个Python脚本自动执行日常任务(2)
133 7
21个Python脚本自动执行日常任务(2)
|
2月前
|
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
68 18
Python-打印99乘法表的两种方法
本文详细介绍了两种实现99乘法表的方法:使用`while`循环和`for`循环。每种方法都包括了步骤解析、代码演示及优缺点分析。文章旨在帮助编程初学者理解和掌握循环结构的应用,内容通俗易懂,适合编程新手阅读。博主表示欢迎读者反馈,共同进步。
构建高效的数据管道:使用Python进行ETL任务
在数据驱动的世界中,高效地处理和移动数据是至关重要的。本文将引导你通过一个实际的Python ETL(提取、转换、加载)项目,从概念到实现。我们将探索如何设计一个灵活且可扩展的数据管道,确保数据的准确性和完整性。无论你是数据工程师、分析师还是任何对数据处理感兴趣的人,这篇文章都将成为你工具箱中的宝贵资源。

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等