Python 自带异步队列的大坑

简介: Python 自带异步队列的大坑

摄影:产品经理上海的烤茄子比成都差太远了

我们在使用 Python 的 asyncio 写异步程序的时候,可能会使用asyncio.Queue来实现一个异步队列,通过它来让生产者和消费者进行通信。

但如果你的异步队列没有填写maxsize参数,那么可能会产生让你意料之外的结果。我们来看一段代码:


import asyncioimport randomimport aiohttp
async def producer(queue):    for _ in range(10):        sleep_time = random.randint(1, 2)        await queue.put(sleep_time)
async def consumer(queue):    while True:        sleep_time = await queue.get()        size = queue.qsize()        print(f'当前队列有:{size} 个元素')        url = f'http://httpbin.org/delay/{sleep_time}'        async with aiohttp.ClientSession() as client:            resp = await client.get(url)            print(await resp.json())
async def main():    queue = asyncio.Queue()    asyncio.create_task(producer(queue))    con = asyncio.create_task(consumer(queue))    await con
asyncio.run(main())

这段代码把 producerconsumer分别创建成异步任务,期望实现的效果是生产者不停生产数据放进异步队列,消费者不停从队列读取数据,然后发起网络请求。生产者与消费者利用 IO 等待时间实现并行。

但如果你运行一下这段代码,你会发现一件很奇怪的事情,如下图所示:

当我们的消费者开始消费的时候,队列里面实际上已经有10条数据了!由于图中代码第19行是先读取了一条数据再打印剩余的数量,所以打印的是当前队列有:9 个元素

所以,生产者与消费者根本没有并行。是生产者里面的循环完全运行完成了,才开始运行的消费者!

如果在实际代码中,你的生产者生产了几百万条数据,那么此时所有数据全部都堆放在异步队列里面,很容易就把你的内存撑爆了!

那么这个问题要如何解决呢?实际上非常简单,使用maxsize参数指定异步队列的大小:


queue = asyncio.Queue(maxsize=3)

我们这里设定为3,再运行看看效果:

看到这里,可能有人会说,这仅仅是生产者先把异步队列堆满,才能进行消费,并没有什么本质区别啊,本质上还是先只有生产者运行,等他跑不动了(队列满了),消费者才能运行,还是没有实现并行啊。

这是由于,在上面的例子中,生产者的速度远远超过消费者的速度,所以才会出现生产者总是堆满队列的问题。

为了说明生产者和消费者能真正利用 IO 等待时间进行并行,我们改一下代码:


import asyncioimport randomimport aiohttp
async def producer(queue):    for i in range(10):        await queue.put(i)        await asyncio.sleep(random.randint(1, 3))
async def consumer(queue):    while True:        sleep_time = await queue.get()        size = queue.qsize()        print(f'当前队列有:{size} 个元素')        url = 'http://httpbin.org/delay/2'        async with aiohttp.ClientSession() as client:            resp = await client.get(url)            print(await resp.json())
async def main():    queue = asyncio.Queue(maxsize=3)    asyncio.create_task(producer(queue))    con = asyncio.create_task(consumer(queue))    await con
asyncio.run(main())

生产者生产数据后,随机休眠1-3秒。而消费者请求的网址总是2秒返回数据。这样一来,有时候生产者快,有时候消费者快。我们来看看运行效果:

可以看到,当生产者快的时候,异步队列里面的数据就会堆积,当消费者快的时候,异步队列里面的数据就会变少。说明生产者与消费者实现了利用 IO 等待时间进行并行操作。


目录
相关文章
|
2月前
|
数据采集 存储 JSON
Python爬取知乎评论:多线程与异步爬虫的性能优化
Python爬取知乎评论:多线程与异步爬虫的性能优化
|
2月前
|
数据采集 存储 C++
Python异步爬虫(aiohttp)加速微信公众号图片下载
Python异步爬虫(aiohttp)加速微信公众号图片下载
|
2月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
11月前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
在Python的并发编程世界中,没有万能的解决方案,只有最适合特定场景的方法。希望本文能够为你拨开迷雾,找到那条通往高效并发编程的光明大道。
153 2
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
143 4
|
8月前
|
人工智能 开发者 Python
Chainlit:一个开源的异步Python框架,快速构建生产级对话式 AI 应用
Chainlit 是一个开源的异步 Python 框架,帮助开发者在几分钟内构建可扩展的对话式 AI 或代理应用,支持多种工具和服务集成。
761 9
|
8月前
|
Python
深入理解 Python 中的异步操作:async 和 await
Python 的异步编程通过 `async` 和 `await` 关键字处理 I/O 密集型任务,如网络请求和文件读写,显著提高性能。`async` 定义异步函数,返回 awaitable 对象;`await` 用于等待这些对象完成。本文介绍异步编程基础、`async` 和 `await` 的用法、常见模式(并发任务、异常处理、异步上下文管理器)及实战案例(如使用 aiohttp 进行异步网络请求),帮助你高效利用系统资源并提升程序性能。
789 7
|
8月前
|
SQL 网络协议 安全
Python异步: 什么时候使用异步?
Asyncio 是 Python 中用于异步编程的库,适用于协程、非阻塞 I/O 和异步任务。使用 Asyncio 的原因包括:1) 使用协程实现轻量级并发;2) 采用异步编程范式提高效率;3) 实现非阻塞 I/O 提升 I/O 密集型应用性能。然而,Asyncio 并不适合所有场景,特别是在 CPU 密集型任务或已有线程/进程方案的情况下。选择 Asyncio 应基于项目需求和技术优势。
130 2
|
9月前
|
数据采集 JSON 测试技术
Grequests,非常 Nice 的 Python 异步 HTTP 请求神器
在Python开发中,处理HTTP请求至关重要。`grequests`库基于`requests`,支持异步请求,通过`gevent`实现并发,提高性能。本文介绍了`grequests`的安装、基本与高级功能,如GET/POST请求、并发控制等,并探讨其在实际项目中的应用。
211 3
|
11月前
|
关系型数据库 MySQL 数据处理
探索Python中的异步编程:从asyncio到异步数据库操作
在这个快节奏的技术世界里,效率和性能是关键。本文将带你深入Python的异步编程世界,从基础的asyncio库开始,逐步探索到异步数据库操作的高级应用。我们将一起揭开异步编程的神秘面纱,探索它如何帮助我们提升应用程序的性能和响应速度。

推荐镜像

更多