scrapy中数据处理的两个模块:ItemPipeline与Exporter

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介: scrapy提供了如题两个模块来扩展我们的数据处理方式,其中Item Pipeline功能有数据清洗、效验、过滤、存库的作用,Exporter用于扩展scrapy导出数据的格式。Item Pipelineitem pipeline在scrapy项目文件下的pipeline.

scrapy提供了如题两个模块来扩展我们的数据处理方式,其中Item Pipeline功能有数据清洗、效验、过滤、存库的作用,Exporter用于扩展scrapy导出数据的格式。


Item Pipeline

item pipeline在scrapy项目文件下的pipeline.py文件中,pipeline类不需要继承特定的基类,只需要实现特定的方法如:

open_spider:爬虫运行前执行的操作

process_item:爬虫获取到的每项item数据的处理方法

close_spider:爬虫运行结束时执行的操作

from_crawler:pipeline类方法,是创建item pipeline的回调方法,通常该方法用于读取setting中的配置参数。

其中process_item实现process_item(Item, Spider)固定方法,并return Item给后面的Pipeline处理或导出数据,但在处理中如果遇到错误,会抛弃该数据并停止传递。

import pymongo

class MongoDBPipeline(object):
    """
    1、连接数据库操作
    """

    def __init__(self,mongourl,mongoport,mongodb):
        '''
        初始化mongodb数据的url、端口号、数据库名称
        :param mongourl:
        :param mongoport:
        :param mongodb:
        '''

        self.mongourl = mongourl
        self.mongoport = mongoport
        self.mongodb = mongodb

    @classmethod
    def from_crawler(cls,crawler):
        """
        1、读取settings里面的mongodb数据的url、port、DB。
        :param crawler:
        :return:
        """

        return cls(
            mongourl = crawler.settings.get("MONGO_URL"),
            mongoport = crawler.settings.get("MONGO_PORT"),
            mongodb = crawler.settings.get("MONGO_DB")
        )

    def open_spider(self,spider):
        '''
        1、连接mongodb数据
        :param spider:
        :return:
        '''

        self.client = pymongo.MongoClient(self.mongourl,self.mongoport)
        self.db = self.client[self.mongodb]

    def process_item(self,item,spider):
        '''
        1、将数据写入数据库
        :param item:
        :param spider:
        :return:
        '''

        name = item.__class__.__name__
        self.db['user'].update({'url_token':item['url_token']},{'$set':item},True)
        return item

    def close_spider(self,spider):
        '''
        1、关闭数据库连接
        :param spider:
        :return:
        '''

        self.client.close()


做完这些最重要的是在setting.py文件中启用写得中间件,在配置文件中添加:

ITEM_PIPELINES = {
   'objectname.pipelines.MongoDBPipeline': 300,
}


Exporter

Exporter是scrapy自带的数据导出器,内置6中数据导出格式:json、json lines、CSV、xml、pickle、marshal。

但是在实际应用中,我们还需要一种最常用格式xlsx,我们从源码出发来编写这样一个导出器。

导出器的工作过程:

首先是在CMD命令运行spider crawl spidername -o test.json命令

该条命令指定了运行的爬虫名字和通过参数-o指定导出器,导出器根据保存文件后缀来判断具体是什么导出器。

当引擎得到导出器类型后会从两个地方查找是否存在该导出器,一个是FEED_EXPORTERS_BASE(内置在scrapy.settings.default_settings)、一个是FEED_EXPORTERS(setting.py配置文件中)

FEED_EXPORTERS_BASE = {
    'json''scrapy.exporters.JsonItemExporter',
    'jsonlines''scrapy.exporters.JsonLinesItemExporter',
    'jl''scrapy.exporters.JsonLinesItemExporter',
    'csv''scrapy.exporters.CsvItemExporter',
    'xml''scrapy.exporters.XmlItemExporter',
    'marshal''scrapy.exporters.MarshalItemExporter',
    'pickle''scrapy.exporters.PickleItemExporter',
}


查看exporter源码:

