Python|玩转 Asyncio 任务处理(2)

简介: Python|玩转 Asyncio 任务处理(2)

引言

Python 的 Asyncio 模块在处理 I/O 密集型任务时表现出色,并且在最近的 Python 版本迭代中获得了诸多增强。不过,由于处理异步任务的途径多样,选择在特定情境下最合适的方法可能会让人感到迷惑。在这篇文章中,我会先从任务对象的基本概念讲起,接着探讨各种处理异步任务的方法,并分析它们各自的优势和劣势。

等待多个任务

现在,让我们来看看有趣的事情 - 等待多个任务!等待任务集合主要有三种方式;每种方法都有其优点和缺点,并且在不同的情况下会有所帮助。

asyncio.wait

我们的第一个选项类似于 wait_for 函数,但它是为一组任务或更为基础的 Future 对象设计的,这些对象可以是列表、元组或集合等形式。

asyncio.wait(collection_of_tasks, *, timeout=None, return_when=ALL_COMPLETED)

此函数返回一个由两个集合组成的元组:第一个集合包含已完成的任务,第二个集合则包含尚未完成的任务。如果在超时期限或 returnwhen 参数指定的条件满足之前任务已完成,它们将被归入已完成的任务集合;未完成的任务则被放入第二个集合,这个集合通常被称作 pending,或者如果你不打算使用这些任务,也可以简单地用下划线 来表示。然而,与 'asyncio.wait' 函数不同的是,在超时发生时,未完成的任务不会被自动取消。

return_when 参数允许你指定 asyncio.wait 函数在以下三种情况之一发生时返回:

  • FIRST_COMPLETED 当第一个任务完成或被取消时返回结果。
  • FIRST_EXCEPTION 当任一任务引发异常,或所有任务都已完成时返回结果。
  • ALL_COMPLETED 是默认选项,它将在所有 futures 完成或被取消时返回结果。

让我们通过一个实际例子来演示这个过程:

import asyncio
import random

async def job():
    await asyncio.sleep(random.randint(1, 5))

async def main():
    tasks = [
        asyncio.create_task(job(), name=index)
        for index in range(1, 5)
    ]

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    print(f’The first task completed was {
   done.pop().get_name()})

asyncio.run(main())

Output:

The first task completed was 4

asyncio.gather

现在,让我们深入了解 asyncio.gather 函数,特别是带有参数 return_exceptions=False 的用法。

与 wait_for 函数仅接受任务或Futuer对象的集合不同,gather 函数可以接受任意数量的任务、Futuer对象,甚至是协程对象,作为一系列位置参数传递给它。传入 gather 的协程对象会自动转换为任务对象,以便它们能够在事件循环中执行。所有任务完成后,gather 会将所有通过 Task.result() 方法获得的返回值,作为一个列表返回。gather 一个非常贴心的特性是,返回的列表会按照任务传入的顺序排列。

gather 的另一个优点是,它是这三个函数中唯一能够优雅地处理并返回异常的。如果设置了 return_exceptions 参数为 True,那么在任务原本应该返回结果的位置,列表将包含由任务引发的异常。

下面,让我们通过一个实例来具体了解这一机制是如何运作的。

import asyncio
import random

async def job(id):
    print(f’Starting job {
   id})
    await asyncio.sleep(random.randint(1, 3))
    print(f’Finished job {
   id})
    return id

async def main():
    # create a list of worker tasks
    coros = [job(i) for i in range(4)]

    # gather the results of all worker tasks
    results = await asyncio.gather(*coros)

    # print the results
    print(f’Results: {
   results})

asyncio.run(main())

我们首先定义了一个包含多个协程对象的列表,接着通过 * 操作符将这些协程对象作为位置参数展开,供 gather 函数处理。当我们对 gather 函数返回的对象进行等待(即调用 await),它就会开始执行这些任务,并一直运行直至所有任务完成。值得注意的是,尽管由于 await asyncio.sleep(random.randint(1,3)) 的调用导致任务以随机顺序完成,但 gather 返回的结果列表依然保持了我们最初传入任务的顺序。

Starting job 0
Starting job 1
Starting job 2
Starting job 3
Finished job 3
Finished job 0
Finished job 1
Finished job 2
Results: [0, 1, 2, 3]

在下一个示例中,我将两个协程直接放入 Gather 中,并将 return_exceptions 设置为 True,这会在同一结果列表中优雅地返回异常:

import asyncio

async def task1():
    raise ValueError()

async def task2():
    raise KeyError()

async def main():
    results = await asyncio.gather(task1(), task2(), return_exceptions=True)
    print(results)  # Will print [ValueError(), KeyError()]

asyncio.run(main())

asyncio.gather 的最后一个功能是,就像使用 Task.cancel() 取消单个任务一样,gather 返回的对象(然后等待)有自己的 cancel() 方法,该方法将循环遍历所有它正在管理的任务并取消所有这些任务。

asyncio.as_completed

这个函数与前面提到的两个有所不同;它不是一次性提供所有结果的集合或列表,而是提供了一个可迭代的对象,这样你可以在每个结果生成时即时处理它们。这个函数可以处理所有类型的可等待对象,包括协程、任务和未来对象。与其他许多方法类似,它也包含一个用于设置超时的关键字参数,如果到了设定的时间任务还没有完成,就会抛出 TimeoutError 异常。

asyncio.as_completed(aws, *, timeout=None)

以下是 as_completed 工作原理的示例:

import asyncio

async def my_task(id):
    return f’I am number {
   id}async def main():
    tasks = [my_task(id) for id in range(5)]

    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())

我们可以看到输出是随机的,因为它只是打印出先完成的内容:

I am number 3
I am number 0
I am number 4
I am number 2
I am number 1

等待一组任务

在 Python 3.11 中,Asyncio 引入了一个新特性——asyncio.TaskGroup,它以上下文管理器的形式简化了对一组任务的管理。这个特性的一个关键优势在于,如果任务组中的某个任务遇到错误,其他所有任务都会自动取消,这有助于在异步编程中实现更加健壮的错误处理机制。

设想这样一个情形:你有两段代码,每段都负责调用不同的 API 接口。当这两个 API 接口的响应都收集齐后,你打算将这些数据统一存储到数据库中。但如果其中一个 API 调用失败,你希望整个数据插入操作都不要执行。这种情况下,使用 TaskGroup 就非常合适,因为它可以确保两个协程要么都完成,要么在其中一个失败时立即取消另一个。

你可以通过调用 tg.create_task() 方法来向任务组中添加任务。如果任务组中的任何一个任务失败,组内其他所有任务都将被取消。随后,异常会以 ExceptionGroup 或 BaseExceptionGroup 的形式传递到包含任务组的协程中。

以下是一个展示如何使用任务组的示例:

import asyncio

async def do_something():
    return 1

async def do_something_else():
    return 2

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(do_something())
        task2 = tg.create_task(do_something_else())

    print(f’Everything done: {
   task1.result()}, {
   task2.result()})

asyncio.run(main())

Output:

Everything done: 1, 2

总结

我们已经探讨了多种处理可等待对象(awaitables)的方法,现在来回顾一下:

  • await 是最基本的等待操作,你可以将它放在任何可等待对象前面来执行其内部的代码。但 await 不支持直接同时处理多个任务。
  • asyncio.wait_forawait 类似,用于处理单个可等待对象,但它允许设置超时,适用于长时间运行的任务。
  • asyncio.wait 接受一组任务或未来对象,并允许设置超时。你可以根据需求选择返回的时机,例如所有任务完成、第一个任务完成或遇到第一个异常。
  • asyncio.gather 接受多个可等待对象作为位置参数,并返回一个列表,列表中的顺序与传入的参数顺序相同。它还能处理那些抛出异常的任务。
  • asyncio.as_completed 提供了一个可迭代的方式,允许你逐个处理完成的任务,而不是一次性处理所有任务。它同样支持超时参数。
  • asyncio.TaskGroup 是 Python 3.11 新增的特性,它让你可以管理一组任务,并根据是否有任务抛出异常来决定是否全部或一个也不返回结果。
