【Python爬虫4】并发并行下载

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 1一百万个网站1用普通方法解析Alexa列表2复用爬虫代码解析Alexa列表2串行爬虫3并发并行爬虫0并发并行工作原理1多线程爬虫2多进程爬虫4性能对比这篇将介绍使用多线程和多进程这两种方式并发并行下载网页,并将它们与串行下载的性能进行比较。

这篇将介绍使用多线程和多进程这两种方式并发并行下载网页,并将它们与串行下载的性能进行比较。

1一百万个网站

亚马逊子公司Alexa提供了最受欢迎的100万个网站列表(http://www.alexa.com/topsites ),我们也可以通过http://s3.amazonaws.com/alexa-static/top-1m.csv.zip 直接下载这一列表的压缩文件,这样就不用去提取Alexa网站的数据了。

排名 域名
1 google.com
2 youtube.com
3 facebook.com
4 baidu.com
5 yahoo.com
6 wikipedia.com
7 google.co.in
8 amazon.com
9 qq.com
10 google.co.jp
11 live.com
12 taobao.com

1.1用普通方法解析Alexa列表

提取数据的4个步骤:
- 下载.zip文件;
- 从.zip文件中提取出CSV文件;
- 解析CSV文件;
- 遍历CSV文件中的每一行,从中提取出域名数据。

# -*- coding: utf-8 -*-

import csv
from zipfile import ZipFile
from StringIO import StringIO
from downloader import Downloader

def alexa():
    D = Downloader()
    zipped_data = D('http://s3.amazonaws.com/alexa-static/top-1m.csv.zip')
    urls = [] # top 1 million URL's will be stored in this list
    with ZipFile(StringIO(zipped_data)) as zf:
        csv_filename = zf.namelist()[0]
        for _, website in csv.reader(zf.open(csv_filename)):
            urls.append('http://' + website)
    return urls

if __name__ == '__main__':
    print len(alexa())

下载得到的压缩数据是使用StringIO封装之后,才传给ZipFile,是因为ZipFile需要一个相关的接口,而不是字符串。由于这个zip文件只包含一个文件,所以直接选择第一个文件即可。然后在域名数据前添加http://协议,附加到URL列表中。

1.2复用爬虫代码解析Alexa列表

要复用上述功能,需要修改scrape_callback接口。

# -*- coding: utf-8 -*-

import csv
from zipfile import ZipFile
from StringIO import StringIO
from mongo_cache import MongoCache

class AlexaCallback:
    def __init__(self, max_urls=1000):
        self.max_urls = max_urls
        self.seed_url = 'http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'

    def __call__(self, url, html):
        if url == self.seed_url:
            urls = []
            #cache = MongoCache()
            with ZipFile(StringIO(html)) as zf:
                csv_filename = zf.namelist()[0]
                for _, website in csv.reader(zf.open(csv_filename)):
                    if 'http://' + website not in cache:
                        urls.append('http://' + website)
                        if len(urls) == self.max_urls:
                            break
            return urls

这里添加了一个新的输入参数max_urls,用于设定从Alexa文件中提取的URL数量。如果真要下载100万个网页,那要消耗11天的时间,所以这里只设置为1000个URL。

2串行爬虫

# -*- coding: utf-8 -*-

from link_crawler import link_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main():
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    #cache.clear()
    link_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, timeout=10, ignore_robots=True)

if __name__ == '__main__':
    main()

time python ...

3并发并行爬虫

为了加快下载网页速度,我们用多进程和多线程将串行下载扩展成并发下载,并将delay标识最小时间间隔为1秒,以免造成服务器过载,或导致IP地址封禁。

3.0并发并行工作原理

