scrapy_redis provides its own set of components for distributed crawling, among them an item pipeline that stores scraped data in Redis, located in scrapy_redis.pipelines. This article analyzes its source code and how it works. The source is as follows:
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """

    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )
        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.
        """
        return self.key % {'spider': spider.name}
Custom Scrapy item pipelines were covered in an earlier article, 《scrapy中数据处理的两个模块:Item Pipeline与Exporter》; this piece focuses on how RedisPipeline is implemented.
RedisPipeline is initialized with server, key=defaults.PIPELINE_KEY, and serialize_func=default_serialize. The first parameter, server, is a Redis client instance; the second, key, defaults to a value from scrapy_redis's defaults module, whose contents are:
import redis
# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
PIPELINE_KEY = '%(spider)s:items'
REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False
serialize_func is the function that serializes each item; its default, default_serialize, is ScrapyJSONEncoder().encode, essentially json.dumps extended to handle Scrapy's own types.
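To make the defaults concrete, here is a small standalone sketch (assuming only that scrapy is installed; the spider name "books" and the sample dict are made up) showing how PIPELINE_KEY expands and what the default serializer emits:

# Standalone illustration: default key interpolation and serialization.
from scrapy.utils.serialize import ScrapyJSONEncoder

default_serialize = ScrapyJSONEncoder().encode

PIPELINE_KEY = '%(spider)s:items'

# For a spider named "books", items end up in the Redis list "books:items".
print(PIPELINE_KEY % {'spider': 'books'})                  # books:items
# The default serializer produces a JSON string for each item.
print(default_serialize({'title': 'Example', 'price': 9.99}))
# {"title": "Example", "price": 9.99}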
With that, we have a rough picture of how the pipeline works: it is initialized with a Redis client, a key, and a serialization function. Reading further, from_settings and from_crawler simply pull these values out of the project's settings.py to complete the initialization; the core method is process_item.
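Before diving into it, note that the pipeline only runs if it has been registered in settings.py; that registration is what makes Scrapy call from_crawler. A hedged sketch of a project's settings (the priority 300 is arbitrary and the serializer path is a hypothetical example):

# settings.py (illustrative): enable the pipeline and, optionally, override
# the two settings that from_settings looks for.
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
}

# Optional overrides read by from_settings:
# REDIS_ITEMS_KEY = '%(spider)s:parsed_items'
# REDIS_ITEMS_SERIALIZER = 'myproject.serializers.encode_item'  # hypothetical path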
The source of process_item and its helpers is:
def process_item(self, item, spider):
    return deferToThread(self._process_item, item, spider)

def _process_item(self, item, spider):
    key = self.item_key(item, spider)
    data = self.serialize(item)
    self.server.rpush(key, data)
    return item

def item_key(self, item, spider):
    """Returns redis key based on given spider.

    Override this function to use a different key depending on the item
    and/or spider.
    """
    return self.key % {'spider': spider.name}
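The docstring explicitly invites overriding item_key. A minimal sketch of what that could look like (the subclass name and the item's 'type' field are made up for illustration):

from scrapy_redis.pipelines import RedisPipeline

class TypedRedisPipeline(RedisPipeline):
    """Route items into per-type Redis lists, e.g. "books:chapter:items"."""

    def item_key(self, item, spider):
        item_type = item.get('type', 'default')
        return '%s:%s:items' % (spider.name, item_type)

Register the subclass in ITEM_PIPELINES instead of the stock RedisPipeline and everything else (serialization, rpush, the threaded write) stays the same.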
process_item does just one thing: it calls deferToThread, which returns a Deferred whose work (here _process_item) runs in a separate thread. This is the usual pattern for blocking operations such as database or file I/O. Let's look at deferToThread itself:
def deferToThread(f, *args, **kwargs):
    """
    Run a function in a thread and return the result as a Deferred.

    @param f: The function to call.
    @param *args: positional arguments to pass to f.
    @param **kwargs: keyword arguments to pass to f.

    @return: A Deferred which fires a callback with the result of f,
    or an errback with a L{twisted.python.failure.Failure} if f throws
    an exception.
    """
    from twisted.internet import reactor
    return deferToThreadPool(reactor, reactor.getThreadPool(),
                             f, *args, **kwargs)
It hands the work to twisted.internet's reactor. The reactor pattern dispatches events from multiple event sources to their respective handlers within a single-threaded event loop; here, the reactor's thread pool runs the blocking Redis rpush on a worker thread, so items are written to Redis asynchronously without blocking the crawl.
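To see the mechanism in isolation, here is a minimal standalone Twisted sketch (not Scrapy code; blocking_write merely simulates a blocking Redis write such as server.rpush): deferToThread runs the blocking call in the reactor's thread pool and fires the callback back on the reactor thread.

import time

from twisted.internet import reactor
from twisted.internet.threads import deferToThread


def blocking_write(data):
    # Stands in for a blocking call such as server.rpush(key, data).
    time.sleep(1)
    return 'wrote %r' % (data,)


def on_done(result):
    # Runs back on the reactor thread once the worker thread finishes.
    print(result)
    reactor.stop()


d = deferToThread(blocking_write, {'title': 'Example'})
d.addCallback(on_done)
reactor.run()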