RedisSpider的调度队列实现过程及其源码

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 对于非分布式的scrapy爬虫而言,不能共享爬虫队列,不能实现分布式。RedisSpider是依赖Redis存储中介,来实现多台主机多爬虫之间的通信,RedisSpider是去重是内部的queue.py文件实现的,内部实现了队列、堆栈、优先级队列,在调度的统一协调下最终实现分布式协同工作。

对于非分布式的scrapy爬虫而言,不能共享爬虫队列,不能实现分布式。RedisSpider是依赖Redis存储中介,来实现多台主机多爬虫之间的通信,RedisSpider是去重是内部的queue.py文件实现的,内部实现了队列、堆栈、优先级队列,在调度的统一协调下最终实现分布式协同工作。

queue.py的源码

from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        spider : Spider
            Scrapy spider instance.
        key: str
            Redis key where to put and get messages.
        serializer : object
            Serializer object with ``loads`` and ``dumps`` methods.

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)


class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)


class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])


class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)


# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue


其中一个基类声明有那些接口,实现了三个子类分别是SpiderQueue、SpiderStack、SpiderPriorityQueue ,并在该文件引入了序列化方法picklecompat用来处理数据存储中的兼容编码问题。

三个之类分别实现先进先出、先进后出、优先级先出的Request调度方法其中三种方式储存Rquest对象方法如下:

SpiderQueue:lpush(self.key, self._encode_request(request)),即按照key:序列化Request方式储存

SpiderStack:lpush(self.key, self._encode_request(request)),存储方式同上,不同的是获取request对象是先进后出

SpiderPriorityQueue:execute_command('ZADD', self.key, score, data),按照key、优先级、序列化Request对象方式存储,实现优先级的调度。

当在多台主机上运行相同的爬虫,self.key = key % {'spider': spider.name}爬虫名相同就决定了同一个爬虫的调度队列是相同的key,进而实现了不同主机爬虫之间的分布式。

关于分布式爬虫,此前说过三种架构思想,这里验证了RedisSpider的架构思想《三种分布式爬虫系统的架构方式》感兴趣的可用其他两种方式实现。


v2-e5b5956e15e037c402743a5be1d7d12a_hd.j
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
5月前
|
数据可视化 测试技术 API
阅读源码有哪些好方式与好步骤
阅读源码是理解软件工作原理的关键。首先,了解背景、目的和技术栈。从文件头部的文档注释开始,逐步深入到复杂代码。利用Git、调试器和分析工具辅助理解。保持批判性思维,质疑代码设计并验证理解。拆分代码块,画图展示结构,使用版本控制追踪变更。搜索、阅读文档、API和单元测试以深化理解。参与讨论,做笔记,回顾历史版本,了解上下文,并通过实践加强领悟。每个人的方法可能不同,关键是持续学习和适应。
49 1
|
5月前
|
搜索推荐 编译器 开发者
应用程序的运行:原理、过程与代码实践
应用程序的运行:原理、过程与代码实践
153 1
Sub过程
参数表是用来指明调用该Sub过程时需要传递给该过程的参数及类型。表内的参数称为形参。Sub过程可以没有形参(但小括号不可以省略),也可1到多个形参(多个之间用逗号隔开);
Sub过程
一次有趣的学习过程
嗨!大家好,我是小蚂蚁。 今天这篇文章记录的是一次有趣的学习过程,起因是昨天我在学员群里发了这样的一条信息。 这原本是一位学员的需求,想要在游戏中做一个文字逐个出现的打字机效果,因为我暂时还没有写与此相关的教程(也没有考虑过如何实现),所以就把这个问题抛了出来,顺便看一下大家的想法。 没想到的是这下可热闹了,下面记录的就是关于这次有趣的学习的全过程。
82 0
|
弹性计算 Linux 网络安全
学习过程中的Q&A
macbook以及linux的操作
94 0
|
NoSQL Ubuntu MongoDB
使用过程心得
一些常用操作和常见问题
使用过程心得