运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践

简介: 我们一直都相信这样一种说法:协程是比多线程更高效的一种并发工作方式,它完全由程序本身所控制,也就是在用户态执行,协程避免了像线程切换那样产生的上下文切换,在性能方面得到了很大的提升。毫无疑问,这是颠扑不破的业界共识,是放之四海而皆准的真理。

我们一直都相信这样一种说法:协程是比多线程更高效的一种并发工作方式,它完全由程序本身所控制,也就是在用户态执行,协程避免了像线程切换那样产生的上下文切换,在性能方面得到了很大的提升。毫无疑问,这是颠扑不破的业界共识,是放之四海而皆准的真理。

但事实上,协程远比大多数人想象中的复杂,正因为协程的“用户态”特性,任务调度权掌握在撰写协程任务的人手里,而仅仅依赖async和await关键字远远达不到“调度”的级别,有时候反而会拖累任务效率,使其在任务执行效率上还不及“系统态”的多线程和多进程,本次我们来探讨一下Python3原生协程任务的调度管理。

Python3.10协程库async.io的基本操作

事件循环(Eventloop)是 原生协程库asyncio 的核心,可以理解为总指挥。Eventloop实例提供了注册、取消和执行任务和回调的方法。

Eventloop可以将一些异步方法绑定到事件循环上,事件循环会循环执行这些方法,但是和多线程一样,同时只能执行一个方法,因为协程也是单线程执行。当执行到某个方法时,如果它遇到了阻塞,事件循环会暂停它的执行去执行其他的方法,与此同时为这个方法注册一个回调事件,当某个方法从阻塞中恢复,下次轮询到它的时候将会继续执行,亦或者,当没有轮询到它,它提前从阻塞中恢复,也可以通过回调事件进行切换,如此往复,这就是事件循环的简单逻辑。

而上面最核心的动作就是切换别的方法,怎么切换?用await关键字:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
  
async def job2():  
    print('job2开始')  
  
  
async def main():  
    await job1()  
    await job2()  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系统返回:

job1开始  
job1结束  
job2开始

是的,切则切了,可切的对吗?事实上这两个协程任务并没有达成“协作”,因为它们是同步执行的,所以并不是在方法内await了,就可以达成协程的工作方式,我们需要并发启动这两个协程任务:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
  
async def job2():  
    print('job2开始')  
  
  
async def main():  
    #await job1()  
    #await job2()  
    await asyncio.gather(job1(), job2())  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系统返回:

job1开始  
job2开始  
job1结束

如果没有asyncio.gather的参与,协程方法就是普通的同步方法,就算用async声明了异步也无济于事。而asyncio.gather的基础功能就是将协程任务并发执行,从而达成“协作”。

但事实上,Python3.10也支持“同步写法”的协程方法:

async def create_task():  
    task1 = asyncio.create_task(job1())  
    task2 = asyncio.create_task(job2())  
    await task1  
    await task2

这里我们通过asyncio.create\_task对job1和job2进行封装,返回的对象再通过await进行调用,由此两个单独的异步方法就都被绑定到同一个Eventloop了,这样虽然写法上同步,但其实是异步执行:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
  
async def job2():  
    print('job2开始')  
  
  
async def create_task():  
    task1 = asyncio.create_task(job1())  
    task2 = asyncio.create_task(job2())  
    await task1  
    await task2  
  
  
async def main():  
    #await job1()  
    #await job2()  
    await asyncio.gather(job1(), job2())  
  
  
if __name__ == '__main__':  
    asyncio.run(create_task())

系统返回:

job1开始  
job2开始  
job1结束

协程任务的上下游监控

解决了并发执行的问题,现在假设每个异步任务都会返回一个操作结果:

async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
    return "job1任务结果"  
  
  
async def job2():  
    print('job2开始')  
  
    return "job2任务结果"

通过asyncio.gather方法,我们可以收集到任务执行结果:

async def main():  
  
    res = await asyncio.gather(job1(), job2())  
    print(res)

并发执行任务:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
    return "job1任务结果"  
  
  
async def job2():  
    print('job2开始')  
  
    return "job2任务结果"  
  
  
  
async def main():  
  
    res = await asyncio.gather(job1(), job2())  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系统返回:

job1开始  
job2开始  
job1结束  
['job1', 'job2']

但任务结果仅仅也就是方法的返回值,除此之外,并没有其他有价值的信息,对协程任务的执行明细讳莫如深。

现在我们换成asyncio.wait方法:

async def main():  
  
    res = await asyncio.wait([job1(), job2()])  
    print(res)

依然并发执行:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
    return "job1任务结果"  
  
  
async def job2():  
    print('job2开始')  
  
    return "job2任务结果"  
  
  
  
async def main():  
  
    res = await asyncio.wait([job1(), job2()])  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系统返回:

