RedisSpider的调度队列实现过程及其源码

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 对于非分布式的scrapy爬虫而言,不能共享爬虫队列,不能实现分布式。RedisSpider是依赖Redis存储中介,来实现多台主机多爬虫之间的通信,RedisSpider是去重是内部的queue.py文件实现的,内部实现了队列、堆栈、优先级队列,在调度的统一协调下最终实现分布式协同工作。

对于非分布式的scrapy爬虫而言,不能共享爬虫队列,不能实现分布式。RedisSpider是依赖Redis存储中介,来实现多台主机多爬虫之间的通信,RedisSpider是去重是内部的queue.py文件实现的,内部实现了队列、堆栈、优先级队列,在调度的统一协调下最终实现分布式协同工作。

queue.py的源码

from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        spider : Spider
            Scrapy spider instance.
        key: str
            Redis key where to put and get messages.
        serializer : object
            Serializer object with ``loads`` and ``dumps`` methods.

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)


class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)


class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])


class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)


# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue


其中一个基类声明有那些接口,实现了三个子类分别是SpiderQueue、SpiderStack、SpiderPriorityQueue ,并在该文件引入了序列化方法picklecompat用来处理数据存储中的兼容编码问题。

三个之类分别实现先进先出、先进后出、优先级先出的Request调度方法其中三种方式储存Rquest对象方法如下:

SpiderQueue:lpush(self.key, self._encode_request(request)),即按照key:序列化Request方式储存

SpiderStack:lpush(self.key, self._encode_request(request)),存储方式同上,不同的是获取request对象是先进后出

SpiderPriorityQueue:execute_command('ZADD', self.key, score, data),按照key、优先级、序列化Request对象方式存储,实现优先级的调度。

当在多台主机上运行相同的爬虫,self.key = key % {'spider': spider.name}爬虫名相同就决定了同一个爬虫的调度队列是相同的key,进而实现了不同主机爬虫之间的分布式。

关于分布式爬虫,此前说过三种架构思想,这里验证了RedisSpider的架构思想《三种分布式爬虫系统的架构方式》感兴趣的可用其他两种方式实现。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
76 12
|
5月前
|
存储 Java 编译器
经验总结:源代码-目标代码的区别
源代码是由程序员用高级语言编写的可读文本文件,需编译成机器可执行的二进制目标代码。目标代码由编译器生成,包含机器指令,对机器可读但对人类不易理解。源代码便于修改,而目标代码需重新编译以反映更改。源代码不受系统限制,目标代码则特定于系统。此外,链接程序处理源文件间及库函数的引用,将目标文件连接成可执行文件。Java中的本地方法则允许调用非Java语言编写的代码,实现与底层系统的交互,提高程序性能或实现特定功能。
264 4
|
9月前
|
数据可视化 测试技术 API
阅读源码有哪些好方式与好步骤
阅读源码是理解软件工作原理的关键。首先,了解背景、目的和技术栈。从文件头部的文档注释开始,逐步深入到复杂代码。利用Git、调试器和分析工具辅助理解。保持批判性思维,质疑代码设计并验证理解。拆分代码块,画图展示结构,使用版本控制追踪变更。搜索、阅读文档、API和单元测试以深化理解。参与讨论,做笔记,回顾历史版本,了解上下文,并通过实践加强领悟。每个人的方法可能不同,关键是持续学习和适应。
83 1
|
9月前
|
搜索推荐 编译器 开发者
应用程序的运行:原理、过程与代码实践
应用程序的运行:原理、过程与代码实践
270 1
|
JavaScript Java 关系型数据库
小菜鸟初始java项目出错解决思路
java中的第一个项目SSH完成了,并且找师傅验收了,在实现功能的时候,出现了许多的问题,那么对于一个刚接触java刚开始使用myeclipse软件的小菜鸟来说,针对这些问题如何寻找思路很重要,下面就将小编的一些经历以及总结经验分享给大家。
|
9月前
|
Java
【小技巧】复制一个模块到你的工程(学习阶段很实用)
【小技巧】复制一个模块到你的工程(学习阶段很实用)
|
前端开发 计算机视觉 Python
代码报错还好说,源码报错才难搞!分享自己源码报错的解决过程!
代码报错还好说,源码报错才难搞!分享自己源码报错的解决过程!
152 0
代码报错还好说,源码报错才难搞!分享自己源码报错的解决过程!