相关文章
|
1天前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
11 4
|
1天前
|
调度 开发者 Python
Python中异步编程的魔法:深入理解asyncio和aiohttp
【9月更文挑战第26天】本文旨在探索Python语言中的异步编程世界,通过深入浅出的方式介绍核心库asyncio和流行的HTTP客户端aiohttp。我们将从基础概念入手,逐步过渡到高级应用,揭示如何在不阻塞主线程的情况下实现高效并发操作。文章不仅提供理论框架,还附带实战代码示例,让读者能够快速掌握并应用到实际项目中。
9 3
|
2天前
|
程序员 API 开发者
探索Python中的异步编程:从asyncio到Trio
在本文中,我们将深入探讨Python的异步编程世界。不同于传统摘要的枯燥介绍,我们将通过一个虚构的故事,讲述一个名叫艾丽的程序员如何在一个周末的编程马拉松中,通过使用Python的asyncio库解决了一个复杂的并发问题,并在最后意外发现了Trio库,从而开启了她对异步编程的新理解。
|
4天前
|
运维 监控 Python
自动化运维:使用Python脚本简化日常任务
【9月更文挑战第23天】在本文中,我们将探索如何通过编写Python脚本来自动化常见的系统管理任务,从而提升效率并减少人为错误。文章将介绍基础的Python编程概念、实用的库函数,以及如何将这些知识应用于创建有用的自动化工具。无论你是新手还是有经验的系统管理员,这篇文章都将为你提供有价值的见解和技巧,帮助你在日常工作中实现自动化。
|
2天前
|
开发框架 并行计算 .NET
燃烧吧,Python!异步编程如何点燃IO密集型任务,让CPU密集型任务也加速狂奔?
燃烧吧,Python!异步编程如何点燃IO密集型任务,让CPU密集型任务也加速狂奔?
8 2
|
3天前
|
运维 监控 Python
自动化运维:使用Python脚本实现日常任务
【9月更文挑战第24天】在现代的软件开发周期中,运维工作扮演着至关重要的角色。本文将介绍如何利用Python编写简单的自动化脚本,来优化和简化日常的运维任务。从备份数据到系统监控,Python的易用性和强大的库支持使其成为自动化运维的首选工具。跟随这篇文章,你将学习如何使用Python编写自己的自动化脚本,提高运维效率,减少人为错误,并最终提升整个开发流程的质量。
|
5天前
|
调度 开发者 Python
探索Python中的异步编程:理解asyncio和协程
【9月更文挑战第22天】在现代软件工程中,异步编程是提升应用性能的关键技术之一。本文将深入探讨Python语言中的异步编程模型,特别是asyncio库的使用和协程的概念。我们将了解如何通过事件循环和任务来处理并发操作,以及如何用协程来编写非阻塞的代码。文章不仅会介绍理论知识,还会通过实际的代码示例展示如何在Python中实现高效的异步操作。
|
5天前
|
设计模式 数据处理 调度
Python中的异步编程:理解并使用Asyncio
【9月更文挑战第22天】在Python中,传统的同步编程模式可能会遇到性能瓶颈,特别是在处理I/O密集型任务时。异步编程提供了一种高效处理并发任务的方法,而asyncio是Python中实现异步编程的库之一。本文将深入介绍asyncio的基本概念、使用方法和实际案例,帮助初学者理解如何在Python中使用异步编程来提升程序的性能和响应性。
11 3
|
3天前
|
算法 Java 程序员
解锁Python高效之道:并发与异步在IO与CPU密集型任务中的精准打击策略!
在数据驱动时代,高效处理大规模数据和高并发请求至关重要。Python凭借其优雅的语法和强大的库支持,成为开发者首选。本文将介绍Python中的并发与异步编程,涵盖并发与异步的基本概念、IO密集型任务的并发策略、CPU密集型任务的并发策略以及异步IO的应用。通过具体示例,展示如何使用`concurrent.futures`、`asyncio`和`multiprocessing`等库提升程序性能,帮助开发者构建高效、可扩展的应用程序。
9 0
|
3天前
|
UED 开发者 Python
Python并发编程新纪元:异步编程如何重塑IO与CPU密集型任务的处理方式?
在Python编程中,异步编程作为一种非阻塞模式,通过允许程序在等待IO操作时继续执行其他任务,提高了程序的响应性和吞吐量。与传统同步编程相比,它减少了线程等待时间,尤其在处理IO密集型任务时表现出色,如使用`asyncio`库进行异步HTTP请求。尽管对CPU密集型任务的直接提升有限,但结合多进程或多线程可间接提高效率。异步编程虽强大,但也带来了代码复杂度增加和调试难度提升等挑战,需要开发者掌握最佳实践来克服这些问题。随着其技术的成熟,异步编程正在逐步改变我们处理IO与CPU密集型任务的方式,成为提升性能和优化用户体验的重要工具。
5 0