完整了解如何在python中处理协程和流

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【6月更文挑战第25天】本文介绍异步库asyncio的概念和用法,异步编程在Python中是通过事件循环和协程实现并发,随着版本更新,API有所变化。

简介

本文介绍asyncio中常用概念和函数。

def`定义的函数是协程函数,调用产生协程对象。

  • asyncio库支持协程和任务,包括基于生成器的旧式协程。
  • 可等待对象:如协程、任务(asyncio.create_task())、Future,async/await用于控制流。
  • asyncio.run()运行协程,管理事件循环,仅用于主程序入口。
  • 任务:asyncio.create_task()创建和调度协程,asyncio.ensure_future()(<3.7)用于旧版。
  • asyncio.sleep()用于挂起任务,允许其他任务执行。
  • asyncio.gather()并发运行多个任务,返回结果列表或处理异常。
  • asyncio.shield()保护协程免受取消影响,3.10版本已移除。
  • asyncio.wait_for()超时控制,超时则引发TimeoutError
  • asyncio.wait()等待一组任务,返回完成和未完成的任务集。
  • as_completed()返回最早完成的任务,适合并发处理。
  • asyncio.to_thread()在新线程中运行函数,避免阻塞事件循环。
  • asyncio.run_coroutine_threadsafe()线程安全提交协程。
  • asyncio.current_task()all_tasks()用于检查当前或所有任务。
  • 取消任务:调用task.cancel(),捕获CancelledError
  • Stream函数如open_connection()start_server()用于网络流操作。
  • Unix套接字函数提供Unix域套接字的支持。

1 队列处理

在本文档中 "协程" 可用来表示两个紧密关联的概念:

协程函数: 定义形式为 async def 的函数;

协程对象: 调用 协程函数 所返回的对象。

asyncio 也支持旧式的 基于生成器的 协程。

  • 首先看一个:线程添加和取出

    from queue import Queue
    >>> a = Queue()
    

生成者进程

  >>> Thread(target=producer, args=(a, 10)).start()  

消费者进程

  >>> Thread(target=consumer, args=(a,)).start()   

当执行任务较多的时候,使用异步可以提前返回一些计算内容,其他执行时间较长的任务可以在其他进程继续执行。

2 asyncio 异步io库 概念

可等待对象

  • 协程 async/await
  • 任务 task
    asyncio.create_task() #封装协程为一个任务 该协程会被自动调度执行

  • Future

Future是一种特殊的低层级 可等待对象,表示一个异步操作的最终结果。

当一个Future对象被等待,意味着协程将保持直到该Future对象 在其他地方完成操作。

在asyncio 需要Future对象以便允许通过async/await 使用基于回调代码。

通常没有必要在应用层 创建 Future对象。

Future对象有时会 由库和某些asyncio API暴露给用户 用作可等对象。

3 协程与任务

https://docs.python.org/zh-cn/3/library/asyncio-task.html#coroutine
  • asyncio.run 运行 asyncio程序

执行 coroutine coro 并返回结果。

此函数 将运行传入的 协程,负责管理asyncio事件循环。
终结异步 生成器,并关闭线程池。

当有其他 asyncio 事件循环 在同一线程时,函数不能被调用
debug为True时,事件循环将以调式模式运行

此函数 总是创建一个新的事件循环并在结束时关闭。
它应该被当作asyncio程序主入口,理想情况只被调用一次。

  • task创建任务 asyncio.create_task(coro, *, name=None)

将coro协程封装为 Task并调度其执行,返回Task对象。

name不为None时,它将使用Task.set_name() 设为任务名称。

该任务在 get_running_loop() 返回的循环中执行。

如果当前线程没有在运行的循环则引发RuntimeError。

  • 3.7 以上的才支持,<3.7的版本需要 底层的 asyncio.ensure_future()函数。
   task = asyncio.create_task(coro())

等效

                     task = asyncio.ensure_future(coro())

休眠

        coroutine asyncio.sleep(delay, result=None)

阻塞 delay 指定秒数

如果指定了 result,则当协程完成时将其返回给调用者。
sleep() 总是会挂起当前任务,以允许其他任务运行。

将delay 设为0 将提供一个 经优化的路径,以运行其他任务运行。
这可供长期运行的函数使用,以避免在函数调用全过程中阻塞事件 循环。

并行运行任务

        awaitable asyncio.gather(*aws, return_exception=False)

并发运行 aws序列中的可等待对象。

如果aws的某个可等待对象为协程,它将自动被作为一个任务的调度。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合的列表,顺序与 aws可等待对象一致。

如果return_exceptions 为Fasle,所引发的异常将立即传播给等待gather()任务。

aws序列中其他可等待对象 不会被取消并继续运行。

如果aws 序列中的任一task 或 Future对象被取消,它将被引发CancelledError 一样处理。

在此情况下gather调用 不会被取消。

这是为了防止一个已提交的 Task/Future被取消导致其他Tasks/Future也被取消

如果 return_exception=False 则在gather()被标记已完成时,取消它不会取消 任何已提交的可等待对象。

如: 在一个异常传播给调用者之后,gather可标记为已完成,

因此, 在从gather捕获一个(由可等待对象所引发的)异常之后调用gather.cancel() 将不会 取消任何其他可等待对象。

  • 3.10 版本 已取消该参数 return_exception。

如果未提供位置参数或者并非所有位置参数均为 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。

屏蔽取消操作

        asyncio.shield(aw)

保护一个可等待对象 防止其被取消,如果aw是一个协程,它将自动被作为任务调度。

            res = await shield(something())

相当于

            res = await something()

不同之处在于如果 保护它的协程被取消,在something() 运行中的任务不会被取消。从something()的角度看来。

取消操作并没有发生,然而其调用这已被取消,因此,await表达式仍然会引发 CancelledError。

如果 通过其他方式 取消something() 如内部操作,shield也将被取消。

如果希望 完全忽略 取消操作,不推荐,则shield() 函数需要一个 try/except代码段

            try:
                res = await shiedl(something())
            except CancelledError:
                res = None
  • 3.10 已删除,如果await 不是Future 类对象,并且没有正在运行的事件循环,将发生弃用警告。

超时

        coroutine asyncio.wait_for(aw, timeout)

等待 aw 可等待对象完成,指定 timeout 秒数后超时。

如果aw是一个协程,自动被作为任务调度。

timeout可以为None,也可以为 float 或 int 型整数表示的等待秒数。

如果为None,则必须到完成为止。
如果超时,任务将取消并引发 asyncio.TimeoutError。

要避免任务 取消,可以假设 shield()。

  • 简单等待

          coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
    

并发运行aws 可迭代对象中的可等待对象 并进入阻塞直到满足 return_when所指定条件。

aws 可迭代对象不可为空

返回 Task/Future集合 (done, pending)

此函数不会引发 asyncio.TimeoutError 当超时发生时,未完成的Future 或 Task 将在指定秒数后被返回。

与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

wait() 会自动以任务的形式调度协程,之后将以 (done, pending) 集合形式返回显式创建的任务对象。

正确写法

   async def foo():
    return 42


    task = asyncio.create_task(foo())
    done, pending = await asyncio.wait({task})

if task in done:
    print("done")

完成处理

 as_completed(aws,*,timeout=None)

并发地运行aws 可迭代对象的可等待对象,返回一个协程迭代器。

返回的每个协程 可被等待以从剩余可等待对象的 可迭代对象中获取最早的结果。

如果所有Future对象完成前发生超时则引发asyncio.TimeoutError。

        for  coro in as_completed(aws):
             earliest_result = await coro

在线程运行

        coroutine asyncio.to_thread(func, /, *args, **kwargs)

不同的线程中异步运行函数 func。

向此函数提供的任何 args, * kwargs 会被直接传给func。

并且,当前 contextvars.Context将被传播,允许在不同线程访问来自事件循环的上下文变量。

返回一个可等待以获取 func 的最终结果的协程,主要用于执行在其他情况下会阻塞事件循环的IO密集型函数 方法。

  • 跨线程调度

          asyncio.run_coroutine_threadsafe(coro, loop)
    

向指定事件循环 提交一个线程。 线程安全。

返回一个 concurrent.futures.Future以等待来自其他OS线程的结果。

此函数应该从另一个OS线程调用,非事件循环运行所在线程。

  • 内省
    返回当前运行的Task实例,如果没有正在运行的任务则返回None。
    如果loop未None则使用 get_running_loop()获取当前事件循环。

    asyncio.current_task(loop=None)

返回事件循环所在运行的未完成Task对象集合。
如果loop 为Nong,则使用get_running_loop()获取当前事件。

           asyncio.all_tasks(loop=None)
  • Task对象
            asyncio.Task()
    

一个与Future类似 的对象,可运行python协程,非线程安全

  • 取消task

     async def cancel_me():
         print('cancel_me(): before sleep')
    
     try:
         # Wait for 1 hour
         await asyncio.sleep(5)
     except asyncio.CancelledError:
         print('cancel_me(): cancel sleep')
         raise
     finally:
         print('cancel_me(): after sleep')
    
     async def main9():
         # Create a "cancel_me" Task
         task = asyncio.create_task(cancel_me())
    
         # Wait for 1 second
         await asyncio.sleep(1)
    
         task.cancel()
         try:
             await task
         except asyncio.CancelledError:
             print("main(): cancel_me is cancelled now")
         finally:
             print(task.cancelled())
    
  • 基于生成器 的协程

    基于生成器的协程是 async/await语法前身,它们是使用 yuield from 创建的

    使用 @asyncio.coroutine装饰

4 流

  • Stream 函数

    下面的高级 asyncio 函数可以用来创建和处理流:

     coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)
    

建立网络连接并返回一对 (reader, writer) 对象。

返回的 reader 和 writer 对象是 StreamReader 和 StreamWriter 类的实例。

    coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

启动套接字服务。
当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。

该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。

  • Unix 套接字

     coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
    

    建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。

与 open_connection() 相似,但是是在 Unix 套接字上的操作。

请看文档 loop.create_unix_connection().

在 3.10 版更改: Removed the loop parameter.

    coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

启动一个 Unix 套接字服务。

StreamReader
    class asyncio.StreamReader

这个类表示一个读取器对象,该对象提供api以便于从IO流中读取数据。

不推荐直接实例化 StreamReader 对象,建议使用 open_connection() 和 start_server() 来获取 StreamReader 实例。

    coroutine read(n=- 1)

至多读取 n 个byte。 如果没有设置 n , 则自动置为 -1 , -1时表示读至 EOF 并返回所有读取的byte。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

    coroutine readline()

读取一行,其中“行”指的是以 \n 结尾的字节序列。

如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

StreamWriter
    class asyncio.StreamWriter

这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。

不建议直接实例化 StreamWriter;而应改用 open_connection() 和 start_server()。

    write(data)

此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

    stream.write(data)
    await stream.drain()
    writelines(data)

此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。

如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

    stream.writelines(lines)
    await stream.drain()
    close()

此方法会关闭流以及下层的套接字。

此方法应与 wait_closed() 方法一起使用:

    stream.close()
    await stream.wait_closed()

注册一个打开的套接字以等待使用流的数据。

使用低层级协议以及 loop.create_connection() 方法的 注册一个打开的套接字以等待使用协议的数据 示例。

    https://docs.python.org/zh-cn/3/library/asyncio-protocol.html#asyncio-example-create-connection

使用低层级的 loop.add_reader() 方法来监视文件描述符的 监视文件描述符以读取事件 示例。

使用 open_connection() 函数实现等待直到套接字接收到数据的协程:

    import asyncio
    import socket

    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()

        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()

        # Close the second socket
        wsock.close()

5 小结

本节介绍python的异步支持,异步包引入较晚,因此有部分功能仍在变动中。

比如在3.10支持PEGs 规范的解析器之后,取消了loop :Removed the loop parameter. 并且在await如果对象不可等待时,将返回错误。

目录
相关文章
|
1月前
|
安全 调度 Python
探索Python中的并发编程:协程与多线程的比较
本文将深入探讨Python中的并发编程技术,重点比较协程与多线程的特点和应用场景。通过对协程和多线程的原理解析,以及在实际项目中的应用案例分析,读者将能够更好地理解两种并发编程模型的异同,并在实践中选择合适的方案来提升Python程序的性能和效率。
|
5天前
|
数据挖掘 程序员 调度
Python并发编程之协程与异步IO
传统的多线程和多进程模型在处理大规模并发时存在一些性能瓶颈和资源消耗问题。本文将重点介绍Python中基于协程和异步IO的并发编程方法,探讨其工作原理和实际应用,帮助开发者更好地理解并利用Python的并发编程能力。
|
6天前
|
开发者 Python
探索 Python 中的协程:从基本概念到实际应用
在现代编程中,异步处理变得越来越重要,Python 通过其内置的协程提供了强大的工具来简化这一过程。本文将深入探讨 Python 中的协程,从基本概念出发,逐步展示其实际应用,并通过具体代码示例帮助你掌握这种技术。
|
11天前
|
数据挖掘 调度 开发者
Python并发编程的艺术:掌握线程、进程与协程的同步技巧
并发编程在Python中涵盖线程、进程和协程,用于优化IO操作和响应速度。`threading`模块支持线程,`multiprocessing`处理进程,而`asyncio`则用于协程。线程通过Lock和Condition Objects同步,进程使用Queue和Pipe通信。协程利用异步事件循环避免上下文切换。了解并发模型及同步技术是提升Python应用性能的关键。
37 5
|
13天前
|
调度 开发者 UED
探索Python中的异步编程:从回调到协程
【6月更文挑战第14天】本文深入探讨了Python异步编程的演变历程,从最初的回调函数到现代的协程模型。我们将通过具体示例,展示如何利用asyncio库提升程序的执行效率和响应能力。文章旨在为读者提供一个清晰的异步编程发展脉络,并指导他们如何在项目中实际应用这些技术。
|
24天前
|
JavaScript 前端开发 程序员
Python协程与asyncio
理解Python中的协程,我们需从其底层原理开始,逐步深入。协程的核心在于控制流的非阻塞式管理,它允许在单一线程内实现并发处理,通过事件循环和协作式多任务来提高效率。
|
12天前
|
Java 开发者 计算机视觉
探索Python中的并发编程:线程与协程
本文将深入探讨Python中的并发编程,重点比较线程和协程的工作机制、优缺点及其适用场景,帮助开发者在实际项目中做出更明智的选择。
|
1月前
|
调度 Python
探索Python中的异步编程:从回调到协程
本文将介绍Python中的异步编程技术,从最初的回调函数到现代的协程模型。通过对比传统的同步编程方式和异步编程的优劣势,我们深入探讨了Python中异步编程的实现原理,以及如何利用asyncio库和async/await关键字来构建高效的异步应用程序。最后,我们还将讨论一些异步编程的最佳实践和常见问题的解决方法。
|
1月前
|
消息中间件 安全 调度
基于Python的性能优化(线程、协程、进程)
一、多线程 在CPU不密集、IO密集的任务下,多线程可以一定程度的提升运行效率。
|
Python
Python协程深入理解
从语法上来看,协程和生成器类似,都是定义体中包含yield关键字的函数。yield在协程中的用法: 在协程中yield通常出现在表达式的右边,例如:datum = yield,可以产出值,也可以不产出--如果yield关键字后面没有表达式,那么生成器产出None. 协程可能从调用方接受数据,调用方是通过send(datum)的方式把数据提供给协程使用,而不是next(...)函数,通常调用方会把值推送给协程。
1224 0