代码详解Python多线程、多进程、协程-阿里云开发者社区

开发者社区> 开发与运维> 正文

代码详解Python多线程、多进程、协程

简介: 本文就通过代码讲解如何使用多进程、多线程、协程来提升爬取速度。注意:我们不深入介绍理论和原理,一切都在代码中。

云栖号资讯:【点击查看更多行业资讯
在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来!

一、前言

很多时候我们写了一个爬虫,实现了需求后会发现了很多值得改进的地方,其中很重要的一点就是爬取速度。本文就通过代码讲解如何使用多进程、多线程、协程来提升爬取速度。注意:我们不深入介绍理论和原理,一切都在代码中。

image

二、同步

首先我们写一个简化的爬虫,对各个功能细分,有意识进行函数式编程。下面代码的目的是访问300次百度页面并返回状态码,其中parse_1函数可以设定循环次数,每次循环将当前循环数(从0开始)和url传入parse_2函数。

import requests 
 
def parse_1(): 
    url = 'https://www.baidu.com' 
    for i in range(300): 
        parse_2(url) 
 
def parse_2(url): 
    response = requests.get(url) 
    print(response.status_code) 
 
if __name__ == '__main__': 
    parse_1()

性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待

示例代码就是典型的串行逻辑,parse_1将url和循环数传递给parse_2,parse_2请求并返回状态码后parse_1继续迭代一次,重复之前步骤

三、多线程

因为CPU在执行程序时每个时间刻度上只会存在一个线程,因此多线程实际上提高了进程的使用率从而提高了CPU的使用率

实现多线程的库有很多,这里用concurrent.futures中的ThreadPoolExecutor来演示。介绍ThreadPoolExecutor库是因为它相比其他库代码更简洁

为了方便说明问题,下面代码中如果是新增加的部分,代码行前会加上 > 符号便于观察说明问题,实际运行需要去掉

import requests 
> from concurrent.futures import ThreadPoolExecutor 
 
def parse_1(): 
    url = 'https://www.baidu.com' 
    # 建立线程池 
    > pool = ThreadPoolExecutor(6) 
    for i in range(300): 
        > pool.submit(parse_2, url) 
    > pool.shutdown(wait=True) 
 
def parse_2(url): 
    response = requests.get(url) 
    print(response.status_code) 
 
if __name__ == '__main__': 
    parse_1()

跟同步相对的就是异步。异步就是彼此独立,在等待某事件的过程中继续做自己的事,不需要等待这一事件完成后再工作。线程就是实现异步的一个方式,也就是说多线程是异步处理异步就意味着不知道处理结果,有时候我们需要了解处理结果,就可以采用回调

import requests 
from concurrent.futures import ThreadPoolExecutor 
 
# 增加回调函数 
> def callback(future): 
    > print(future.result()) 
 
def parse_1(): 
    url = 'https://www.baidu.com' 
    pool = ThreadPoolExecutor(6) 
    for i in range(300): 
        > results = pool.submit(parse_2, url) 
        # 回调的关键步骤 
        > results.add_done_callback(callback) 
    pool.shutdown(wait=True) 
 
def parse_2(url): 
    response = requests.get(url) 
    print(response.status_code) 
 
if __name__ == '__main__': 
    parse_1()

Python实现多线程有一个无数人诟病的GIL(全局解释器锁),但多线程对于爬取网页这种多数属于IO密集型的任务依旧很合适。

四、多进程

多进程用两个方法实现:ProcessPoolExecutor和multiprocessing

1. ProcessPoolExecutor

和实现多线程的ThreadPoolExecutor类似

import requests 
> from concurrent.futures import ProcessPoolExecutor 
 
def parse_1(): 
    url = 'https://www.baidu.com' 
    # 建立线程池 
    > pool = ProcessPoolExecutor(6) 
    for i in range(300): 
        > pool.submit(parse_2, url) 
    > pool.shutdown(wait=True) 
 
def parse_2(url): 
    response = requests.get(url) 
    print(response.status_code) 
 
if __name__ == '__main__': 
    parse_1()

可以看到改动了两次类名,代码依旧很简洁,同理也可以添加回调函数

import requests 
from concurrent.futures import ProcessPoolExecutor 
 
> def callback(future): 
    > print(future.result()) 
 
def parse_1(): 
    url = 'https://www.baidu.com' 
    pool = ProcessPoolExecutor(6) 
    for i in range(300): 
        > results = pool.submit(parse_2, url) 
        > results.add_done_callback(callback) 
    pool.shutdown(wait=True) 
 
