Python:从头创建 Asyncio (2)

简介: Python:从头创建 Asyncio (2)

引言

现在,asyncio 已成为 Python 社区中的热门话题,并且名副其实——它提供了一种非常出色的处理 I/O 密集型程序的方法!在我探索 asyncio 的过程中,我起初并不太明白它的工作原理。但随着深入学习,我意识到 asyncio 实际上是在 Python 生成器的基础上增加了一层非常便利的封装。

本文中,我将展示如何仅用 Python 生成器来构建一个 asyncio 的简化模型。接着,我会演示如何利用 await 魔法方法,将示例代码改写为使用 async 和 await 关键字。最终,我会将我的简化版本替换为官方的 asyncio 库。通过这个过程,我相信你将对 asyncio 的神奇之处有一个更深入的理解。

Sleeping

如果我们沿用之前示例中的代码,我们可以通过 yield from 的应用,为我们的任务嵌入子生成器。例如,我在这里引入了一个休眠生成器,它会在指定的时间到达之前暂停任务的执行。这种机制之所以有效,是因为 sleep 函数会连续产生 yield,直到经过了设定的秒数,然后它将跳出 while 循环。

由于 sleep 函数中没有其他 yield 语句,这将引发一个 StopIteration 异常,这个异常告诉 yield from 语句在任务函数中跳过当前的生成器,继续执行下一行代码。

import time

def sleep(seconds):
   start_time = time.time()
   while time.time() - start_time < seconds:
       yield

def task1():
   while True:
       print('Task 1')
       yield from sleep(1)

def task2():
   while True:
       print('Task 2')
       yield from sleep(5)

event_loop = [task1(), task2()]

while True:
   for task in event_loop:
       next(task)

输出:

Task 1
Task 2
Task 1
Task 1
Task 1
Task 1
Task 2
Task 1
…

Yield to Await

我们现在可以将之前的代码示例,通过应用 _await__ 魔术方法和 async 关键字,从使用 yield 转变为使用 await。如果一个类定义了 _await__ 方法,我们就可以在该类的实例前加上 await 关键字来调用这个方法。在 asyncio 框架中,你通常通过调用如 asyncio.create_task 这样的函数来处理 Task 对象。这些 Task 对象是从 asyncio 的 Future 对象派生而来的,而 Future 对象定义了 _await__ 方法。我们还可以在协程前使用 await,协程是在函数定义时加上 async 关键字生成的对象。协程和生成器函数类似,它们的执行都能够被挂起和恢复。

你可以将 await 关键字理解为 yield from 的一个变体,它附带了一些额外的验证规则。因此,当你在代码中写 await object 时,你实际上是在指示从 "object" 类的实例中调用 _await__ 方法,或者 "object" 本身可能就是另一个协程(类似于子生成器)。

实际上,你甚至可以查看 Asyncio 的源代码,发现 Future 对象中的 _await__ 方法在调用时,如果未来(或任务)尚未完成,它基本上只是执行了 yield 操作。

要将我们在上一节中编写的代码转移到使用 async 和 await,我们首先需要创建自己的 Task 类,因为函数不能具有 await dunder 方法。下面是我想出的一个简单版本:

from queue import Queue


event_loop = Queue()

class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        while not self.finished:
            yield self


def create_task(generator):
    task = Task(generator)
    event_loop.put(task)

    return task

这一次,我们改用队列而非 Python 列表来构建事件循环,这样做更合理,因为我们希望添加或移除任务的操作能够快速完成,即在常数时间内完成。

在我们的 Task 类中,我们将生成器对象保存在 self.iter 属性中,并设置 self.finished 属性为 False,用以跟踪生成器是否已经运行完毕(当生成器引发 StopIteration 异常时,表示其运行结束)。Task 对象还定义了一个 await 魔术方法,这个方法将持续地将控制权交还给事件循环,直到任务完成。完成 Task 对象的创建后,我们使用 create_task 辅助函数将它加入到事件循环中,这将安排它按计划执行。

接下来,我们将构建事件循环管理器,它负责驱动任务的执行。

def run(main):
    event_loop.put(Task(main))

    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)