"""
Item Exporters are used to export/serialize items into different formats.
"
""

import csv
import io
import sys
import pprint
import marshal
import six
from six.moves import cPickle as pickle
from xml.sax.saxutils import XMLGenerator

from scrapy.utils.serialize import ScrapyJSONEncoder
from scrapy.utils.python import to_bytes, to_unicode, to_native_str, is_listlike
from scrapy.item import BaseItem
from scrapy.exceptions import ScrapyDeprecationWarning
import warnings


__all__ = ['BaseItemExporter''PprintItemExporter''PickleItemExporter',
           'CsvItemExporter''XmlItemExporter''JsonLinesItemExporter',
           'JsonItemExporter''MarshalItemExporter']


class BaseItemExporter(object):

    def __init__(self, **kwargs):
        self._configure(kwargs)

    def _configure(self, options, dont_fail=False):
        """Configure the exporter by poping options from the ``options`` dict.
        If dont_fail is set, it won't raise an exception on unexpected options
        (useful for using with keyword arguments in subclasses constructors)
        "
""
        self.encoding = options.pop('encoding', None)
        self.fields_to_export = options.pop('fields_to_export', None)
        self.export_empty_fields = options.pop('export_empty_fields', False)
        self.indent = options.pop('indent', None)
        if not dont_fail and options:
            raise TypeError("Unexpected options: %s" % ', '.join(options.keys()))

    def export_item(self, item):
        raise NotImplementedError

    def serialize_field(self, field, name, value):
        serializer = field.get('serializer', lambda x: x)
        return serializer(value)

    def start_exporting(self):
        pass

    def finish_exporting(self):
        pass

    def _get_serialized_fields(self, item, default_value=None, include_empty=None):
        """Return the fields to export as an iterable of tuples
        (name, serialized_value)
        "
""
        if include_empty is None:
            include_empty = self.export_empty_fields
        if self.fields_to_export is None:
            if include_empty and not isinstance(item, dict):
                field_iter = six.iterkeys(item.fields)
            else:
                field_iter = six.iterkeys(item)
        else:
            if include_empty:
                field_iter = self.fields_to_export
            else:
                field_iter = (x for x in self.fields_to_export if x in item)

        for field_name in field_iter:
            if field_name in item:
                field = {} if isinstance(item, dict) else item.fields[field_name]
                value = self.serialize_field(field, field_name, item[field_name])
            else:
                value = default_value

            yield field_name, value


class JsonLinesItemExporter(BaseItemExporter):

    def __init__(self, file, **kwargs):
        self._configure(kwargs, dont_fail=True)
        self.file = file
        kwargs.setdefault('ensure_ascii'not self.encoding)
        self.encoder = ScrapyJSONEncoder(**kwargs)

    def export_item(self, item):
        itemdict = dict(self._get_serialized_fields(item))
        data = self.encoder.encode(itemdict) + '\n'
        self.file.write(to_bytes(data, self.encoding))


class JsonItemExporter(BaseItemExporter):
····
class XmlItemExporter(BaseItemExporter):
····

class CsvItemExporter(BaseItemExporter):
·····

class PickleItemExporter(BaseItemExporter):····

class MarshalItemExporter(BaseItemExporter):
····

class PprintItemExporter(BaseItemExporter):
····
   
class PythonItemExporter(BaseItemExporter):
    """The idea behind this exporter is to have a mechanism to serialize items
    to built-in python types so any serialization library (like
    json, msgpack, binc, etc) can be used on top of it. Its main goal is to
    seamless support what BaseItemExporter does plus nested items.
    "
""
····


其中定义了一个基类,指明了几个特定方法:

start_exporting:导出开始时被调用,用于初始化,类似pipelines的open_spider

finish_exporting:导出完成后调用,用于收尾工作类似pipelines的close_spider

export_item:用于处理每项数据,也就是主程序,类似pipelines的process_item,是必须实现的方法


查看jsonlines的源码,仅实现了export_item实现了一个换行写入每项item数据功能,我们就按照这样的接口方式来实现自定义的excel格式导出器。

# -*- coding: utf-8 -*-from scrapy.exporters import BaseItemExporterimport xlwtclass ExcelItemExporter(BaseItemExporter):
   """
   导出为Excel
   在执行命令中指定输出格式为excel
   e.g. scrapy crawl -t excel -o books.xls
   """


   def __init__(self, file, **kwargs):
       self._configure(kwargs)
       self.file = file
       self.wbook = xlwt.Workbook(encoding='utf-8')
       self.wsheet = self.wbook.add_sheet('scrapy')
       self._headers_not_written = True
       self.fields_to_export = list()
       self.row = 0

   def finish_exporting(self):
       self.wbook.save(self.file)    def export_item(self, item):
       if self._headers_not_written:
           self._headers_not_written = False
           self._write_headers_and_set_fields_to_export(item)

       fields = self._get_serialized_fields(item)        for col, v in enumerate(x for _, x in fields):
           print(self.row, col, str(v))
           self.wsheet.write(self.row, col, str(v))
       self.row += 1

   def _write_headers_and_set_fields_to_export(self, item):
       if not self.fields_to_export:            if isinstance(item, dict):
               self.fields_to_export = list(item.keys())            else:
               self.fields_to_export = list(item.fields.keys())        for column, v in enumerate(self.fields_to_export):
           self.wsheet.write(self.row, column, v)
       self.row += 1