并行是基于多处理器多核而言的,让多个处理器多核真正同时跑多个程序或多个进程。而并发是单个处理器而言的,同一时刻每个处理器只会执行一个进程,然后在不同进程间快速切换,宏观上给人以多个程序同时运行的感觉,但微观上单个处理器还是串行工作的。同理,在一个进程中,程序的执行也是不同线程间进行切换的,每个线程执行程序的的不同部分。这就意味着当一个线程等待网页下载时,进程可以切换到其他线程执行,避免浪费处理器时间。因此,为了充分利用计算机中的所有资源尽可能快地下载数据,我们需要将下载分发到多个进程和线程中。

3.1多线程爬虫

我们可以修改第一篇文章链接爬虫队列结构的代码,修改为多个线程中启动爬虫循环process_queue(),以便并发下载这些链接。

import time
import threading
import urlparse
from downloader import Downloader

SLEEP_TIME = 1

def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None, user_agent='Wu_Being', proxies=None, num_retries=1, max_threads=10, timeout=60):
    """Crawl this website in multiple threads
    """
    # the queue of URL's that still need to be crawled
    #crawl_queue = Queue.deque([seed_url])
    crawl_queue = [seed_url]
    # the URL's that have been seen 
    seen = set([seed_url])
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                # crawl queue is empty
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print 'Error in callback for: {}: {}'.format(url, e)
                    else:
                        for link in links:
                            link = normalize(seed_url, link)
                            # check whether already crawled this link
                            if link not in seen:
                                seen.add(link)
                                # add this new link to queue
                                crawl_queue.append(link)

    # wait for all download threads to finish
    threads = []
    while threads or crawl_queue:
        # the crawl is still active
        for thread in threads:
            if not thread.is_alive():
                # remove the stopped threads
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True) # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so CPU can focus execution on other threads
        time.sleep(SLEEP_TIME)

def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link) # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)

上面代码在循环会不断创建线程,直到达到线程池threads的最大值。在爬取过程中,如果当前列队没有更多可以爬取的URL时,该线程会提前停止。
例如当前有两个线程以及两个待下载的URL,当第一个线程完成下载时,待爬取队列为空,则该线程退出。第二个线程稍后也完成了下载,但又发现了另一个待下载的URL。此时,thread循环注意到还有URL需要下载,并且线程数未达到最大值,因些创建一个新的下载线程。

# -*- coding: utf-8 -*-

import sys
from threaded_crawler import threaded_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main(max_threads):
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    #cache.clear()
    threaded_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10)

if __name__ == '__main__':
    max_threads = int(sys.argv[1])
    main(max_threads)

$time python 3threaded_test.py 5
上面使用了5个线程,因此下载速度几乎是串行版本的5倍。

3.2多进程爬虫

对于有多核的中央处理器,则可以启动多进程。

# -*- coding: utf-8 -*-
import sys
from process_crawler import process_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main(max_threads):
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    cache.clear()
    process_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10) ##process_crawler

if __name__ == '__main__':
    max_threads = int(sys.argv[1])
    main(max_threads)

下面代码首先获取中央处理器内核个数,然后启动相应的进程个数,在每进程启动多个线程爬虫。之前的爬虫队列是存储在本地内存中,其他进程都无法处理这一爬虫,为了解决这一问题,需要把爬虫队列转移到MongoDB当中。单独存储队列,意味着即使是不同服务器上的爬虫也能够协同处理同一个爬虫任务。我们可以使用更加健壮的队列,比如专用的消息传输工具Celery,这里我们利用MongoDB实现的队列代码。在threaded_crawler需要做如下修改:
- 内建的队列换成基于MongoDB的新队列MongoQueue
- 由于队列内部实现中处理重复URL的问题,因此不再需要seen变量;
- 在URL处理结束后调用complete()方法,用于记录该URL已经被成功解析。

import time
import urlparse
import threading
import multiprocessing
from mongo_cache import MongoCache
from mongo_queue import MongoQueue
from downloader import Downloader

SLEEP_TIME = 1