def parse_2(url): 
    response = requests.get(url) 
    print(response.status_code) 
 
if __name__ == '__main__': 
    parse_1()

2. multiprocessing

直接看代码,一切都在注释中。

import requests 
> from multiprocessing import Pool 
 
def parse_1(): 
    url = 'https://www.baidu.com' 
    # 建池 
    > pool = Pool(processes=5) 
    # 存放结果 
    > res_lst = [] 
    for i in range(300): 
        # 把任务加入池中 
        > res = pool.apply_async(func=parse_2, args=(url,)) 
        # 获取完成的结果(需要取出) 
        > res_lst.append(res) 
    # 存放最终结果(也可以直接存储或者print) 
    > good_res_lst = [] 
    > for res in res_lst: 
        # 利用get获取处理后的结果 
        > good_res = res.get() 
        # 判断结果的好坏 
        > if good_res: 
            > good_res_lst.append(good_res) 
    # 关闭和等待完成 
    > pool.close() 
    > pool.join() 
 
def parse_2(url): 
    response = requests.get(url) 
    print(response.status_code) 
 
if __name__ == '__main__': 
    parse_1()

可以看到multiprocessing库的代码稍繁琐,但支持更多的拓展。多进程和多线程确实能够达到加速的目的,但如果遇到IO阻塞会出现线程或者进程的浪费,因此有一个更好的方法……

五、异步非阻塞

协程+回调配合动态协作就可以达到异步非阻塞的目的,本质只用了一个线程,所以很大程度利用了资源

实现异步非阻塞经典是利用asyncio库+yield,为了方便利用逐渐出现了更上层的封装 aiohttp,要想更好的理解异步非阻塞最好还是深入了解asyncio库。而gevent是一个非常方便实现协程的库

import requests 
> from gevent import monkey 
# 猴子补丁是协作运行的灵魂 
> monkey.patch_all() 
> import gevent 
 
def parse_1(): 
    url = 'https://www.baidu.com' 
    # 建立任务列表 
    > tasks_list = [] 
    for i in range(300): 
        > task = gevent.spawn(parse_2, url) 
        > tasks_list.append(task) 
    > gevent.joinall(tasks_list) 
 
def parse_2(url): 
    response = requests.get(url) 
    print(response.status_code) 
 
if __name__ == '__main__': 
    parse_1()

gevent能很大提速,也引入了新的问题:如果我们不想速度太快给服务器造成太大负担怎么办?如果是多进程多线程的建池方法,可以控制池内数量。如果用gevent想要控制速度也有一个不错的方法:建立队列。gevent中也提供了Quene类,下面代码改动较大

import requests 
from gevent import monkey 
monkey.patch_all() 
import gevent 
> from gevent.queue import Queue 
 
def parse_1(): 
    url = 'https://www.baidu.com' 
    tasks_list = [] 
    # 实例化队列 
    > quene = Queue() 
    for i in range(300): 
        # 全部url压入队列 
        > quene.put_nowait(url) 
    # 两路队列 
    > for _ in range(2): 
        > task = gevent.spawn(parse_2) 
        > tasks_list.append(task) 
    gevent.joinall(tasks_list) 
 
# 不需要传入参数,都在队列中 
> def parse_2(): 
    # 循环判断队列是否为空 
    > while not quene.empty(): 
        # 弹出队列 
        > url = quene.get_nowait() 
        response = requests.get(url) 
        # 判断队列状态 
        > print(quene.qsize(), response.status_code) 
 
if __name__ == '__main__': 
    parse_1()

结束语

以上就是几种常用的加速方法。如果对代码测试感兴趣可以利用time模块判断运行时间。爬虫的加速是重要技能,但适当控制速度也是爬虫工作者的良好习惯,不要给服务器太大压力,拜拜~

【云栖号在线课堂】每天都有产品技术专家分享!
课程地址:https://yqh.aliyun.com/zhibo

立即加入社群,与专家面对面,及时了解课程最新动态!
【云栖号在线课堂 社群】https://c.tb.cn/F3.Z8gvnK

原文发布时间:2020-04-07
本文作者:陈熹
本文来自:“早起Python”,了解相关信息可以关注“早起Python

版权声明:本文首发在云栖社区,遵循云栖社区版权声明:本文内容由互联网用户自发贡献,版权归用户作者所有,云栖社区不为本文内容承担相关法律责任。云栖社区已升级为阿里云开发者社区。如果您发现本文中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,阿里云开发者社区将协助删除涉嫌侵权内容。

分享:
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

其他文章