上面源码来自简书(https://www.jianshu.com/p/a50b19b6258d)

有了自定义的导出器,我们还需要在setting.py中添加FEED_EXPORTERS

FEED_EXPORTERS={'excel':'Book.my_exporters.ExcelItemExporter'}

到此这个自定义的导出器就可用了。

说了内置导出器和自定义导出器,那么怎么使用这些导出器呢?


大致可以通过三种方式来使用:

命令行:scrapy crawl spidername -o text.json

-o参数会根据传入文件名后缀来确定选择哪种导出器

命令行:scrapy crawl spidername -t json -o test.json

使用参数-o -t其中-t用于指定导出器,在自定义导出器中可使用该命令

直接在配置文件中配置导出器相关属性:

1.FEED_URI:导出文件路径

FEED_URI='export_data\%(name)s.data'
2.FEED_FORMAT:导出文件的格式

FEED_FORMAT='csv'
3.FEED_EXPORT_ENCODING:导出文件的编码格式(默认情况下,json使用数字编码,其他格式使用'utf-8'编码)

FEED_EXPORT_ENCODING='gbk'
4.FEED_EXPORT_FIELDS:默认导出全部字段,对字段进行排序:

FEED_EXPORT_FIELDS=['name','author','price']

其中关于导出文件路径中的%(name)s、%(time)s,这是两个特定字符会被自动替换成spider的名字和文件创建的时间。

到此关于scrapy中的两种数据处理方式介绍完毕,更多内容可参阅官方文档。


2019-03-12-17_18_15.png



·END·
 

Python之战
相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。   相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
数据采集 Web App开发 JSON
Python爬虫:关于scrapy模块的请求头
Python爬虫:关于scrapy模块的请求头
518 0
Python爬虫:关于scrapy模块的请求头
|
中间件 Python
一日一技:从 Scrapy 学习模块导入技巧
一日一技:从 Scrapy 学习模块导入技巧
126 0
5、web爬虫,scrapy模块,解决重复ur——自动递归url
一般抓取过的url不重复抓取,那么就需要记录url,判断当前URL如果在记录里说明已经抓取过了,如果不存在说明没抓取过 记录url可以是缓存,或者数据库,如果保存数据库按照以下方式: i...
1987 0
4、web爬虫,scrapy模块标签选择器下载图片,以及正则匹配标签
标签选择器对象 HtmlXPathSelector()创建标签选择器对象,参数接收response回调的html对象需要导入模块:from scrapy.
874 0
|
数据采集 存储 中间件
3、web爬虫,scrapy模块介绍与使用
Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架。
1249 0
|
XML 数据采集 Linux
2、web爬虫,scrapy模块以及相关依赖模块安装
当前环境python3.5 ,windows10系统 Linux系统安装 在线安装,会自动安装scrapy模块以及相关依赖模块
1006 0
|
数据采集 API Python
scrapy爬虫加载API,配置自定义加载模块
当我们在scrapy中写了几个爬虫程序之后,他们是怎么被检索出来的,又是怎么被加载的?这就涉及到爬虫加载的API,今天我们就来分享爬虫加载过程及其自定义加载程序。 SpiderLoader API  该API是爬虫实例化API,主要实现一个类SpiderLoader class scrapy.loader.SpiderLoader 该类负责检索和处理项目中定义的spider类。
1334 0
|
4月前
|
数据采集 存储 数据处理
Scrapy:Python网络爬虫框架的利器
在当今信息时代,网络数据已成为企业和个人获取信息的重要途径。而Python网络爬虫框架Scrapy则成为了网络爬虫工程师的必备工具。本文将介绍Scrapy的概念与实践,以及其在数据采集和处理过程中的应用。
47 1
|
4月前
|
数据采集 中间件 Python
Scrapy爬虫:利用代理服务器爬取热门网站数据
Scrapy爬虫:利用代理服务器爬取热门网站数据
|
12天前
|
数据采集 中间件 调度
Scrapy 爬虫框架的基本使用
Scrapy 爬虫框架的基本使用