### process_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10)
def process_crawler(args, **kwargs):    #args:number of args, kwargs:args list
    num_cpus = multiprocessing.cpu_count()
    #pool = multiprocessing.Pool(processes=num_cpus)
    print 'Starting {} processes...'.format(num_cpus)   ######################
    processes = []
    for i in range(num_cpus):
        p = multiprocessing.Process(target=threaded_crawler, args=[args], kwargs=kwargs)### threaded_crawler
        #parsed = pool.apply_async(threaded_link_crawler, args, kwargs)
        p.start()
        processes.append(p)
    # wait for processes to complete
    for p in processes:
        p.join()

def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None, user_agent='wu_being', proxies=None, num_retries=1, max_threads=10, timeout=60):
    """Crawl using multiple threads
    """
    # the queue of URL's that still need to be crawled
    crawl_queue = MongoQueue()  ######################
    crawl_queue.clear()     ######################
    crawl_queue.push(seed_url)  ######################
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            # keep track that are processing url
            try:
                url = crawl_queue.pop() ######################
            except KeyError:
                # currently no urls to process
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print 'Error in callback for: {}: {}'.format(url, e)
                    else:
                        for link in links:      #############
                            # add this new link to queue######################
                            crawl_queue.push(normalize(seed_url, link))######################
                crawl_queue.complete(url)       ######################

    # wait for all download threads to finish
    threads = []
    while threads or crawl_queue:           ######################
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue.peek():    #######################
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True) # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        time.sleep(SLEEP_TIME)

def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link) # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)

MongoQueue定义了三种状态:
- OUTSTANDING:添加一人新URL时;
- PROCESSING:队列中取出准备下载时;
- COMPLETE:完成下载时。

由于大部分线程都在从队列准备取出未完成处理的URL,比如处理的URL线程被终止的情况。所以在该类中使用了timeout参数,默认为300秒。在repaire()方法中,如果某个URL的处理时间超过了这个timeout值,我们就认定处理过程出现了错误,URL的状态将被重新设为OUTSTANDING,以便再次处理。

from datetime import datetime, timedelta
from pymongo import MongoClient, errors

