1. 前言
看到这个题目大家觉得很慌,我都协程并发处理数据了,为什么还需要线程?孰轻孰重我们当然想的清楚,但是这里的门道就是一句话:我不管你异步编程多么NB,我就是想一边用异步,一边调用同步阻塞的代码,我不管我就这样用,你就说能不能办到?这可把我难倒了,上节课不是说了异步编程核心原则:要异步,所有的都要异步,感觉与其背道而驰啊。经过一番研究发现:草率了,异步中可以调用同步的代码,但是吃相比较难看而已罢了。
2. 异步+同步
import time import asyncio from concurrent.futures.thread import ThreadPoolExecutor # 同步阻塞代码 def run(): time.sleep(2) if __name__ == '__main__': loop = asyncio.get_event_loop() start = time.time() executor = ThreadPoolExecutor() # 初始化线程池 tasks = [] for i in range(20): task = loop.run_in_executor(executor, run) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) # 异步调用 print("total time: ", time.time()-start)
执行时间:
total time: 4.015759229660034
20个同步阻塞任务在经过异步+同步调用之后时间增加了,但这是相比较于协程得出的结论(我们下面会分析)。我们可以看到程序通过ThreadPoolExecutor
创建线程池,最后通过loop的run_in_executor
把线程的future包装成协程的future,最后被loop的run_until_complete
调度。
线程转换为协程 核心
为什么线程future可以转换成协程呢?
核心就是
loop.run_in_executor
方法,源码大家下去可以自己看,我只提取重点:return futures.wrap_future(executor.submit(func, *args), loop=self)
,看到了吗,线程执行的结果即executor.submit的返回值设置到concurrent.futures.Future
对象里面(记为future1),而协程通过loop.create_future
创建自己的future(记为future2),最后通过_chain_future
将二者的future关联起来,即_chain_future(future1, future2)
,最后把future2返回,供loop调度。分析到最后我忍不住感叹:吉多·范罗苏姆,你是一个天才选手。
3. all in 异步
import asyncio import time async def run1(): await asyncio.sleep(2) if __name__ == '__main__': loop = asyncio.get_event_loop() start = time.time() tasks = [] for i in range(20): tasks.append(run1()) loop.run_until_complete(asyncio.wait(tasks)) print("total time: ", time.time()-start)
运行结果如下:
total time: 2.0061399936676025
看到了吗?同样的代码逻辑,只要你把同步改成异步,那么效率会大大提升,即使有20个任务,每个任务sleep 2s,协程照样照单全收,几乎就是2s把所有任务执行完成,这效率无敌啊。
4. 小结
相信大家看到这里还有一点疑惑,到底哪种情况全是异步,那种情况同步+异步呢?来吧,开始你的表演:新项目,全是异步; 旧项目,看注释,说别动就别动,你动,就是bug,不动,万无一失,该如何破?教你一招:大胆破局。