基于 I/O 的多线程
多线程的例子中比较多的就是抓取网页,因为抓取网页是典型的 I/O 开销,因此 Python 的多线程终于不显得那么鸡肋了。
我们把上面例子中的计算函数修改为抓取网站的大小。先用最标准的方式,不用线程。
# 标准方式抓取
>>> from time import time
>>> import requests
>>> list_url = ['http://www.qq.com', 'http://chuangyiji.com',
... 'http://taobao.com', 'http://mingrihui.com']
>>> def get_url_size(url):
... rq = requests.get(url)
... length = len(rq.content)
... print(url, length)
... return length
>>> start = time()
>>> for url in list_url:
... get_url_size(url)
>>> end = time()
>>> print('\ncost time {:f} s'.format(end - start))
http://www.qq.com 246846
http://chuangyiji.com 84537
http://taobao.com 123926
http://mingrihui.com 43480
cost time 11.283091 s
我在里面故意放了两个自己的网站(你没见过的那两个域名就是),一个网站是在国外,一个网站在国内的云主机上,相对访问速度比较慢,因此在执行程序的时候有时候会有明显的等待。对四个网站处理完,差不多要20秒左右或者更多。大家可以看到结果呈现的顺序是和列表中一样的。这是一个单线程的例子。
然后我们修改为上面多线程的模式,程序逻辑几乎一模一样。
# 多线程方式执行网站大小抓取
>>> from time import time
>>> import requests
>>> import threading
>>> list_url = ['http://www.qq.com', 'http://chuangyiji.com',
... 'http://taobao.com', 'http://mingrihui.com']
>>> def get_url_size(url):
... rq = requests.get(url)
... length = len(rq.content)
... print( url, length)
... return length
>>> start = time()
>>> Threads = []
>>> for url in list_url:
... thread = threading.Thread(target=get_url_size, args=(url,))
... thread.start()
... Threads.append(thread)
>>> for thread in Threads:
... thread.join()
>>> end = time()
>>> print('\ncost time {:f} s'.format(end - start))
http://www.qq.com 246836
http://mingrihui.com 43480
http://taobao.com 123926
http://chuangyiji.com 84537
cost time 5.828597 s
可以看到总的来说执行速度快了很多,并且通过显示的网页大小结果,我们会发现,和上面的顺序不一定一样(我自己测试了很多次都不一样),QQ 和淘宝都很快,在国内云上第三,在国外的网站最后,基本都是这个顺序。
毋庸多言,这就是多线程比较适用在非堵塞业务场景的证明。
Python 3.2 开始新增了 concurrent.futures 模块,提供了一种优雅的方式来完成多线程或者多进程的并发实现,我们先"野蛮"一点,使用多进程方式来实现这个功能。
# 多进程方式执行网站大小抓取
>>> from time import time
>>> import requests
>>> import concurrent.futures
>>> list_url = ['http://www.qq.com', 'http://chuangyiji.com',
... 'http://taobao.com', 'http://mingrihui.com']
>>> def get_url_size(url):
... rq = requests.get(url)
... length = len(rq.content)
... print(url, length)
... return length
>>> start = time()
>>> pool = concurrent.futures.ProcessPoolExecutor(max_workers=6)
>>> list_result = list(pool.map(get_url_size, list_url))
>>> end = time()
>>> print('\ncost time {:f} s'.format(end - start))
http://www.qq.com 246793
http://mingrihui.com 43480
http://chuangyiji.com 84537
http://taobao.com 123918
cost time 8.208078 s
你会发现和前面 CPU 密集运算的例子不同,使用多进程方式并没有提高太多,慢的网站你给它一个单独核心,还是要等待,多线程切换时候,等待的时间我们就已经可以先抓其他的了。
# 多进程方式执行网站大小抓取
# executor 写法
>>> from time import time
>>> import requests
>>> import concurrent.futures
>>> list_url = ['http://www.qq.com', 'http://chuangyiji.com',
... 'http://taobao.com', 'http://mingrihui.com']
>>> def get_url_size(url):
... rq = requests.get(url)
... length = len(rq.content)
... print(url, length)
... return length
>>> start = time()
>>> with concurrent.futures.ProcessPoolExecutor(max_workers=6)
... as executor:
... # 关键是 submit 方法
... future = {executor.submit(get_url_size, url): url for url
... in list_url}
>>> end = time()
>>> print('\ncost time {:f} s'.format(end - start))
http://www.qq.com 246819
http://taobao.com 123918
http://mingrihui.com 43480
http://chuangyiji.com 84537
cost time 4.587851 s
concurrent.futures 模块提供了高级的接口对于异步方式进行执行调用,异步执行可以通过线程池,或者独立的进程池。通过抽象的 Executor 类对于两种调用方式有一致的接口。这样对于我们来说,不管是多线程还是多进程,在代码层面都可以方便的切换。
下面的写法是参照了 Python 3 的官方文档,通过线程池来实现多线程的抓取,所以我把网站 url 增加到8个,通过6个 worker 的线程池来抓取。
# 执行网站大小抓取
# 使用线程池方式
>>> from time import time
>>> import concurrent.futures
>>> import requests
>>> list_url = ['http://www.qq.com',
... 'http://chuangyiji.com',
... 'http://taobao.com',
... 'http://www.sohu.com',
... 'http://www.163.com',
... 'http://www.sina.com.cn',
... 'http://www.baidu.com',
... 'http://mingrihui.com']
>>> def get_url_size(url):
... rq = requests.get(url)
... length = len(rq.content)
... return length
>>> start = time()
# 设置了线程池中 worker
>>> with concurrent.futures.ThreadPoolExecutor(max_workers=6)
... as executor:
... future_to_url = {executor.submit(get_url_size, url): url for
... url in list_url}
... for future in concurrent.futures.as_completed(future_to_url):
... url = future_to_url[future]
... try:
... data = future.result()
... except Exception as exc:
... print('%r generated an exception: %s' % (url, exc))
... else:
... print('%r page is %d bytes' % (url, data))
>>> end = time()
>>> print('\ncost time {:f} s'.format(end - start))
'http://www.sohu.com' page is 184564 bytes
'http://www.baidu.com' page is 2381 bytes
'http://www.qq.com' page is 246819 bytes
'http://mingrihui.com' page is 43480 bytes
'http://www.163.com' page is 660241 bytes
'http://www.sina.com.cn' page is 601949 bytes
'http://chuangyiji.com' page is 84537 bytes
'http://taobao.com' page is 123986 bytes
cost time 12.424974 s
小结
Python 的线程在 GIL 的控制之下,线程之间,对整个 Python 解释器,对 Python 提供的 CAPI 的访问,都是互斥的,这可以看作 Python 内核级的互斥机制,这种互斥是我们不能控制的,这样就保护了共享资源。
Python 语言的确是比较讲究简洁以及人类化,有些编程语言的设计为了性能或者独特,使得学习曲线比较陡峭,Python 的解释器有了 GIL 之后会容易实现一些,当然单价就是性能会有影响。(很长一段时间内,Python 没有那么流行就是因为性能的问题,但是随着服务器性能最近几年的直线上升,Python 性能在大多数应用场景下已经不是问题了)
在传统进程、线程之后协程的概念继续发展,异步的操作也使得诸如 Sanic 这样的 Web 框架在性能指标上超过了这几年在整个领域非常厉害的 go 语言,也将 Flask 等"传统"的 Web 框架甩开几乎数量级的差距。
摘自本人与同事所著《Python 机器学习实战》一书