完整了解如何在python中处理协程和流

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
可观测监控 Prometheus 版,每月50GB免费额度
简介: 【6月更文挑战第25天】本文介绍异步库asyncio的概念和用法,异步编程在Python中是通过事件循环和协程实现并发,随着版本更新,API有所变化。

简介

本文介绍asyncio中常用概念和函数。

def`定义的函数是协程函数,调用产生协程对象。

  • asyncio库支持协程和任务,包括基于生成器的旧式协程。
  • 可等待对象:如协程、任务(asyncio.create_task())、Future,async/await用于控制流。
  • asyncio.run()运行协程,管理事件循环,仅用于主程序入口。
  • 任务:asyncio.create_task()创建和调度协程,asyncio.ensure_future()(<3.7)用于旧版。
  • asyncio.sleep()用于挂起任务,允许其他任务执行。
  • asyncio.gather()并发运行多个任务,返回结果列表或处理异常。
  • asyncio.shield()保护协程免受取消影响,3.10版本已移除。
  • asyncio.wait_for()超时控制,超时则引发TimeoutError
  • asyncio.wait()等待一组任务,返回完成和未完成的任务集。
  • as_completed()返回最早完成的任务,适合并发处理。
  • asyncio.to_thread()在新线程中运行函数,避免阻塞事件循环。
  • asyncio.run_coroutine_threadsafe()线程安全提交协程。
  • asyncio.current_task()all_tasks()用于检查当前或所有任务。
  • 取消任务:调用task.cancel(),捕获CancelledError
  • Stream函数如open_connection()start_server()用于网络流操作。
  • Unix套接字函数提供Unix域套接字的支持。

1 队列处理

在本文档中 "协程" 可用来表示两个紧密关联的概念:

协程函数: 定义形式为 async def 的函数;

协程对象: 调用 协程函数 所返回的对象。

asyncio 也支持旧式的 基于生成器的 协程。

  • 首先看一个:线程添加和取出

    from queue import Queue
    >>> a = Queue()
    

生成者进程

  >>> Thread(target=producer, args=(a, 10)).start()  

消费者进程

  >>> Thread(target=consumer, args=(a,)).start()   

当执行任务较多的时候,使用异步可以提前返回一些计算内容,其他执行时间较长的任务可以在其他进程继续执行。

2 asyncio 异步io库 概念

可等待对象

  • 协程 async/await
  • 任务 task
    asyncio.create_task() #封装协程为一个任务 该协程会被自动调度执行

  • Future

Future是一种特殊的低层级 可等待对象,表示一个异步操作的最终结果。

当一个Future对象被等待,意味着协程将保持直到该Future对象 在其他地方完成操作。

在asyncio 需要Future对象以便允许通过async/await 使用基于回调代码。

通常没有必要在应用层 创建 Future对象。

Future对象有时会 由库和某些asyncio API暴露给用户 用作可等对象。

3 协程与任务

https://docs.python.org/zh-cn/3/library/asyncio-task.html#coroutine
  • asyncio.run 运行 asyncio程序

执行 coroutine coro 并返回结果。

此函数 将运行传入的 协程,负责管理asyncio事件循环。
终结异步 生成器,并关闭线程池。

当有其他 asyncio 事件循环 在同一线程时,函数不能被调用
debug为True时,事件循环将以调式模式运行

此函数 总是创建一个新的事件循环并在结束时关闭。
它应该被当作asyncio程序主入口,理想情况只被调用一次。

  • task创建任务 asyncio.create_task(coro, *, name=None)

将coro协程封装为 Task并调度其执行,返回Task对象。

name不为None时,它将使用Task.set_name() 设为任务名称。

该任务在 get_running_loop() 返回的循环中执行。

如果当前线程没有在运行的循环则引发RuntimeError。

  • 3.7 以上的才支持,<3.7的版本需要 底层的 asyncio.ensure_future()函数。
   task = asyncio.create_task(coro())

等效

                     task = asyncio.ensure_future(coro())

休眠

        coroutine asyncio.sleep(delay, result=None)

阻塞 delay 指定秒数

如果指定了 result,则当协程完成时将其返回给调用者。
sleep() 总是会挂起当前任务,以允许其他任务运行。

将delay 设为0 将提供一个 经优化的路径,以运行其他任务运行。
这可供长期运行的函数使用,以避免在函数调用全过程中阻塞事件 循环。

并行运行任务

        awaitable asyncio.gather(*aws, return_exception=False)

并发运行 aws序列中的可等待对象。

如果aws的某个可等待对象为协程,它将自动被作为一个任务的调度。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合的列表,顺序与 aws可等待对象一致。

如果return_exceptions 为Fasle,所引发的异常将立即传播给等待gather()任务。

aws序列中其他可等待对象 不会被取消并继续运行。

如果aws 序列中的任一task 或 Future对象被取消,它将被引发CancelledError 一样处理。

在此情况下gather调用 不会被取消。

这是为了防止一个已提交的 Task/Future被取消导致其他Tasks/Future也被取消

如果 return_exception=False 则在gather()被标记已完成时,取消它不会取消 任何已提交的可等待对象。

如: 在一个异常传播给调用者之后,gather可标记为已完成,

因此, 在从gather捕获一个(由可等待对象所引发的)异常之后调用gather.cancel() 将不会 取消任何其他可等待对象。

  • 3.10 版本 已取消该参数 return_exception。

如果未提供位置参数或者并非所有位置参数均为 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。

屏蔽取消操作

        asyncio.shield(aw)

保护一个可等待对象 防止其被取消,如果aw是一个协程,它将自动被作为任务调度。

            res = await shield(something())

相当于

            res = await something()

不同之处在于如果 保护它的协程被取消,在something() 运行中的任务不会被取消。从something()的角度看来。

取消操作并没有发生,然而其调用这已被取消,因此,await表达式仍然会引发 CancelledError。

如果 通过其他方式 取消something() 如内部操作,shield也将被取消。

如果希望 完全忽略 取消操作,不推荐,则shield() 函数需要一个 try/except代码段

            try:
                res = await shiedl(something())
            except CancelledError:
                res = None
  • 3.10 已删除,如果await 不是Future 类对象,并且没有正在运行的事件循环,将发生弃用警告。

超时

        coroutine asyncio.wait_for(aw, timeout)

等待 aw 可等待对象完成,指定 timeout 秒数后超时。

如果aw是一个协程,自动被作为任务调度。

timeout可以为None,也可以为 float 或 int 型整数表示的等待秒数。

如果为None,则必须到完成为止。
如果超时,任务将取消并引发 asyncio.TimeoutError。

要避免任务 取消,可以假设 shield()。

  • 简单等待

          coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
    

并发运行aws 可迭代对象中的可等待对象 并进入阻塞直到满足 return_when所指定条件。

aws 可迭代对象不可为空

返回 Task/Future集合 (done, pending)

此函数不会引发 asyncio.TimeoutError 当超时发生时,未完成的Future 或 Task 将在指定秒数后被返回。

与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

wait() 会自动以任务的形式调度协程,之后将以 (done, pending) 集合形式返回显式创建的任务对象。

正确写法

   async def foo():
    return 42


    task = asyncio.create_task(foo())
    done, pending = await asyncio.wait({task})

if task in done:
    print("done")

完成处理

 as_completed(aws,*,timeout=None)

并发地运行aws 可迭代对象的可等待对象,返回一个协程迭代器。

返回的每个协程 可被等待以从剩余可等待对象的 可迭代对象中获取最早的结果。

如果所有Future对象完成前发生超时则引发asyncio.TimeoutError。

        for  coro in as_completed(aws):
             earliest_result = await coro

在线程运行

        coroutine asyncio.to_thread(func, /, *args, **kwargs)

不同的线程中异步运行函数 func。

向此函数提供的任何 args, * kwargs 会被直接传给func。

并且,当前 contextvars.Context将被传播,允许在不同线程访问来自事件循环的上下文变量。

返回一个可等待以获取 func 的最终结果的协程,主要用于执行在其他情况下会阻塞事件循环的IO密集型函数 方法。

  • 跨线程调度

          asyncio.run_coroutine_threadsafe(coro, loop)
    

向指定事件循环 提交一个线程。 线程安全。

返回一个 concurrent.futures.Future以等待来自其他OS线程的结果。

此函数应该从另一个OS线程调用,非事件循环运行所在线程。

  • 内省
    返回当前运行的Task实例,如果没有正在运行的任务则返回None。
    如果loop未None则使用 get_running_loop()获取当前事件循环。

    asyncio.current_task(loop=None)

返回事件循环所在运行的未完成Task对象集合。
如果loop 为Nong,则使用get_running_loop()获取当前事件。

           asyncio.all_tasks(loop=None)
  • Task对象
            asyncio.Task()
    

一个与Future类似 的对象,可运行python协程,非线程安全

  • 取消task

     async def cancel_me():
         print('cancel_me(): before sleep')
    
     try:
         # Wait for 1 hour
         await asyncio.sleep(5)
     except asyncio.CancelledError:
         print('cancel_me(): cancel sleep')
         raise
     finally:
         print('cancel_me(): after sleep')
    
     async def main9():
         # Create a "cancel_me" Task
         task = asyncio.create_task(cancel_me())
    
         # Wait for 1 second
         await asyncio.sleep(1)
    
         task.cancel()
         try:
             await task
         except asyncio.CancelledError:
             print("main(): cancel_me is cancelled now")
         finally:
             print(task.cancelled())
    
  • 基于生成器 的协程

    基于生成器的协程是 async/await语法前身,它们是使用 yuield from 创建的

    使用 @asyncio.coroutine装饰

4 流

  • Stream 函数

    下面的高级 asyncio 函数可以用来创建和处理流:

     coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)
    

建立网络连接并返回一对 (reader, writer) 对象。

返回的 reader 和 writer 对象是 StreamReader 和 StreamWriter 类的实例。

    coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

启动套接字服务。
当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。

该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。

  • Unix 套接字

     coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
    

    建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。

与 open_connection() 相似,但是是在 Unix 套接字上的操作。

请看文档 loop.create_unix_connection().

在 3.10 版更改: Removed the loop parameter.

    coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

启动一个 Unix 套接字服务。

StreamReader
    class asyncio.StreamReader

这个类表示一个读取器对象,该对象提供api以便于从IO流中读取数据。

不推荐直接实例化 StreamReader 对象,建议使用 open_connection() 和 start_server() 来获取 StreamReader 实例。

    coroutine read(n=- 1)

至多读取 n 个byte。 如果没有设置 n , 则自动置为 -1 , -1时表示读至 EOF 并返回所有读取的byte。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

    coroutine readline()

读取一行,其中“行”指的是以 \n 结尾的字节序列。

如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

StreamWriter
    class asyncio.StreamWriter

这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。

不建议直接实例化 StreamWriter;而应改用 open_connection() 和 start_server()。

    write(data)

此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

    stream.write(data)
    await stream.drain()
    writelines(data)

此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。

如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

    stream.writelines(lines)
    await stream.drain()
    close()

此方法会关闭流以及下层的套接字。

此方法应与 wait_closed() 方法一起使用:

    stream.close()
    await stream.wait_closed()

注册一个打开的套接字以等待使用流的数据。

使用低层级协议以及 loop.create_connection() 方法的 注册一个打开的套接字以等待使用协议的数据 示例。

    https://docs.python.org/zh-cn/3/library/asyncio-protocol.html#asyncio-example-create-connection

使用低层级的 loop.add_reader() 方法来监视文件描述符的 监视文件描述符以读取事件 示例。

使用 open_connection() 函数实现等待直到套接字接收到数据的协程:

    import asyncio
    import socket

    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()

        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()

        # Close the second socket
        wsock.close()

5 小结

本节介绍python的异步支持,异步包引入较晚,因此有部分功能仍在变动中。

比如在3.10支持PEGs 规范的解析器之后,取消了loop :Removed the loop parameter. 并且在await如果对象不可等待时,将返回错误。

目录
相关文章
|
26天前
|
调度 Python
python知识点100篇系列(20)-python协程与异步编程asyncio
【10月更文挑战第8天】协程(Coroutine)是一种用户态内的上下文切换技术,通过单线程实现代码块间的切换执行。Python中实现协程的方法包括yield、asyncio模块及async/await关键字。其中,async/await结合asyncio模块可更便捷地编写和管理协程,支持异步IO操作,提高程序并发性能。协程函数、协程对象、Task对象等是其核心概念。
|
16天前
|
NoSQL 关系型数据库 MySQL
python协程+异步总结!
本文介绍了Python中的协程、asyncio模块以及异步编程的相关知识。首先解释了协程的概念和实现方法,包括greenlet、yield关键字、asyncio装饰器和async/await关键字。接着详细讲解了协程的意义和应用场景,如提高IO密集型任务的性能。文章还介绍了事件循环、Task对象、Future对象等核心概念,并提供了多个实战案例,包括异步Redis、MySQL操作、FastAPI框架和异步爬虫。最后提到了uvloop作为asyncio的高性能替代方案。通过这些内容,读者可以全面了解和掌握Python中的异步编程技术。
34 0
|
16天前
|
数据采集 缓存 程序员
python协程使用教程
1. **协程**:介绍了协程的概念、与子程序的区别、优缺点,以及如何在 Python 中使用协程。 2. **同步与异步**:解释了同步与异步的概念,通过示例代码展示了同步和异步处理的区别和应用场景。 3. **asyncio 模块**:详细介绍了 asyncio 模块的概述、基本使用、多任务处理、Task 概念及用法、协程嵌套与返回值等。 4. **aiohttp 与 aiofiles**:讲解了 aiohttp 模块的安装与使用,包括客户端和服务器端的简单实例、URL 参数传递、响应内容读取、自定义请求等。同时介绍了 aiofiles 模块的安装与使用,包括文件读写和异步迭代
18 0
|
1月前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
深入探索:Python中的并发编程新纪元——协程与异步函数解析
26 3
|
2月前
|
Python
Python中的异步编程与协程实践
【9月更文挑战第28天】本文旨在通过一个简单易懂的示例,介绍如何在Python中利用asyncio库实现异步编程和协程。我们将通过代码示例来展示如何编写高效的并发程序,并解释背后的原理。
|
2月前
|
数据库 开发者 Python
实战指南:用Python协程与异步函数优化高性能Web应用
在快速发展的Web开发领域,高性能与高效响应是衡量应用质量的重要标准。随着Python在Web开发中的广泛应用,如何利用Python的协程(Coroutine)与异步函数(Async Functions)特性来优化Web应用的性能,成为了许多开发者关注的焦点。本文将从实战角度出发,通过具体案例展示如何运用这些技术来提升Web应用的响应速度和吞吐量。
29 1
|
2月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
在Python异步编程领域,协程与异步函数成为处理并发任务的关键工具。协程(微线程)比操作系统线程更轻量级,通过`async def`定义并在遇到`await`表达式时暂停执行。异步函数利用`await`实现任务间的切换。事件循环作为异步编程的核心,负责调度任务;`asyncio`库提供了事件循环的管理。Future对象则优雅地处理异步结果。掌握这些概念,可使代码更高效、简洁且易于维护。
26 1
|
2月前
|
调度 开发者 Python
探索Python中的异步编程:理解asyncio和协程
【9月更文挑战第22天】在现代软件工程中,异步编程是提升应用性能的关键技术之一。本文将深入探讨Python语言中的异步编程模型,特别是asyncio库的使用和协程的概念。我们将了解如何通过事件循环和任务来处理并发操作,以及如何用协程来编写非阻塞的代码。文章不仅会介绍理论知识,还会通过实际的代码示例展示如何在Python中实现高效的异步操作。
|
1月前
|
数据采集 调度 Python
Python编程异步爬虫——协程的基本原理(一)
Python编程异步爬虫——协程的基本原理(一)
|
1月前
|
数据采集 Python
Python编程异步爬虫——协程的基本原理(二)
Python编程异步爬虫——协程的基本原理(二)