你或许已经发现,我们的实现开始接近 asyncio 的实际 API。要启动事件循环,我们需要通过一个初始函数来调用 run。这个函数首先将主函数封装进 Task 对象,并加入到事件循环中。随后,while 循环会启动,并且在每次迭代中,通过队列来获取下一个待执行的任务。现在我们使用 task.iter.send(None) 替代了 next(task.iter),这在使用 async/await 关键字时显得有些奇特,但功能上是一致的。我们还需要将这个调用放在 try-except 块中,以便在抛出 StopIteration 异常时,可以将 task.finished 设置为 True;如果没有异常抛出,代码将执行 else 语句,这会把任务重新放回事件循环中,以便再次执行。

接下来,我们需要对 sleep 函数进行异步兼容改造。之前,我们通过一个带有 while 循环和单个 yield 的生成器函数来实现休眠功能。尽管我偏爱这种方法,但 await 关键字不能与生成器函数一起使用——它需要是一个定义了 await 魔术方法的对象或是一个协程函数。因此,为了解决这个问题,我将代码迁移到了另一个函数中,现在实际的 sleep 函数会创建一个任务对象并等待它完成。这个 await 调用将触发 Task 对象内的 await 方法,随后执行 yield,允许事件循环转向其他任务。当事件循环处理到新的 _sleep 任务时,它会检查时间,如果时间未到,同样会执行 yield,将控制权交还给事件循环。如果休眠的任务再次被事件循环调用,就像生成器保存其状态一样,协程仍在等待 sleep 函数返回。由于 sleep 函数还在等待 _sleep 任务完成,任务的 await 魔术方法将再次被调用,由于任务尚未结束,魔术方法中的 yield 将再次被执行。

import time

def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield


async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task

以下是所有代码的汇总:

from queue import Queue
import time


event_loop = Queue()


def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield


async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task


class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        while not self.finished:
            yield self


def create_task(generator):
    task = Task(generator)
    event_loop.put(task)

    return task


def run(main):
    event_loop.put(Task(main))

    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)

既然我们已经成功构建了事件循环、任务创建机制和 sleep 函数,接下来我们可以引入名为 "jacobio.py" 的文件,并把之前使用 yield 语句的部分替换成 await 调用。同时,我们需要在那些使用了 await 的函数前加上 async 关键字,以表明这些函数是异步的,并且可以被其他代码等待执行。最后,我们还需要像在 asyncio 库中那样编写一个主函数,用于将任务排入事件循环的执行队列中。

import jacobio

async def task1():
    for _ in range(2):
        print('Task 1')
        await jacobio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await jacobio.sleep(0)

async def main():
    one = jacobio.create_task(task1())
    two = jacobio.create_task(task2())

    await one
    await two

    print('done')


if __name__ == '__main__':
    jacobio.run(main())

输出:

Task 1
Task 2
Task 2
Task 2
Task 1
done

Await with AsyncIO

现在,我们可以从上面获取代码,并将所有出现的“jacobio”替换为“asyncio”,我们现在完全使用 asyncio 包!

import asyncio

async def task1():
    for _ in range(2):
        print('Task 1')
        await asyncio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await asyncio.sleep(0)

async def main():
    one = asyncio.create_task(task1())
    two = asyncio.create_task(task2())

    await one
    await two

    print('done')


if __name__ == '__main__':
    asyncio.run(main())

Asyncio 在后台执行了许多复杂的操作,但我们成功地从基础的生成器出发,一步步重建了 asyncio 的核心功能!我努力使事件循环管理器的设计尽可能简洁,尽管这仅是 asyncio 工作理念的简化版,与实际的库相比,我的实现在细节上与官方源代码的执行流程有所不同。此外,既然我们现在拥有了完整的 asyncio 库的功能,就无需为了同时等待两个任务而分别创建它们;我们完全可以使用 asyncio.gather() 这样的函数来同时管理多个任务。

