scrapy分布式Spider源码分析及实现过程

简介: 分布式框架scrapy_redis实现了一套完整的组件,其中也实现了spider,RedisSpider是在继承原scrapy的Spider的基础上略有改动,初始URL不在从start_urls列表中读取,而是从redis起始队列中读取。

分布式框架scrapy_redis实现了一套完整的组件,其中也实现了spider,RedisSpider是在继承原scrapy的Spider的基础上略有改动,初始URL不在从start_urls列表中读取,而是从redis起始队列中读取。

scrapy_redis源码在scrapy.redis.spider中,不仅实现了RedisSpider(分布式爬虫)还实现了RedisCrawlSpider(分布式深度爬虫)的逻辑,不过二者很多方法是一致的。

源码如下:

from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection


# Default batch size matches default concurrent requests setting.
DEFAULT_START_URLS_BATCH_SIZE = 16
DEFAULT_START_URLS_KEY = '%(name)s:start_urls'


class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    # Per spider redis key, default to DEFAULT_START_URLS_KEY.
    redis_key = None
    # Fetch this amount of start urls when idle. Default to DEFAULT_START_URLS_BATCH_SIZE.
    redis_batch_size = None
    # Redis client instance.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.
        This should be called after the spider has set its crawler object.
        """

        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler'None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', DEFAULT_START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE', DEFAULT_START_URLS_BATCH_SIZE,
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s)", self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET')
        fetch_one = self.server.spop if use_set else self.server.lpop
        XXX: Do we need to use a timeout here?
        found = 0
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        # By default, data is an URL.
        if '://' in data:
            return self.make_requests_from_url(data)
        else:
            self.logger.error("Unexpected URL from '%s': %r", self.redis_key, data)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


开始start_requests方法是从Redis中获取所有连接,然后与URL去重列表比对去重,不被去重的通过make_requests_from_url(url)传递给引擎,然后到调度器,这里和一般的爬虫思路是一致的。

当引擎收到任务队列为空的消息后会调用Spider_idle方法,进而调用schedule_next_requests迭代获取到的起始URL递交给引擎,然后就开始任务,直到任务完成后在次调用起始队列中的URL。

    def schedule_next_requests(self):
        """Schedules a request if available"""
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


RedisSpider和RedisCrawlSpider很简单,继承了Spider和RedisMiXine,实现了setup_redis方法,该方法会根据不同的crawler初始化setting、redis_key,及通过connect接口,给spider绑定了spider_idle信号绑定。

class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj

 def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.
        This should be called after the spider has set its crawler object.
        """

        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler'None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', DEFAULT_START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE', DEFAULT_START_URLS_BATCH_SIZE,
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s)", self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)


总结:RedisSpider从初始Redis队列中获取start_url生成Request,然后递交给scrapy引擎交给对应的Redis scheduler调度,完成去重和入队列。当调度条件满足是队列中的Request交由引擎给下载器,生成Response对象递交给Spider解析,产生新的Request,继续抓取;直到没有新的Request产生后,Spider由从起始队列中去获取URL,继续上一轮循环,当然如果队列中没有了,那么爬虫就结束了。

相关文章
|
SQL Java 数据库
Seata分布式事务源码分析
Seata分布式事务源码分析
444 0
|
存储 分布式计算 Java
JuiceFS分布式文件系统源码分析(Java层)
讲解了hadoop-common java api层面JuiceFS的实现流程
623 0
JuiceFS分布式文件系统源码分析(Java层)
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
1556 0
分布式爬虫框架Scrapy-Redis实战指南
|
11月前
|
数据采集 存储 NoSQL
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
773 67
|
数据采集 存储 NoSQL
Redis 与 Scrapy:无缝集成的分布式爬虫技术
Redis 与 Scrapy:无缝集成的分布式爬虫技术
|
数据采集 前端开发 中间件
python-scrapy框架(一)Spider文件夹的用法讲解
python-scrapy框架(一)Spider文件夹的用法讲解
528 0
|
存储 分布式计算 Java
JuiceFS分布式文件系统源码分析(Java层)
JuiceFS分布式文件系统源码分析(Java层)
284 0
|
消息中间件 算法 网络协议
49-微服务技术栈(高级):分布式协调服务zookeeper源码篇(选举机制源码分析)
本篇博文详细分析了FastLeaderElection的算法,其是ZooKeeper的核心部分,结合前面的理论学习部分,可以比较轻松的理解其具体过程。
329 0
|
数据采集 XML 搜索推荐
聚焦Python分布式爬虫必学框架Scrapy打造搜索引擎
聚焦Python分布式爬虫必学框架Scrapy打造搜索引擎
361 0
|
数据采集 存储 云安全
在阿里云Centos7.6上面部署基于redis的分布式爬虫scrapy-redis
Scrapy是一个比较好用的Python爬虫框架,你只需要编写几个组件就可以实现网页数据的爬取。但是当我们要爬取的页面非常多的时候,单个服务器的处理能力就不能满足我们的需求了(无论是处理速度还是网络请求的并发数),这时候分布式爬虫的优势就显现出来。 而Scrapy-Redis则是一个基于Redis的Scrapy分布式组件。它利用Redis对用于爬取的请求(Requests)进行存储和调度(Schedule),并对爬取产生的项目(items)存储以供后续处理使用。scrapy-redi重写了scrapy一些比较关键的代码,将scrapy变成一个可以在多个主机上同时运行的分布式爬虫。
在阿里云Centos7.6上面部署基于redis的分布式爬虫scrapy-redis
下一篇
开通oss服务