完整了解如何在python中处理协程和流

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
注册配置 MSE Nacos/ZooKeeper,118元/月
函数计算FC,每月免费额度15元,12个月
简介: 【6月更文挑战第25天】本文介绍异步库asyncio的概念和用法,异步编程在Python中是通过事件循环和协程实现并发,随着版本更新,API有所变化。

简介

本文介绍asyncio中常用概念和函数。

def`定义的函数是协程函数,调用产生协程对象。

  • asyncio库支持协程和任务,包括基于生成器的旧式协程。
  • 可等待对象:如协程、任务(asyncio.create_task())、Future,async/await用于控制流。
  • asyncio.run()运行协程,管理事件循环,仅用于主程序入口。
  • 任务:asyncio.create_task()创建和调度协程,asyncio.ensure_future()(<3.7)用于旧版。
  • asyncio.sleep()用于挂起任务,允许其他任务执行。
  • asyncio.gather()并发运行多个任务,返回结果列表或处理异常。
  • asyncio.shield()保护协程免受取消影响,3.10版本已移除。
  • asyncio.wait_for()超时控制,超时则引发TimeoutError
  • asyncio.wait()等待一组任务,返回完成和未完成的任务集。
  • as_completed()返回最早完成的任务,适合并发处理。
  • asyncio.to_thread()在新线程中运行函数,避免阻塞事件循环。
  • asyncio.run_coroutine_threadsafe()线程安全提交协程。
  • asyncio.current_task()all_tasks()用于检查当前或所有任务。
  • 取消任务:调用task.cancel(),捕获CancelledError
  • Stream函数如open_connection()start_server()用于网络流操作。
  • Unix套接字函数提供Unix域套接字的支持。

1 队列处理

在本文档中 "协程" 可用来表示两个紧密关联的概念:

协程函数: 定义形式为 async def 的函数;

协程对象: 调用 协程函数 所返回的对象。

asyncio 也支持旧式的 基于生成器的 协程。

  • 首先看一个:线程添加和取出

    from queue import Queue
    >>> a = Queue()
    

生成者进程

  >>> Thread(target=producer, args=(a, 10)).start()  

消费者进程

  >>> Thread(target=consumer, args=(a,)).start()   

当执行任务较多的时候,使用异步可以提前返回一些计算内容,其他执行时间较长的任务可以在其他进程继续执行。

2 asyncio 异步io库 概念

可等待对象

  • 协程 async/await
  • 任务 task
    asyncio.create_task() #封装协程为一个任务 该协程会被自动调度执行

  • Future

Future是一种特殊的低层级 可等待对象,表示一个异步操作的最终结果。

当一个Future对象被等待,意味着协程将保持直到该Future对象 在其他地方完成操作。

在asyncio 需要Future对象以便允许通过async/await 使用基于回调代码。

通常没有必要在应用层 创建 Future对象。

Future对象有时会 由库和某些asyncio API暴露给用户 用作可等对象。

3 协程与任务

https://docs.python.org/zh-cn/3/library/asyncio-task.html#coroutine
  • asyncio.run 运行 asyncio程序

执行 coroutine coro 并返回结果。

此函数 将运行传入的 协程,负责管理asyncio事件循环。
终结异步 生成器,并关闭线程池。

当有其他 asyncio 事件循环 在同一线程时,函数不能被调用
debug为True时,事件循环将以调式模式运行

此函数 总是创建一个新的事件循环并在结束时关闭。
它应该被当作asyncio程序主入口,理想情况只被调用一次。

  • task创建任务 asyncio.create_task(coro, *, name=None)

将coro协程封装为 Task并调度其执行,返回Task对象。

name不为None时,它将使用Task.set_name() 设为任务名称。

该任务在 get_running_loop() 返回的循环中执行。

如果当前线程没有在运行的循环则引发RuntimeError。

  • 3.7 以上的才支持,<3.7的版本需要 底层的 asyncio.ensure_future()函数。
   task = asyncio.create_task(coro())

等效

                     task = asyncio.ensure_future(coro())

休眠

        coroutine asyncio.sleep(delay, result=None)

阻塞 delay 指定秒数

如果指定了 result,则当协程完成时将其返回给调用者。
sleep() 总是会挂起当前任务,以允许其他任务运行。

将delay 设为0 将提供一个 经优化的路径,以运行其他任务运行。
这可供长期运行的函数使用,以避免在函数调用全过程中阻塞事件 循环。

并行运行任务

        awaitable asyncio.gather(*aws, return_exception=False)

并发运行 aws序列中的可等待对象。

如果aws的某个可等待对象为协程,它将自动被作为一个任务的调度。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合的列表,顺序与 aws可等待对象一致。

如果return_exceptions 为Fasle,所引发的异常将立即传播给等待gather()任务。

aws序列中其他可等待对象 不会被取消并继续运行。

如果aws 序列中的任一task 或 Future对象被取消,它将被引发CancelledError 一样处理。

在此情况下gather调用 不会被取消。

这是为了防止一个已提交的 Task/Future被取消导致其他Tasks/Future也被取消

如果 return_exception=False 则在gather()被标记已完成时,取消它不会取消 任何已提交的可等待对象。

如: 在一个异常传播给调用者之后,gather可标记为已完成,

因此, 在从gather捕获一个(由可等待对象所引发的)异常之后调用gather.cancel() 将不会 取消任何其他可等待对象。

  • 3.10 版本 已取消该参数 return_exception。

如果未提供位置参数或者并非所有位置参数均为 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。

屏蔽取消操作

        asyncio.shield(aw)

保护一个可等待对象 防止其被取消,如果aw是一个协程,它将自动被作为任务调度。

            res = await shield(something())

相当于

            res = await something()

不同之处在于如果 保护它的协程被取消,在something() 运行中的任务不会被取消。从something()的角度看来。

取消操作并没有发生,然而其调用这已被取消,因此,await表达式仍然会引发 CancelledError。

如果 通过其他方式 取消something() 如内部操作,shield也将被取消。

如果希望 完全忽略 取消操作,不推荐,则shield() 函数需要一个 try/except代码段

            try:
                res = await shiedl(something())
            except CancelledError:
                res = None
  • 3.10 已删除,如果await 不是Future 类对象,并且没有正在运行的事件循环,将发生弃用警告。

超时

        coroutine asyncio.wait_for(aw, timeout)

等待 aw 可等待对象完成,指定 timeout 秒数后超时。

如果aw是一个协程,自动被作为任务调度。

timeout可以为None,也可以为 float 或 int 型整数表示的等待秒数。

如果为None,则必须到完成为止。
如果超时,任务将取消并引发 asyncio.TimeoutError。

要避免任务 取消,可以假设 shield()。

  • 简单等待

          coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
    

并发运行aws 可迭代对象中的可等待对象 并进入阻塞直到满足 return_when所指定条件。

aws 可迭代对象不可为空

返回 Task/Future集合 (done, pending)

此函数不会引发 asyncio.TimeoutError 当超时发生时,未完成的Future 或 Task 将在指定秒数后被返回。

与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

wait() 会自动以任务的形式调度协程,之后将以 (done, pending) 集合形式返回显式创建的任务对象。

正确写法

   async def foo():
    return 42


    task = asyncio.create_task(foo())
    done, pending = await asyncio.wait({task})

if task in done:
    print("done")

完成处理

 as_completed(aws,*,timeout=None)

并发地运行aws 可迭代对象的可等待对象,返回一个协程迭代器。

返回的每个协程 可被等待以从剩余可等待对象的 可迭代对象中获取最早的结果。

如果所有Future对象完成前发生超时则引发asyncio.TimeoutError。

        for  coro in as_completed(aws):
             earliest_result = await coro

在线程运行

        coroutine asyncio.to_thread(func, /, *args, **kwargs)

不同的线程中异步运行函数 func。

向此函数提供的任何 args, * kwargs 会被直接传给func。

并且,当前 contextvars.Context将被传播,允许在不同线程访问来自事件循环的上下文变量。

返回一个可等待以获取 func 的最终结果的协程,主要用于执行在其他情况下会阻塞事件循环的IO密集型函数 方法。

  • 跨线程调度

          asyncio.run_coroutine_threadsafe(coro, loop)
    

向指定事件循环 提交一个线程。 线程安全。

返回一个 concurrent.futures.Future以等待来自其他OS线程的结果。

此函数应该从另一个OS线程调用,非事件循环运行所在线程。

  • 内省
    返回当前运行的Task实例,如果没有正在运行的任务则返回None。
    如果loop未None则使用 get_running_loop()获取当前事件循环。

    asyncio.current_task(loop=None)

返回事件循环所在运行的未完成Task对象集合。
如果loop 为Nong,则使用get_running_loop()获取当前事件。

           asyncio.all_tasks(loop=None)
  • Task对象
            asyncio.Task()
    

一个与Future类似 的对象,可运行python协程,非线程安全

  • 取消task

     async def cancel_me():
         print('cancel_me(): before sleep')
    
     try:
         # Wait for 1 hour
         await asyncio.sleep(5)
     except asyncio.CancelledError:
         print('cancel_me(): cancel sleep')
         raise
     finally:
         print('cancel_me(): after sleep')
    
     async def main9():
         # Create a "cancel_me" Task
         task = asyncio.create_task(cancel_me())
    
         # Wait for 1 second
         await asyncio.sleep(1)
    
         task.cancel()
         try:
             await task
         except asyncio.CancelledError:
             print("main(): cancel_me is cancelled now")
         finally:
             print(task.cancelled())
    
  • 基于生成器 的协程

    基于生成器的协程是 async/await语法前身,它们是使用 yuield from 创建的

    使用 @asyncio.coroutine装饰

4 流

  • Stream 函数

    下面的高级 asyncio 函数可以用来创建和处理流:

     coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)
    

建立网络连接并返回一对 (reader, writer) 对象。

返回的 reader 和 writer 对象是 StreamReader 和 StreamWriter 类的实例。

    coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

启动套接字服务。
当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。

该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。

  • Unix 套接字

     coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
    

    建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。

与 open_connection() 相似,但是是在 Unix 套接字上的操作。

请看文档 loop.create_unix_connection().

在 3.10 版更改: Removed the loop parameter.

    coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

启动一个 Unix 套接字服务。

StreamReader
    class asyncio.StreamReader

这个类表示一个读取器对象,该对象提供api以便于从IO流中读取数据。

不推荐直接实例化 StreamReader 对象,建议使用 open_connection() 和 start_server() 来获取 StreamReader 实例。

    coroutine read(n=- 1)

至多读取 n 个byte。 如果没有设置 n , 则自动置为 -1 , -1时表示读至 EOF 并返回所有读取的byte。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

    coroutine readline()

读取一行,其中“行”指的是以 \n 结尾的字节序列。

如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

StreamWriter
    class asyncio.StreamWriter

这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。

不建议直接实例化 StreamWriter;而应改用 open_connection() 和 start_server()。

    write(data)

此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

    stream.write(data)
    await stream.drain()
    writelines(data)

此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。

如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

    stream.writelines(lines)
    await stream.drain()
    close()

此方法会关闭流以及下层的套接字。

此方法应与 wait_closed() 方法一起使用:

    stream.close()
    await stream.wait_closed()

注册一个打开的套接字以等待使用流的数据。

使用低层级协议以及 loop.create_connection() 方法的 注册一个打开的套接字以等待使用协议的数据 示例。

    https://docs.python.org/zh-cn/3/library/asyncio-protocol.html#asyncio-example-create-connection

使用低层级的 loop.add_reader() 方法来监视文件描述符的 监视文件描述符以读取事件 示例。

使用 open_connection() 函数实现等待直到套接字接收到数据的协程:

    import asyncio
    import socket

    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()

        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()

        # Close the second socket
        wsock.close()

5 小结

本节介绍python的异步支持,异步包引入较晚,因此有部分功能仍在变动中。

比如在3.10支持PEGs 规范的解析器之后,取消了loop :Removed the loop parameter. 并且在await如果对象不可等待时,将返回错误。

目录
相关文章
|
27天前
|
Go Python
使用python实现一个用户态协程
【6月更文挑战第28天】本文探讨了如何在Python中实现类似Golang中协程(goroutines)和通道(channels)的概念。文章最后提到了`wait_for`函数在处理超时和取消操作中的作
23 1
使用python实现一个用户态协程
|
10天前
|
数据库 开发者 Python
实战指南:用Python协程与异步函数优化高性能Web应用
【7月更文挑战第15天】Python的协程与异步函数优化Web性能,通过非阻塞I/O提升并发处理能力。使用aiohttp库构建异步服务器,示例代码展示如何处理GET请求。异步处理减少资源消耗,提高响应速度和吞吐量,适用于高并发场景。掌握这项技术对提升Web应用性能至关重要。
36 10
|
10天前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
【7月更文挑战第15天】Python 3.5+引入的协程和异步函数革新了并发编程。协程,轻量级线程,由程序控制切换,降低开销。异步函数是协程的高级形式,允许等待异步操作。通过`asyncio`库,如示例所示,能并发执行任务,提高I/O密集型任务效率,实现并发而非并行,优化CPU利用率。理解和掌握这些工具对于构建高效网络应用至关重要。
22 6
|
10天前
|
大数据 数据处理 API
性能飞跃:Python协程与异步函数在数据处理中的高效应用
【7月更文挑战第15天】在大数据时代,Python的协程和异步函数解决了同步编程的性能瓶颈问题。同步编程在处理I/O密集型任务时效率低下,而Python的`asyncio`库支持的异步编程利用协程实现并发,通过`async def`和`await`避免了不必要的等待,提升了CPU利用率。例如,从多个API获取数据,异步方式使用`aiohttp`并发请求,显著提高了效率。掌握异步编程对于高效处理大规模数据至关重要。
20 4
|
10天前
|
设计模式 机器学习/深度学习 测试技术
设计模式转型:从传统同步到Python协程异步编程的实践与思考
【7月更文挑战第15天】探索从同步到Python协程异步编程的转变,异步处理I/O密集型任务提升效率。async/await关键词定义异步函数,asyncio库管理事件循环。面对挑战,如思维转变、错误处理和调试,可通过逐步迁移、学习资源、编写测试和使用辅助库来适应。通过实践和学习,开发者能有效优化性能和响应速度。
25 3
|
10天前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
【7月更文挑战第15天】Python异步编程借助协程和async/await提升并发性能,减少资源消耗。协程(async def)轻量级、用户态,便于控制。事件循环,如`asyncio.get_event_loop()`,调度任务执行。异步函数内的await关键词用于协程间切换。回调和Future对象简化异步结果处理。理解这些概念能写出高效、易维护的异步代码。
16 2
|
10天前
|
Python
从零到一:构建Python异步编程思维,掌握协程与异步函数
【7月更文挑战第15天】Python异步编程提升效率,通过协程与异步函数实现并发。从async def定义异步函数,如`say_hello()`,使用`await`等待异步操作。`asyncio.run()`驱动事件循环。并发执行任务,如`asyncio.gather()`同时处理`fetch_data()`任务,降低总体耗时。入门异步编程,解锁高效代码。
17 1
|
12天前
|
数据库 Python
我们来看一个简单的Python协程示例,它使用了`async`和`await`关键字。
我们来看一个简单的Python协程示例,它使用了`async`和`await`关键字。
|
12天前
|
存储 调度 Python
异步编程概述在 Python中,`asyncio`库提供了对异步I/O、事件循环、协程(coroutine)和任务的支持。
异步编程概述在 Python中,`asyncio`库提供了对异步I/O、事件循环、协程(coroutine)和任务的支持。
|
2月前
|
安全 调度 Python
探索Python中的并发编程:协程与多线程的比较
本文将深入探讨Python中的并发编程技术,重点比较协程与多线程的特点和应用场景。通过对协程和多线程的原理解析,以及在实际项目中的应用案例分析,读者将能够更好地理解两种并发编程模型的异同,并在实践中选择合适的方案来提升Python程序的性能和效率。