相关文章
|
2月前
|
Python
Python中的异步编程:理解并使用asyncio和aiohttp
【8月更文挑战第24天】在Python中,异步编程是一个强大的工具,它可以帮助我们编写出高性能的网络应用。本文将介绍Python的异步编程库asyncio和aiohttp,并通过示例代码展示如何使用它们来创建一个简单的HTTP服务器。我们将看到,通过使用这些库,我们可以在不阻塞主线程的情况下处理大量的并发请求。
|
3天前
|
数据处理 开发者 Python
浅析Python中的异步编程:从asyncio到Tornado
Python的异步编程是提升应用性能的关键。本文从Python的异步编程概念入手,探讨了asyncio库的使用及其在实际开发中的应用,并分析了Tornado框架的异步模型,以及如何将异步思维运用于实际项目中。
|
6天前
|
调度 开发者 Python
Python中异步编程的魔法:深入理解asyncio和aiohttp
【9月更文挑战第26天】本文旨在探索Python语言中的异步编程世界,通过深入浅出的方式介绍核心库asyncio和流行的HTTP客户端aiohttp。我们将从基础概念入手,逐步过渡到高级应用,揭示如何在不阻塞主线程的情况下实现高效并发操作。文章不仅提供理论框架,还附带实战代码示例,让读者能够快速掌握并应用到实际项目中。
14 3
|
7天前
|
程序员 API 开发者
探索Python中的异步编程:从asyncio到Trio
在本文中,我们将深入探讨Python的异步编程世界。不同于传统摘要的枯燥介绍,我们将通过一个虚构的故事,讲述一个名叫艾丽的程序员如何在一个周末的编程马拉松中,通过使用Python的asyncio库解决了一个复杂的并发问题,并在最后意外发现了Trio库,从而开启了她对异步编程的新理解。
|
10天前
|
调度 开发者 Python
探索Python中的异步编程:理解asyncio和协程
【9月更文挑战第22天】在现代软件工程中,异步编程是提升应用性能的关键技术之一。本文将深入探讨Python语言中的异步编程模型,特别是asyncio库的使用和协程的概念。我们将了解如何通过事件循环和任务来处理并发操作,以及如何用协程来编写非阻塞的代码。文章不仅会介绍理论知识,还会通过实际的代码示例展示如何在Python中实现高效的异步操作。
|
10天前
|
设计模式 数据处理 调度
Python中的异步编程:理解并使用Asyncio
【9月更文挑战第22天】在Python中,传统的同步编程模式可能会遇到性能瓶颈,特别是在处理I/O密集型任务时。异步编程提供了一种高效处理并发任务的方法,而asyncio是Python中实现异步编程的库之一。本文将深入介绍asyncio的基本概念、使用方法和实际案例,帮助初学者理解如何在Python中使用异步编程来提升程序的性能和响应性。
13 3
|
14天前
|
开发者 Python
探索Python中的异步编程:理解Asyncio和协程
【9月更文挑战第18天】在Python的世界中,异步编程是一个强大而神秘的概念。它像是一把双刃剑,掌握得好可以大幅提升程序的效率和性能;使用不当则可能让代码变得难以维护和理解。本文将带你一探究竟,通过深入浅出的方式介绍Python中asyncio库和协程的基本概念、使用方法及其背后的原理,让你对异步编程有一个全新的认识。
|
2月前
|
数据采集 调度 开发者
Python并发编程:异步编程(asyncio模块)
本文详细介绍了 Python 的 asyncio 模块,包括其基础概念、核心组件、常用功能等,并通过多个示例展示了如何在实际项目中应用这些技术。通过学习这些内容,您应该能够熟练掌握 Python 中的异步编程,提高编写并发程序的能力。 异步编程可以显著提高程序的响应速度和并发处理能力,但也带来了新的挑战和问题。在使用 asyncio 时,需要注意合理设计协程和任务,避免阻塞操作,并充分利用事件循环和异步 I/O 操作。
|
2月前
|
调度 数据库 UED
Python使用asyncio包实现异步编程方式
异步编程是一种编程范式,用于处理程序中需要等待异步操作完成后才能继续执行的情况。 异步编程允许程序在执行耗时的操作时不被阻塞,而是在等待操作完成时继续执行其他任务。 这对于处理诸如文件 I/O、网络请求、定时器等需要等待的操作非常有用。
|
2月前
|
数据采集 设计模式 数据处理
探索Python中的异步编程:使用asyncio和aiohttp构建高性能Web爬虫
【8月更文挑战第27天】在数字时代的浪潮中,数据抓取技术成为获取网络信息的重要手段。本文将引导读者步入Python异步编程的殿堂,详细探讨如何使用asyncio库和aiohttp模块来构建一个高性能的Web爬虫。文章不仅提供理论知识,还通过实际代码示例,展示如何实现非阻塞I/O操作,从而显著提高程序执行效率,让数据处理变得更加迅速和高效。
下一篇
无影云桌面