class MongoQueue:
    """
    >>> timeout = 1
    >>> url = 'http://example.webscraping.com'
    >>> q = MongoQueue(timeout=timeout)
    >>> q.clear() # ensure empty queue
    >>> q.push(url) # add test URL
    >>> q.peek() == q.pop() == url # pop back this URL
    True
    >>> q.repair() # immediate repair will do nothin
    >>> q.pop() # another pop should be empty
    >>> q.peek() 
    >>> import time; time.sleep(timeout) # wait for timeout
    >>> q.repair() # now repair will release URL
    Released: test
    >>> q.pop() == url # pop URL again
    True
    >>> bool(q) # queue is still active while outstanding
    True
    >>> q.complete(url) # complete this URL
    >>> bool(q) # queue is not complete
    False
    """

    # possible states of a download
    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None, timeout=300):
        """
        host: the host to connect to MongoDB
        port: the port to connect to MongoDB
        timeout: the number of seconds to allow for a timeout
        """
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        """Returns True if there are more jobs to process
        """
        record = self.db.crawl_queue.find_one(
            {'status': {'$ne': self.COMPLETE}} 
        )
        return True if record else False

    def push(self, url):
        """Add new URL to queue if does not exist
        """
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            pass # this is already in the queue

    def pop(self):
        """Get an outstanding URL from the queue and set its status to processing.
        If the queue is empty a KeyError exception is raised.
        """
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING}, 
            update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def peek(self):
        record = self.db.crawl_queue.find_one({'status': self.OUTSTANDING})
        if record:
            return record['_id']

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        """Release stalled jobs
        """
        record = self.db.crawl_queue.find_and_modify(
            query={
                'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                'status': {'$ne': self.COMPLETE}
            },
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print 'Released:', record['_id']

    def clear(self):
        self.db.crawl_queue.drop()

4性能对比

脚本 线程数 进程数 时间 与串行时间比
串行 1 1
多线程 5 1
多线程 10 1
多线程 20 1
多进程 5 2
多进程 10 2
多进程 20 2

此外,下载的带宽是有限的,最终添加新线程将无法加快的下载速度。因此要想获得更好性能的爬虫,就需要在多台服务器上分布式部署爬虫,并且所有服务器都要指向同一个MongoDB队列实例中。

Wu_Being 博客声明:本人博客欢迎转载,请标明博客原文和原链接!谢谢!
【Python爬虫系列】《【Python爬虫4】并发并行下载》http://blog.csdn.net/u014134180/article/details/55506994
Python爬虫系列的GitHub代码文件https://github.com/1040003585/WebScrapingWithPython

Wu_Being 吴兵博客接受赞助费二维码

如果你看完这篇博文,觉得对你有帮助,并且愿意付赞助费,那么我会更有动力写下去。

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
目录
相关文章
|
26天前
|
数据采集 存储 XML
Python爬虫定义入门知识
Python爬虫是用于自动化抓取互联网数据的程序。其基本概念包括爬虫、请求、响应和解析。常用库有Requests、BeautifulSoup、Scrapy和Selenium。工作流程包括发送请求、接收响应、解析数据和存储数据。注意事项包括遵守Robots协议、避免过度请求、处理异常和确保数据合法性。Python爬虫强大而灵活,但使用时需遵守法律法规。
|
27天前
|
数据采集 缓存 定位技术
网络延迟对Python爬虫速度的影响分析
网络延迟对Python爬虫速度的影响分析
|
28天前
|
数据采集 Web App开发 监控
高效爬取B站评论:Python爬虫的最佳实践
高效爬取B站评论:Python爬虫的最佳实践
|
29天前
|
API 数据处理 Python
探秘Python并发新世界:asyncio库,让你的代码并发更优雅!
在Python编程中,随着网络应用和数据处理需求的增长,并发编程变得愈发重要。asyncio库作为Python 3.4及以上版本的标准库,以其简洁的API和强大的异步编程能力,成为提升性能和优化资源利用的关键工具。本文介绍了asyncio的基本概念、异步函数的定义与使用、并发控制和资源管理等核心功能,通过具体示例展示了如何高效地编写并发代码。
33 2
|
29天前
|
数据采集 存储 JSON
Python爬虫开发中的分析与方案制定
Python爬虫开发中的分析与方案制定
|
1月前
|
数据采集 Web App开发 JavaScript
爬虫策略规避:Python爬虫的浏览器自动化
爬虫策略规避:Python爬虫的浏览器自动化
|
20天前
|
数据采集 JavaScript 程序员
探索CSDN博客数据:使用Python爬虫技术
本文介绍了如何利用Python的requests和pyquery库爬取CSDN博客数据,包括环境准备、代码解析及注意事项,适合初学者学习。
59 0
|
1月前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
88 6
|
4月前
|
机器学习/深度学习 数据采集 数据可视化
基于爬虫和机器学习的招聘数据分析与可视化系统,python django框架,前端bootstrap,机器学习有八种带有可视化大屏和后台
本文介绍了一个基于Python Django框架和Bootstrap前端技术,集成了机器学习算法和数据可视化的招聘数据分析与可视化系统,该系统通过爬虫技术获取职位信息,并使用多种机器学习模型进行薪资预测、职位匹配和趋势分析,提供了一个直观的可视化大屏和后台管理系统,以优化招聘策略并提升决策质量。
206 4
|
4月前
|
数据采集 存储 搜索推荐
打造个性化网页爬虫:从零开始的Python教程
【8月更文挑战第31天】在数字信息的海洋中,网页爬虫是一艘能够自动搜集网络数据的神奇船只。本文将引导你启航,用Python语言建造属于你自己的网页爬虫。我们将一起探索如何从无到有,一步步构建一个能够抓取、解析并存储网页数据的基础爬虫。文章不仅分享代码,更带你理解背后的逻辑,让你能在遇到问题时自行找到解决方案。无论你是编程新手还是有一定基础的开发者,这篇文章都会为你打开一扇通往数据世界的新窗。