摄影:产品经理上海的烤茄子比成都差太远了
我们在使用 Python 的 asyncio 写异步程序的时候,可能会使用asyncio.Queue
来实现一个异步队列,通过它来让生产者和消费者进行通信。
但如果你的异步队列没有填写maxsize
参数,那么可能会产生让你意料之外的结果。我们来看一段代码:
import asyncioimport randomimport aiohttp async def producer(queue): for _ in range(10): sleep_time = random.randint(1, 2) await queue.put(sleep_time) async def consumer(queue): while True: sleep_time = await queue.get() size = queue.qsize() print(f'当前队列有:{size} 个元素') url = f'http://httpbin.org/delay/{sleep_time}' async with aiohttp.ClientSession() as client: resp = await client.get(url) print(await resp.json()) async def main(): queue = asyncio.Queue() asyncio.create_task(producer(queue)) con = asyncio.create_task(consumer(queue)) await con asyncio.run(main())
这段代码把 producer
和consumer
分别创建成异步任务,期望实现的效果是生产者不停生产数据放进异步队列,消费者不停从队列读取数据,然后发起网络请求。生产者与消费者利用 IO 等待时间实现并行。
但如果你运行一下这段代码,你会发现一件很奇怪的事情,如下图所示:
当我们的消费者开始消费的时候,队列里面实际上已经有10条数据了!由于图中代码第19行是先读取了一条数据再打印剩余的数量,所以打印的是当前队列有:9 个元素
。
所以,生产者与消费者根本没有并行。是生产者里面的循环完全运行完成了,才开始运行的消费者!
如果在实际代码中,你的生产者生产了几百万条数据,那么此时所有数据全部都堆放在异步队列里面,很容易就把你的内存撑爆了!
那么这个问题要如何解决呢?实际上非常简单,使用maxsize
参数指定异步队列的大小:
queue = asyncio.Queue(maxsize=3)
我们这里设定为3,再运行看看效果:
看到这里,可能有人会说,这仅仅是生产者先把异步队列堆满,才能进行消费,并没有什么本质区别啊,本质上还是先只有生产者运行,等他跑不动了(队列满了),消费者才能运行,还是没有实现并行啊。
这是由于,在上面的例子中,生产者的速度远远超过消费者的速度,所以才会出现生产者总是堆满队列的问题。
为了说明生产者和消费者能真正利用 IO 等待时间进行并行,我们改一下代码:
import asyncioimport randomimport aiohttp async def producer(queue): for i in range(10): await queue.put(i) await asyncio.sleep(random.randint(1, 3)) async def consumer(queue): while True: sleep_time = await queue.get() size = queue.qsize() print(f'当前队列有:{size} 个元素') url = 'http://httpbin.org/delay/2' async with aiohttp.ClientSession() as client: resp = await client.get(url) print(await resp.json()) async def main(): queue = asyncio.Queue(maxsize=3) asyncio.create_task(producer(queue)) con = asyncio.create_task(consumer(queue)) await con asyncio.run(main())
生产者生产数据后,随机休眠1-3秒。而消费者请求的网址总是2秒返回数据。这样一来,有时候生产者快,有时候消费者快。我们来看看运行效果:
可以看到,当生产者快的时候,异步队列里面的数据就会堆积,当消费者快的时候,异步队列里面的数据就会变少。说明生产者与消费者实现了利用 IO 等待时间进行并行操作。