scrapy中scrapy_redis分布式内置pipeline源码及其工作原理

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: scrapy_redis分布式实现了一套自己的组件,其中也提供了Redis数据存储的数据管道,位于scrapy_redis.pipelines,这篇文章主要分析器源码及其工作流程,源码如下:from scrapy.

scrapy_redis分布式实现了一套自己的组件,其中也提供了Redis数据存储的数据管道,位于scrapy_redis.pipelines,这篇文章主要分析器源码及其工作流程,源码如下:

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """


    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize)
:

        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """

        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )

        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """

        return self.key % {'spider': spider.name}


关于scrapy自定义数据管道在此前文章已经说过详见《scrapy中数据处理的两个模块:Item Pipeline与Exporter》,本篇文章阐述RedisPipeline的实现。

Redis类初始化参数,server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize,其中第一个参数sever是Redis客户端的实例、key是scrapy_defaults默认的配置格式如下:

import redis


# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False

serialize_func是序列化Item的函数默认是json.dumps。

那我们对其工作原理就有了大致的认识,pipeline初始化传入一个Redis客户端、一个key、一个序列化函数;继续向下看,from_settings、from_crawler都是读取setting.py文件,完成初始化,核心函数process_item在下。

process_item源码为:

 def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        "
""
        return self.key % {'spider': spider.name}


process_主要实现了一个deferToThread方法,该方法作用是返回一个deferred对象,不过回调函数在另一个线程处理,主要用于数据库/文件读取操作。继续看deferToThread

def deferToThread(f, *args, **kwargs):
    """
    Run a function in a thread and return the result as a Deferred.

    @param f: The function to call.
    @param *args: positional arguments to pass to f.
    @param **kwargs: keyword arguments to pass to f.

    @return: A Deferred which fires a callback with the result of f,
    or an errback with a L{twisted.python.failure.Failure} if f throws
    an exception.
    """

    from twisted.internet import reactor
    return deferToThreadPool(reactor, reactor.getThreadPool(),
                             f, *args, **kwargs)


主要使用twisted.internet的 reactor模式,反应堆(reactor)模式,这种模式在单线程环境中调度多个事件源产生的事件到它们各自的事件处理例程中去,在这里实现一个线程池的效果,达到最后异步写入的效果。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
126 5
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
76 8
|
2月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
64 16
|
2月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
47 5
|
2月前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
125 6
|
3月前
|
数据采集 中间件 开发者
Scrapy爬虫框架-自定义中间件
Scrapy爬虫框架-自定义中间件
71 1
|
3月前
|
数据采集 中间件 Python
Scrapy爬虫框架-通过Cookies模拟自动登录
Scrapy爬虫框架-通过Cookies模拟自动登录
137 0
|
2月前
|
数据采集 前端开发 中间件
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
103 4
|
2月前
|
数据采集 中间件 API
在Scrapy爬虫中应用Crawlera进行反爬虫策略
在Scrapy爬虫中应用Crawlera进行反爬虫策略
|
8月前
|
数据采集 中间件 Python
Scrapy爬虫:利用代理服务器爬取热门网站数据
Scrapy爬虫:利用代理服务器爬取热门网站数据