job1开始  
job2开始  
job1结束  
({<Task finished name='Task-2' coro=<job1() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:4> result='job1任务结果'>, <Task finished name='Task-3' coro=<job2() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:12> result='job2任务结果'>}, set())

可以看出,asyncio.wait返回的是任务对象,里面存储了大部分的任务信息,包括执行状态。

在默认情况下,asyncio.wait会等待全部任务完成 (return\_when='ALL\_COMPLETED'),它还支持 return\_when='FIRST\_COMPLETED'(第一个协程完成就返回)和 return\_when='FIRST\_EXCEPTION'(出现第一个异常就返回)。

这就非常令人兴奋了,因为如果异步消费任务是发短信之类的需要统计达到率的任务,利用asyncio.wait特性,我们就可以第一时间记录任务完成或者异常的具体时间。

协程任务守护

假设由于某种原因,我们手动终止任务消费:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
    return "job1任务结果"  
  
  
async def job2():  
    print('job2开始')  
  
    return "job2任务结果"  
  
  
  
async def main():  
    task1 = asyncio.create_task(job1())  
    task2 = asyncio.create_task(job2())  
    task1.cancel()  
    res = await asyncio.gather(task1, task2)  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系统报错:

File "/Users/liuyue/Downloads/upload/test/test_async.py", line 23, in main  
    res = await asyncio.gather(task1, task2)  
asyncio.exceptions.CancelledError  
  

这里job1被手动取消,但会影响job2的执行,这违背了协程“互相提携”的特性。

事实上,asyncio.gather方法可以捕获协程任务的异常:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
    return "job1任务结果"  
  
  
async def job2():  
    print('job2开始')  
  
    return "job2任务结果"  
  
  
  
async def main():  
    task1 = asyncio.create_task(job1())  
    task2 = asyncio.create_task(job2())  
    task1.cancel()  
    res = await asyncio.gather(task1, task2,return_exceptions=True)  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系统返回:

job2开始  
[CancelledError(''), 'job2任务结果']

可以看到job1没有被执行,并且异常替代了任务结果作为返回值。

但如果协程任务启动之后,需要保证任务情况下都不会被取消,此时可以使用asyncio.shield方法守护协程任务:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
    return "job1任务结果"  
  
  
async def job2():  
    print('job2开始')  
  
    return "job2任务结果"  
  
  
  
async def main():  
    task1 = asyncio.shield(job1())  
    task2 = asyncio.create_task(job2())  
      
    res = await asyncio.gather(task1, task2,return_exceptions=True)  
  
    task1.cancel()  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系统返回:

job1开始  
job2开始  
job1结束  
['job1任务结果', 'job2任务结果']

协程任务回调

假设协程任务执行完毕之后,需要立刻进行回调操作,比如将任务结果推送到其他接口服务上:

import asyncio  
  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
    return "job1任务结果"  
  
  
async def job2():  
    print('job2开始')  
  
    return "job2任务结果"  
  
  
def callback(future):  
    print(f'回调任务: {future.result()}')  
  
  
  
async def main():  
    task1 = asyncio.shield(job1())  
    task2 = asyncio.create_task(job2())  
  
    task1.add_done_callback(callback)  
      
    res = await asyncio.gather(task1, task2,return_exceptions=True)  
  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

这里我们通过add\_done\_callback方法对job1指定了callback方法,当任务执行完以后,callback会被调用,系统返回:

job1开始  
job2开始  
job1结束  
回调任务: job1任务结果  
['job1任务结果', 'job2任务结果']

与此同时,add\_done\_callback方法不仅可以获取协程任务返回值,它自己也支持参数参数传递:

import asyncio  
from functools import partial  
  
async def job1():  
    print('job1开始')  
    await asyncio.sleep(1)  
    print('job1结束')  
  
    return "job1任务结果"  
  
  
async def job2():  
    print('job2开始')  
  
    return "job2任务结果"  
  
  
def callback(future,num):  
    print(f"回调参数{num}")  
    print(f'回调任务: {future.result()}')  
  
  
  
async def main():  
    task1 = asyncio.shield(job1())  
    task2 = asyncio.create_task(job2())  
  
    task1.add_done_callback(partial(callback,num=1))  
      
    res = await asyncio.gather(task1, task2,return_exceptions=True)  
  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系统返回:

job1开始  
job2开始  
job1结束  
回调参数1  
回调任务: job1任务结果  
['job1任务结果', 'job2任务结果']

结语

成也用户态,败也用户态。所谓水能载舟亦能覆舟,协程消费任务的调度远比多线程的系统级调度要复杂,稍不留神就会造成业务上的“同步”阻塞,弄巧成拙,适得其反。这也解释了为什么相似场景中多线程的出场率要远远高于协程,就是因为多线程不需要考虑启动后的“切换”问题,无为而为,简单粗暴。

