Python 自带异步队列的大坑

简介: Python 自带异步队列的大坑

摄影:产品经理上海的烤茄子比成都差太远了

我们在使用 Python 的 asyncio 写异步程序的时候,可能会使用asyncio.Queue来实现一个异步队列,通过它来让生产者和消费者进行通信。

但如果你的异步队列没有填写maxsize参数,那么可能会产生让你意料之外的结果。我们来看一段代码:


import asyncioimport randomimport aiohttp
async def producer(queue):    for _ in range(10):        sleep_time = random.randint(1, 2)        await queue.put(sleep_time)
async def consumer(queue):    while True:        sleep_time = await queue.get()        size = queue.qsize()        print(f'当前队列有:{size} 个元素')        url = f'http://httpbin.org/delay/{sleep_time}'        async with aiohttp.ClientSession() as client:            resp = await client.get(url)            print(await resp.json())
async def main():    queue = asyncio.Queue()    asyncio.create_task(producer(queue))    con = asyncio.create_task(consumer(queue))    await con
asyncio.run(main())

这段代码把 producerconsumer分别创建成异步任务,期望实现的效果是生产者不停生产数据放进异步队列,消费者不停从队列读取数据,然后发起网络请求。生产者与消费者利用 IO 等待时间实现并行。

但如果你运行一下这段代码,你会发现一件很奇怪的事情,如下图所示:

当我们的消费者开始消费的时候,队列里面实际上已经有10条数据了!由于图中代码第19行是先读取了一条数据再打印剩余的数量,所以打印的是当前队列有:9 个元素

所以,生产者与消费者根本没有并行。是生产者里面的循环完全运行完成了,才开始运行的消费者!

如果在实际代码中,你的生产者生产了几百万条数据,那么此时所有数据全部都堆放在异步队列里面,很容易就把你的内存撑爆了!

那么这个问题要如何解决呢?实际上非常简单,使用maxsize参数指定异步队列的大小:


queue = asyncio.Queue(maxsize=3)

我们这里设定为3,再运行看看效果:

看到这里,可能有人会说,这仅仅是生产者先把异步队列堆满,才能进行消费,并没有什么本质区别啊,本质上还是先只有生产者运行,等他跑不动了(队列满了),消费者才能运行,还是没有实现并行啊。

这是由于,在上面的例子中,生产者的速度远远超过消费者的速度,所以才会出现生产者总是堆满队列的问题。

为了说明生产者和消费者能真正利用 IO 等待时间进行并行,我们改一下代码:


import asyncioimport randomimport aiohttp
async def producer(queue):    for i in range(10):        await queue.put(i)        await asyncio.sleep(random.randint(1, 3))
async def consumer(queue):    while True:        sleep_time = await queue.get()        size = queue.qsize()        print(f'当前队列有:{size} 个元素')        url = 'http://httpbin.org/delay/2'        async with aiohttp.ClientSession() as client:            resp = await client.get(url)            print(await resp.json())
async def main():    queue = asyncio.Queue(maxsize=3)    asyncio.create_task(producer(queue))    con = asyncio.create_task(consumer(queue))    await con
asyncio.run(main())

生产者生产数据后,随机休眠1-3秒。而消费者请求的网址总是2秒返回数据。这样一来,有时候生产者快,有时候消费者快。我们来看看运行效果:

可以看到,当生产者快的时候,异步队列里面的数据就会堆积,当消费者快的时候,异步队列里面的数据就会变少。说明生产者与消费者实现了利用 IO 等待时间进行并行操作。


目录
相关文章
|
14天前
|
数据采集 Java Python
python并发编程:Python异步IO实现并发爬虫
python并发编程:Python异步IO实现并发爬虫
24 1
|
14天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
14天前
|
存储 缓存 算法
Python中collections模块的deque双端队列:深入解析与应用
在Python的`collections`模块中,`deque`(双端队列)是一个线程安全、快速添加和删除元素的双端队列数据类型。它支持从队列的两端添加和弹出元素,提供了比列表更高的效率,特别是在处理大型数据集时。本文将详细解析`deque`的原理、使用方法以及它在各种场景中的应用。
|
14天前
|
前端开发 Python
Python中如何用栈实现队列
Python中如何用栈实现队列
215 0
|
14天前
|
调度 数据库 Python
【专栏】异步IO在处理IO密集型任务中的高效性
【4月更文挑战第27天】本文介绍了Python并发编程和异步IO,包括并发的基本概念(多线程、多进程、协程),线程与进程的实现(threading和multiprocessing模块),协程的使用(asyncio模块),以及异步IO的原理和优势。强调了异步IO在处理IO密集型任务中的高效性,指出应根据任务类型选择合适的并发技术。
|
14天前
|
安全
python_threading多线程、queue安全队列
python_threading多线程、queue安全队列
22 2
|
14天前
|
数据采集 数据挖掘 调度
异步爬虫实践攻略:利用Python Aiohttp框架实现高效数据抓取
本文介绍了如何使用Python的Aiohttp框架构建异步爬虫,以提升数据抓取效率。异步爬虫利用异步IO和协程技术,在等待响应时执行其他任务,提高效率。Aiohttp是一个高效的异步HTTP客户端/服务器框架,适合构建此类爬虫。文中还展示了如何通过代理访问HTTPS网页的示例代码,并以爬取微信公众号文章为例,说明了实际应用中的步骤。
|
5天前
|
数据采集 算法 Python
python3的几个大坑
python3的几个大坑
16 1
|
8天前
|
消息中间件 存储 NoSQL
一文读懂python分布式任务队列-celery
# 一文读懂Python分布式任务队列-Celery Celery是一个分布式任务执行框架,支持大量并发任务。它采用生产者-消费者模型,由Broker、Worker和Backend组成。生产者提交任务到队列,Worker异步执行,结果存储在Backend。适用于异步任务、大规模实时任务和定时任务。5月更文挑战第17天
25 1
|
14天前
|
API UED Python
使用Python进行异步HTTP请求的实践指南
使用Python进行异步HTTP请求的实践指南
24 4