Python 自带异步队列的大坑

简介: Python 自带异步队列的大坑

摄影:产品经理上海的烤茄子比成都差太远了

我们在使用 Python 的 asyncio 写异步程序的时候,可能会使用asyncio.Queue来实现一个异步队列,通过它来让生产者和消费者进行通信。

但如果你的异步队列没有填写maxsize参数,那么可能会产生让你意料之外的结果。我们来看一段代码:


import asyncioimport randomimport aiohttp
async def producer(queue):    for _ in range(10):        sleep_time = random.randint(1, 2)        await queue.put(sleep_time)
async def consumer(queue):    while True:        sleep_time = await queue.get()        size = queue.qsize()        print(f'当前队列有:{size} 个元素')        url = f'http://httpbin.org/delay/{sleep_time}'        async with aiohttp.ClientSession() as client:            resp = await client.get(url)            print(await resp.json())
async def main():    queue = asyncio.Queue()    asyncio.create_task(producer(queue))    con = asyncio.create_task(consumer(queue))    await con
asyncio.run(main())

这段代码把 producerconsumer分别创建成异步任务,期望实现的效果是生产者不停生产数据放进异步队列,消费者不停从队列读取数据,然后发起网络请求。生产者与消费者利用 IO 等待时间实现并行。

但如果你运行一下这段代码,你会发现一件很奇怪的事情,如下图所示:

当我们的消费者开始消费的时候,队列里面实际上已经有10条数据了!由于图中代码第19行是先读取了一条数据再打印剩余的数量,所以打印的是当前队列有:9 个元素

所以,生产者与消费者根本没有并行。是生产者里面的循环完全运行完成了,才开始运行的消费者!

如果在实际代码中,你的生产者生产了几百万条数据,那么此时所有数据全部都堆放在异步队列里面,很容易就把你的内存撑爆了!

那么这个问题要如何解决呢?实际上非常简单,使用maxsize参数指定异步队列的大小:


queue = asyncio.Queue(maxsize=3)

我们这里设定为3,再运行看看效果:

看到这里,可能有人会说,这仅仅是生产者先把异步队列堆满,才能进行消费,并没有什么本质区别啊,本质上还是先只有生产者运行,等他跑不动了(队列满了),消费者才能运行,还是没有实现并行啊。

这是由于,在上面的例子中,生产者的速度远远超过消费者的速度,所以才会出现生产者总是堆满队列的问题。

为了说明生产者和消费者能真正利用 IO 等待时间进行并行,我们改一下代码:


import asyncioimport randomimport aiohttp
async def producer(queue):    for i in range(10):        await queue.put(i)        await asyncio.sleep(random.randint(1, 3))
async def consumer(queue):    while True:        sleep_time = await queue.get()        size = queue.qsize()        print(f'当前队列有:{size} 个元素')        url = 'http://httpbin.org/delay/2'        async with aiohttp.ClientSession() as client:            resp = await client.get(url)            print(await resp.json())
async def main():    queue = asyncio.Queue(maxsize=3)    asyncio.create_task(producer(queue))    con = asyncio.create_task(consumer(queue))    await con
asyncio.run(main())

生产者生产数据后,随机休眠1-3秒。而消费者请求的网址总是2秒返回数据。这样一来,有时候生产者快,有时候消费者快。我们来看看运行效果:

可以看到,当生产者快的时候,异步队列里面的数据就会堆积,当消费者快的时候,异步队列里面的数据就会变少。说明生产者与消费者实现了利用 IO 等待时间进行并行操作。


目录
相关文章
|
3月前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
在Python的并发编程世界中,没有万能的解决方案,只有最适合特定场景的方法。希望本文能够为你拨开迷雾,找到那条通往高效并发编程的光明大道。
51 2
|
4月前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
56 4
|
5天前
|
Python
深入理解 Python 中的异步操作:async 和 await
Python 的异步编程通过 `async` 和 `await` 关键字处理 I/O 密集型任务,如网络请求和文件读写,显著提高性能。`async` 定义异步函数,返回 awaitable 对象;`await` 用于等待这些对象完成。本文介绍异步编程基础、`async` 和 `await` 的用法、常见模式(并发任务、异常处理、异步上下文管理器)及实战案例(如使用 aiohttp 进行异步网络请求),帮助你高效利用系统资源并提升程序性能。
22 7
|
6天前
|
SQL 网络协议 安全
Python异步: 什么时候使用异步?
Asyncio 是 Python 中用于异步编程的库,适用于协程、非阻塞 I/O 和异步任务。使用 Asyncio 的原因包括:1) 使用协程实现轻量级并发;2) 采用异步编程范式提高效率;3) 实现非阻塞 I/O 提升 I/O 密集型应用性能。然而,Asyncio 并不适合所有场景,特别是在 CPU 密集型任务或已有线程/进程方案的情况下。选择 Asyncio 应基于项目需求和技术优势。
|
1月前
|
数据采集 JSON 测试技术
Grequests,非常 Nice 的 Python 异步 HTTP 请求神器
在Python开发中,处理HTTP请求至关重要。`grequests`库基于`requests`,支持异步请求,通过`gevent`实现并发,提高性能。本文介绍了`grequests`的安装、基本与高级功能,如GET/POST请求、并发控制等,并探讨其在实际项目中的应用。
48 3
|
3月前
|
关系型数据库 MySQL 数据处理
探索Python中的异步编程:从asyncio到异步数据库操作
在这个快节奏的技术世界里,效率和性能是关键。本文将带你深入Python的异步编程世界,从基础的asyncio库开始,逐步探索到异步数据库操作的高级应用。我们将一起揭开异步编程的神秘面纱,探索它如何帮助我们提升应用程序的性能和响应速度。
|
3月前
|
调度 Python
深入理解 Python 中的异步操作 | python小知识
在现代编程中,异步操作是一个非常重要的概念,尤其是在处理 I/O 密集型任务时。使用异步操作可以显著提高程序的性能和响应速度。Python 提供了 `async` 和 `await` 关键字,使得编写异步代码变得更加直观和简洁【10月更文挑战第8天】
43 2
|
3月前
|
数据采集 前端开发 NoSQL
Python编程异步爬虫实战案例
Python编程异步爬虫实战案例
85 2
|
3月前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
深入探索:Python中的并发编程新纪元——协程与异步函数解析
33 3
|
2月前
|
NoSQL 关系型数据库 MySQL
python协程+异步总结!
本文介绍了Python中的协程、asyncio模块以及异步编程的相关知识。首先解释了协程的概念和实现方法,包括greenlet、yield关键字、asyncio装饰器和async/await关键字。接着详细讲解了协程的意义和应用场景,如提高IO密集型任务的性能。文章还介绍了事件循环、Task对象、Future对象等核心概念,并提供了多个实战案例,包括异步Redis、MySQL操作、FastAPI框架和异步爬虫。最后提到了uvloop作为asyncio的高性能替代方案。通过这些内容,读者可以全面了解和掌握Python中的异步编程技术。
53 0