相关文章
|
11天前
|
Python
深入理解 Python 中的异步操作:async 和 await
Python 的异步编程通过 `async` 和 `await` 关键字处理 I/O 密集型任务,如网络请求和文件读写,显著提高性能。`async` 定义异步函数,返回 awaitable 对象;`await` 用于等待这些对象完成。本文介绍异步编程基础、`async` 和 `await` 的用法、常见模式(并发任务、异常处理、异步上下文管理器)及实战案例(如使用 aiohttp 进行异步网络请求),帮助你高效利用系统资源并提升程序性能。
25 7
|
11天前
|
SQL 网络协议 安全
Python异步: 什么时候使用异步?
Asyncio 是 Python 中用于异步编程的库,适用于协程、非阻塞 I/O 和异步任务。使用 Asyncio 的原因包括:1) 使用协程实现轻量级并发;2) 采用异步编程范式提高效率;3) 实现非阻塞 I/O 提升 I/O 密集型应用性能。然而,Asyncio 并不适合所有场景,特别是在 CPU 密集型任务或已有线程/进程方案的情况下。选择 Asyncio 应基于项目需求和技术优势。
|
1月前
|
数据采集 JSON 测试技术
Grequests,非常 Nice 的 Python 异步 HTTP 请求神器
在Python开发中,处理HTTP请求至关重要。`grequests`库基于`requests`,支持异步请求,通过`gevent`实现并发,提高性能。本文介绍了`grequests`的安装、基本与高级功能,如GET/POST请求、并发控制等,并探讨其在实际项目中的应用。
54 3
|
3月前
|
关系型数据库 MySQL 数据处理
探索Python中的异步编程:从asyncio到异步数据库操作
在这个快节奏的技术世界里,效率和性能是关键。本文将带你深入Python的异步编程世界,从基础的asyncio库开始,逐步探索到异步数据库操作的高级应用。我们将一起揭开异步编程的神秘面纱,探索它如何帮助我们提升应用程序的性能和响应速度。
|
3月前
|
调度 Python
python知识点100篇系列(20)-python协程与异步编程asyncio
【10月更文挑战第8天】协程(Coroutine)是一种用户态内的上下文切换技术,通过单线程实现代码块间的切换执行。Python中实现协程的方法包括yield、asyncio模块及async/await关键字。其中,async/await结合asyncio模块可更便捷地编写和管理协程,支持异步IO操作,提高程序并发性能。协程函数、协程对象、Task对象等是其核心概念。
|
2月前
|
NoSQL 关系型数据库 MySQL
python协程+异步总结!
本文介绍了Python中的协程、asyncio模块以及异步编程的相关知识。首先解释了协程的概念和实现方法,包括greenlet、yield关键字、asyncio装饰器和async/await关键字。接着详细讲解了协程的意义和应用场景,如提高IO密集型任务的性能。文章还介绍了事件循环、Task对象、Future对象等核心概念,并提供了多个实战案例,包括异步Redis、MySQL操作、FastAPI框架和异步爬虫。最后提到了uvloop作为asyncio的高性能替代方案。通过这些内容,读者可以全面了解和掌握Python中的异步编程技术。
54 0
|
2月前
|
数据采集 缓存 程序员
python协程使用教程
1. **协程**:介绍了协程的概念、与子程序的区别、优缺点,以及如何在 Python 中使用协程。 2. **同步与异步**:解释了同步与异步的概念,通过示例代码展示了同步和异步处理的区别和应用场景。 3. **asyncio 模块**:详细介绍了 asyncio 模块的概述、基本使用、多任务处理、Task 概念及用法、协程嵌套与返回值等。 4. **aiohttp 与 aiofiles**:讲解了 aiohttp 模块的安装与使用,包括客户端和服务器端的简单实例、URL 参数传递、响应内容读取、自定义请求等。同时介绍了 aiofiles 模块的安装与使用,包括文件读写和异步迭代
48 0
|
7月前
|
Go Python
使用python实现一个用户态协程
【6月更文挑战第28天】本文探讨了如何在Python中实现类似Golang中协程(goroutines)和通道(channels)的概念。文章最后提到了`wait_for`函数在处理超时和取消操作中的作
66 1
使用python实现一个用户态协程
|
4月前
|
调度 Python
python3 协程实战(python3经典编程案例)
该文章通过多个实战案例介绍了如何在Python3中使用协程来提高I/O密集型应用的性能,利用asyncio库以及async/await语法来编写高效的异步代码。
41 0
|
6月前
|
数据库 开发者 Python
实战指南:用Python协程与异步函数优化高性能Web应用
【7月更文挑战第15天】Python的协程与异步函数优化Web性能,通过非阻塞I/O提升并发处理能力。使用aiohttp库构建异步服务器,示例代码展示如何处理GET请求。异步处理减少资源消耗,提高响应速度和吞吐量,适用于高并发场景。掌握这项技术对提升Web应用性能至关重要。
96 10