一日一技:使用 asyncio 如何限制协程的并发数

简介: 一日一技:使用 asyncio 如何限制协程的并发数

如果使用 asyncio + httpx 实现并发请求,怎么限制请求的频率呢?怎么限制最多只能有 x 个请求同时发出呢?我们今天给出两种方案。


提出问题


假设如果我们同时发起12个请求,每个请求的时间不同,那么总共的请求时间大概跟最长耗时的请求差不多。我们先来写一个用于测试的例子:


import asyncio
import httpx
import time
async def req(delay):
    print(f'请求一个延迟为{delay}秒的接口')
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        result = resp.json()
        print(result)
async def main():
    start = time.time()
    delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
    task_list = []
    for delay in delay_list:
        task = asyncio.create_task(req(delay))
        task_list.append(task)
    await asyncio.gather(*task_list)
    end = time.time()
    print(f'一共耗时:{end - start}')
asyncio.run(main())


这段代码,使用 for 循环创建了12个协程任务,这些任务几乎同时运行,于是,请求完成所有的接口,总共耗时如下图所示:


1.png


那么我们怎么确保同一时间最多只有3个协程在请求网络呢?


限制协程任务数


第一个方案跟以前限制多线程的线程数的方案相同。我们创建一个列表,确保列表里面最多只有3个任务,然后持续循环检查,发现有任务完成了,就移除这个完成的任务,并加入一个新的任务,直到待爬的列表为空,这个任务列表也为空。代码如下:


import asyncio
import httpx
import time
async def req(delay):
    print(f'请求一个延迟为{delay}秒的接口')
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        result = resp.json()
        print(result)
async def main():
    start = time.time()
    delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
    task_list = []
    while True:
        if not delay_list and not task_list:
            break
        while len(task_list) < 3:
            if delay_list:
                delay = delay_list.pop()
                task = asyncio.create_task(req(delay))
                task_list.append(task)
            else:
                break
        task_list = [task for task in task_list if not task.done()]
        await asyncio.sleep(1)
    end = time.time()
    print(f'一共耗时:{end - start}')
asyncio.run(main())


运行效果如下图所示:


2.png


总共耗时大概28秒左右。比串行需要的58秒快了一半,但比全部同时并发多了一倍。


使用 Semaphore


asyncio 实际上自带了一个限制协程数量的类,叫做Semaphore。我们只需要初始化它,传入最大允许的协程数量,然后就可以通过上下文管理器来使用。我们看一下代码:


import asyncio
import httpx
import time
async def req(delay, sem):
    print(f'请求一个延迟为{delay}秒的接口')
    async with sem:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
            result = resp.json()
            print(result)
async def main():
    start = time.time()
    delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
    task_list = []
    sem = asyncio.Semaphore(3)
    for delay in delay_list:
        task = asyncio.create_task(req(delay, sem))
        task_list.append(task)
    await asyncio.gather(*task_list)
    end = time.time()
    print(f'一共耗时:{end - start}')
asyncio.run(main())


运行效果如下图所示:


3.png


耗时为22秒,比第一个方案更快。


我们来看看Semaphore的用法,它的格式为:


sem = asyncio.Semaphore(同时运行的协程数量)
async def func(sem):
    async with sem:
        这里是并发执行的代码
task_list = []
for _ in range(总共需要执行的任务数):
    task = asyncio.create_task(func(sem))
    task_list.append(task)
await asyncio.gather(*task_list)


当我们要限制一个协程的并发数的时候,可以在调用协程之前,先初始化一个Semaphore对象。然后把这个对象传到需要限制并发的协程里面,在协程里面,使用异步上下文管理器包住你的正式代码:


async with sem:
    正式代码


这样一来,如果并发数没有达到限制,那么async with sem会瞬间执行完成,进入里面的正式代码中。如果并发数已经达到了限制,那么其他的协程会阻塞在async with sem这个地方,直到正在运行的某个协程完成了,退出了,才会放行一个新的协程去替换掉这个已经完成的协程。


这个写法其实跟多线程的加锁很像。只不过锁是确保同一个时间只有一个线程在运行,而Semaphore可以人为指定能有多少个协程同时运行。


如何限制1分钟内能够运行的协程数


可能同学看了上面的例子以后,只知道如何限制同时运行的协程数。但是怎么限制在一段时间里同时运行的协程数呢?


其实非常简单,在并发的协程里面加个 asyncio.sleep 就可以了。例如上面的例子,我想限制每分钟只能有3个协程,那么可以把代码改为:


