scrapy分布式Spider源码分析及实现过程

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 分布式框架scrapy_redis实现了一套完整的组件,其中也实现了spider,RedisSpider是在继承原scrapy的Spider的基础上略有改动,初始URL不在从start_urls列表中读取,而是从redis起始队列中读取。

分布式框架scrapy_redis实现了一套完整的组件,其中也实现了spider,RedisSpider是在继承原scrapy的Spider的基础上略有改动,初始URL不在从start_urls列表中读取,而是从redis起始队列中读取。

scrapy_redis源码在scrapy.redis.spider中,不仅实现了RedisSpider(分布式爬虫)还实现了RedisCrawlSpider(分布式深度爬虫)的逻辑,不过二者很多方法是一致的。

源码如下:

from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection


# Default batch size matches default concurrent requests setting.
DEFAULT_START_URLS_BATCH_SIZE = 16
DEFAULT_START_URLS_KEY = '%(name)s:start_urls'


class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    # Per spider redis key, default to DEFAULT_START_URLS_KEY.
    redis_key = None
    # Fetch this amount of start urls when idle. Default to DEFAULT_START_URLS_BATCH_SIZE.
    redis_batch_size = None
    # Redis client instance.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.
        This should be called after the spider has set its crawler object.
        """

        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler'None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', DEFAULT_START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE', DEFAULT_START_URLS_BATCH_SIZE,
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s)", self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET')
        fetch_one = self.server.spop if use_set else self.server.lpop
        XXX: Do we need to use a timeout here?
        found = 0
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        # By default, data is an URL.
        if '://' in data:
            return self.make_requests_from_url(data)
        else:
            self.logger.error("Unexpected URL from '%s': %r", self.redis_key, data)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


开始start_requests方法是从Redis中获取所有连接,然后与URL去重列表比对去重,不被去重的通过make_requests_from_url(url)传递给引擎,然后到调度器,这里和一般的爬虫思路是一致的。

当引擎收到任务队列为空的消息后会调用Spider_idle方法,进而调用schedule_next_requests迭代获取到的起始URL递交给引擎,然后就开始任务,直到任务完成后在次调用起始队列中的URL。

    def schedule_next_requests(self):
        """Schedules a request if available"""
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


RedisSpider和RedisCrawlSpider很简单,继承了Spider和RedisMiXine,实现了setup_redis方法,该方法会根据不同的crawler初始化setting、redis_key,及通过connect接口,给spider绑定了spider_idle信号绑定。

class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj

 def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.
        This should be called after the spider has set its crawler object.
        """

        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler'None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', DEFAULT_START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE', DEFAULT_START_URLS_BATCH_SIZE,
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s)", self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)


总结:RedisSpider从初始Redis队列中获取start_url生成Request,然后递交给scrapy引擎交给对应的Redis scheduler调度,完成去重和入队列。当调度条件满足是队列中的Request交由引擎给下载器,生成Response对象递交给Spider解析,产生新的Request,继续抓取;直到没有新的Request产生后,Spider由从起始队列中去获取URL,继续上一轮循环,当然如果队列中没有了,那么爬虫就结束了。

2019-03-14-21_29_02.png

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
8月前
|
数据采集 XML 搜索推荐
聚焦Python分布式爬虫必学框架Scrapy打造搜索引擎
聚焦Python分布式爬虫必学框架Scrapy打造搜索引擎
|
消息中间件 算法 网络协议
49-微服务技术栈(高级):分布式协调服务zookeeper源码篇(选举机制源码分析)
本篇博文详细分析了FastLeaderElection的算法,其是ZooKeeper的核心部分,结合前面的理论学习部分,可以比较轻松的理解其具体过程。
101 0
|
数据采集 存储 云安全
在阿里云Centos7.6上面部署基于redis的分布式爬虫scrapy-redis
Scrapy是一个比较好用的Python爬虫框架,你只需要编写几个组件就可以实现网页数据的爬取。但是当我们要爬取的页面非常多的时候,单个服务器的处理能力就不能满足我们的需求了(无论是处理速度还是网络请求的并发数),这时候分布式爬虫的优势就显现出来。 而Scrapy-Redis则是一个基于Redis的Scrapy分布式组件。它利用Redis对用于爬取的请求(Requests)进行存储和调度(Schedule),并对爬取产生的项目(items)存储以供后续处理使用。scrapy-redi重写了scrapy一些比较关键的代码,将scrapy变成一个可以在多个主机上同时运行的分布式爬虫。
在阿里云Centos7.6上面部署基于redis的分布式爬虫scrapy-redis
|
NoSQL Java Redis
Springboot基于Redisson实现Redis分布式可重入锁【案例到源码分析】
Springboot基于Redisson实现Redis分布式可重入锁【案例到源码分析】
193 0
Springboot基于Redisson实现Redis分布式可重入锁【案例到源码分析】
|
数据采集 Python
配置Pycharm的Scrapy爬虫Spider子类通用模板
配置Pycharm的Scrapy爬虫Spider子类通用模板
190 0
|
数据采集 Python
Python爬虫:scrapy框架Spider类参数设置
Python爬虫:scrapy框架Spider类参数设置
110 0
Python爬虫:scrapy框架Spider类参数设置
|
数据采集 NoSQL Redis
python爬虫:scrapy-redis实现分布式爬虫
python爬虫:scrapy-redis实现分布式爬虫
156 0
|
数据采集 Python
Python爬虫:scrapy框架Spider类参数设置
Python爬虫:scrapy框架Spider类参数设置
180 0
|
数据采集 存储 Web App开发
阿里云Centos7.6上面部署基于redis的分布式爬虫scrapy-redis将任务队列push进redis
Scrapy是一个比较好用的Python爬虫框架,你只需要编写几个组件就可以实现网页数据的爬取。
995 0
阿里云Centos7.6上面部署基于redis的分布式爬虫scrapy-redis将任务队列push进redis
|
数据采集 搜索推荐 Python
24、Python快速开发分布式搜索引擎Scrapy精讲—爬虫和反爬的对抗过程以及策略—scrapy架构源码分析图
【百度云搜索:http://www.lqkweb.com】 【搜网盘:http://www.swpan.cn】 1、基本概念 2、反爬虫的目的 3、爬虫和反爬的对抗过程以及策略 scrapy架构源码分析图
5987 0

热门文章

最新文章