async def req(delay, sem):
    print(f'请求一个延迟为{delay}秒的接口')
    async with sem:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
            result = resp.json()
            print(result)
    await asyncio.sleep(60)


总结


如果大家要限制协程的并发数,那么最简单的办法就是使用asyncio.Semaphore。但需要注意的是,只能在启动协程之前初始化它,然后传给协程。要确保所有并发协程拿到的是同一个Semaphore对象。


当然,你的程序里面,可能有多个不同的部分,有些部分限制并发数为 a,有些部分限制并发数为 b。那么你可以初始化多个Semaphore对象,分别传给不同的协程。


请关注微信公众号【未闻Code】获取更多精彩文章。

目录
相关文章
|
1月前
|
数据采集 关系型数据库 MySQL
python-协程(async、await关键字与asyncio)
python-协程(async、await关键字与asyncio)
118 0
|
4天前
|
监控 程序员 调度
协程实现单线程并发(入门)
协程实现单线程并发(入门)
10 1
|
19天前
|
JavaScript 前端开发 程序员
Python协程与asyncio
理解Python中的协程,我们需从其底层原理开始,逐步深入。协程的核心在于控制流的非阻塞式管理,它允许在单一线程内实现并发处理,通过事件循环和协作式多任务来提高效率。
|
1月前
|
存储 Python
python使用gevent库来创建协程,并通过协程实现并发执行不同的任务
```markdown 这段Python代码利用`gevent`库实现并发执行协程。定义了两个打印函数`f1`和`f2`,分别输出&quot;csdn&quot;和&quot;yyds&quot;。代码首先创建列表`t_l`,并启动5个`f1`协程,将其加入列表并等待所有协程完成。随后,同样方式启动5个`f2`协程,存入`t1_l`列表并等待执行完毕。整体展示了`gevent`的协程并发操作。 ```
16 1
|
1月前
|
程序员 Go 数据处理
|
1月前
|
调度 Python
python协程—asyncio模块
python协程—asyncio模块
34 0
|
1月前
|
开发者 Python
深入浅出Python协程:提高并发性能的利器
本文旨在深入探讨Python中的协程机制,一种轻量级的并发编程解决方案。与传统的多线程和多进程相比,协程提供了更高效的并发性能,尤其是在I/O密集型应用中。我们将从协程的基本概念入手,解析其工作原理,并通过实例讲解如何在Python中使用协程来优化程序性能。文章还将对比协程与其他并发模型的优缺点,帮助读者全面理解协程在现代软件开发中的应用价值。
34 3
|
1月前
|
API 调度 开发者
深入浅出Python协程:提高并发性能的秘诀
在当今快速发展的互联网时代,软件系统面临着越来越多的并发处理需求。本文将深入探讨Python中的协程(Coroutine)概念,它作为一种轻量级线程,通过优雅地在单个线程内部进行任务切换,实现高效的IO操作。本文不仅将介绍协程的基础知识和工作原理,还会通过实例演示如何在Python项目中应用协程来提高并发性能,最后将对比协程与传统多线程、多进程模型的优缺点,帮助读者更好地理解协程在现代编程中的重要性。
60 3
|
1月前
|
开发者 Python
深入理解Python协程:提高并发性能的关键
在本文中,我们将探索Python中的协程(coroutine)机制,这是一种比线程更轻量级的并发执行单元。通过深入分析协程的工作原理、如何使用以及它们如何帮助提高应用程序的并发性能,本文旨在为读者提供一个全面的理解。不同于传统的技术文章摘要,我们不仅简述内容,还承诺将带领读者通过实际代码示例,深入浅出地掌握协程的强大之处。无论你是初学者还是有经验的开发者,本文都将为你打开并行编程的新视界。
|
1月前
|
Python
深入浅出Python协程:提升并发性能的艺术
本文旨在深入探讨Python协程的内部机制及其在提升程序并发性能中的关键作用。不同于传统的摘要,我们将通过一个简洁明了的比喻来揭示协程的本质:想象一个高效的快餐厨房,厨师(主线程)在准备一个订单(任务)时,如果需要等待某个步骤(如烤面包),他会转而开始准备下一个订单,而不是站在那里等待。这样,厨房的整体效率得到了极大提升。Python协程正是这样一种机制,它允许代码在等待操作(如I/O操作)完成时“挂起”,转而执行其他任务,从而显著提高并发性能。本文将通过示例和解析,带你一步步深入理解协程的工作原理,以及如何在你的Python项目中有效地利用协程来提升性能。
44 1