scrapy提供了如题两个模块来扩展我们的数据处理方式,其中Item Pipeline功能有数据清洗、效验、过滤、存库的作用,Exporter用于扩展scrapy导出数据的格式。
Item Pipeline
item pipeline在scrapy项目文件下的pipeline.py文件中,pipeline类不需要继承特定的基类,只需要实现特定的方法如:
open_spider:爬虫运行前执行的操作
process_item:爬虫获取到的每项item数据的处理方法
close_spider:爬虫运行结束时执行的操作
from_crawler:pipeline类方法,是创建item pipeline的回调方法,通常该方法用于读取setting中的配置参数。
其中process_item实现process_item(Item, Spider)固定方法,并return Item给后面的Pipeline处理或导出数据,但在处理中如果遇到错误,会抛弃该数据并停止传递。
import pymongo
class MongoDBPipeline(object):
"""
1、连接数据库操作
"""
def __init__(self,mongourl,mongoport,mongodb):
'''
初始化mongodb数据的url、端口号、数据库名称
:param mongourl:
:param mongoport:
:param mongodb:
'''
self.mongourl = mongourl
self.mongoport = mongoport
self.mongodb = mongodb
@classmethod
def from_crawler(cls,crawler):
"""
1、读取settings里面的mongodb数据的url、port、DB。
:param crawler:
:return:
"""
return cls(
mongourl = crawler.settings.get("MONGO_URL"),
mongoport = crawler.settings.get("MONGO_PORT"),
mongodb = crawler.settings.get("MONGO_DB")
)
def open_spider(self,spider):
'''
1、连接mongodb数据
:param spider:
:return:
'''
self.client = pymongo.MongoClient(self.mongourl,self.mongoport)
self.db = self.client[self.mongodb]
def process_item(self,item,spider):
'''
1、将数据写入数据库
:param item:
:param spider:
:return:
'''
name = item.__class__.__name__
self.db['user'].update({'url_token':item['url_token']},{'$set':item},True)
return item
def close_spider(self,spider):
'''
1、关闭数据库连接
:param spider:
:return:
'''
self.client.close()
做完这些最重要的是在setting.py文件中启用写得中间件,在配置文件中添加:
ITEM_PIPELINES = {
'objectname.pipelines.MongoDBPipeline': 300,
}
Exporter
Exporter是scrapy自带的数据导出器,内置6中数据导出格式:json、json lines、CSV、xml、pickle、marshal。
但是在实际应用中,我们还需要一种最常用格式xlsx,我们从源码出发来编写这样一个导出器。
首先是在CMD命令运行spider crawl spidername -o test.json命令
该条命令指定了运行的爬虫名字和通过参数-o指定导出器,导出器根据保存文件后缀来判断具体是什么导出器。
当引擎得到导出器类型后会从两个地方查找是否存在该导出器,一个是FEED_EXPORTERS_BASE(内置在scrapy.settings.default_settings)、一个是FEED_EXPORTERS(setting.py配置文件中)
FEED_EXPORTERS_BASE = {
'json': 'scrapy.exporters.JsonItemExporter',
'jsonlines': 'scrapy.exporters.JsonLinesItemExporter',
'jl': 'scrapy.exporters.JsonLinesItemExporter',
'csv': 'scrapy.exporters.CsvItemExporter',
'xml': 'scrapy.exporters.XmlItemExporter',
'marshal': 'scrapy.exporters.MarshalItemExporter',
'pickle': 'scrapy.exporters.PickleItemExporter',
}
查看exporter源码:
"""
Item Exporters are used to export/serialize items into different formats.
"""
import csv
import io
import sys
import pprint
import marshal
import six
from six.moves import cPickle as pickle
from xml.sax.saxutils import XMLGenerator
from scrapy.utils.serialize import ScrapyJSONEncoder
from scrapy.utils.python import to_bytes, to_unicode, to_native_str, is_listlike
from scrapy.item import BaseItem
from scrapy.exceptions import ScrapyDeprecationWarning
import warnings
__all__ = ['BaseItemExporter', 'PprintItemExporter', 'PickleItemExporter',
'CsvItemExporter', 'XmlItemExporter', 'JsonLinesItemExporter',
'JsonItemExporter', 'MarshalItemExporter']
class BaseItemExporter(object):
def __init__(self, **kwargs):
self._configure(kwargs)
def _configure(self, options, dont_fail=False):
"""Configure the exporter by poping options from the ``options`` dict.
If dont_fail is set, it won't raise an exception on unexpected options
(useful for using with keyword arguments in subclasses constructors)
"""
self.encoding = options.pop('encoding', None)
self.fields_to_export = options.pop('fields_to_export', None)
self.export_empty_fields = options.pop('export_empty_fields', False)
self.indent = options.pop('indent', None)
if not dont_fail and options:
raise TypeError("Unexpected options: %s" % ', '.join(options.keys()))
def export_item(self, item):
raise NotImplementedError
def serialize_field(self, field, name, value):
serializer = field.get('serializer', lambda x: x)
return serializer(value)
def start_exporting(self):
pass
def finish_exporting(self):
pass
def _get_serialized_fields(self, item, default_value=None, include_empty=None):
"""Return the fields to export as an iterable of tuples
(name, serialized_value)
"""
if include_empty is None:
include_empty = self.export_empty_fields
if self.fields_to_export is None:
if include_empty and not isinstance(item, dict):
field_iter = six.iterkeys(item.fields)
else:
field_iter = six.iterkeys(item)
else:
if include_empty:
field_iter = self.fields_to_export
else:
field_iter = (x for x in self.fields_to_export if x in item)
for field_name in field_iter:
if field_name in item:
field = {} if isinstance(item, dict) else item.fields[field_name]
value = self.serialize_field(field, field_name, item[field_name])
else:
value = default_value
yield field_name, value
class JsonLinesItemExporter(BaseItemExporter):
def __init__(self, file, **kwargs):
self._configure(kwargs, dont_fail=True)
self.file = file
kwargs.setdefault('ensure_ascii', not self.encoding)
self.encoder = ScrapyJSONEncoder(**kwargs)
def export_item(self, item):
itemdict = dict(self._get_serialized_fields(item))
data = self.encoder.encode(itemdict) + '\n'
self.file.write(to_bytes(data, self.encoding))
class JsonItemExporter(BaseItemExporter):
····
class XmlItemExporter(BaseItemExporter):
····
class CsvItemExporter(BaseItemExporter):
·····
class PickleItemExporter(BaseItemExporter):
····
class MarshalItemExporter(BaseItemExporter):
····
class PprintItemExporter(BaseItemExporter):
····
class PythonItemExporter(BaseItemExporter):
"""The idea behind this exporter is to have a mechanism to serialize items
to built-in python types so any serialization library (like
json, msgpack, binc, etc) can be used on top of it. Its main goal is to
seamless support what BaseItemExporter does plus nested items.
"""
····
其中定义了一个基类,指明了几个特定方法:
start_exporting:导出开始时被调用,用于初始化,类似pipelines的open_spider
finish_exporting:导出完成后调用,用于收尾工作类似pipelines的close_spider
export_item:用于处理每项数据,也就是主程序,类似pipelines的process_item,是必须实现的方法
查看jsonlines的源码,仅实现了export_item实现了一个换行写入每项item数据功能,我们就按照这样的接口方式来实现自定义的excel格式导出器。
# -*- coding: utf-8 -*-from scrapy.exporters import BaseItemExporterimport xlwtclass ExcelItemExporter(BaseItemExporter):
"""
导出为Excel
在执行命令中指定输出格式为excel
e.g. scrapy crawl -t excel -o books.xls
"""
def __init__(self, file, **kwargs):
self._configure(kwargs)
self.file = file
self.wbook = xlwt.Workbook(encoding='utf-8')
self.wsheet = self.wbook.add_sheet('scrapy')
self._headers_not_written = True
self.fields_to_export = list()
self.row = 0
def finish_exporting(self):
self.wbook.save(self.file) def export_item(self, item):
if self._headers_not_written:
self._headers_not_written = False
self._write_headers_and_set_fields_to_export(item)
fields = self._get_serialized_fields(item) for col, v in enumerate(x for _, x in fields):
print(self.row, col, str(v))
self.wsheet.write(self.row, col, str(v))
self.row += 1
def _write_headers_and_set_fields_to_export(self, item):
if not self.fields_to_export: if isinstance(item, dict):
self.fields_to_export = list(item.keys()) else:
self.fields_to_export = list(item.fields.keys()) for column, v in enumerate(self.fields_to_export):
self.wsheet.write(self.row, column, v)
self.row += 1
上面源码来自简书(https://www.jianshu.com/p/a50b19b6258d)
有了自定义的导出器,我们还需要在setting.py中添加FEED_EXPORTERS
FEED_EXPORTERS={'excel':'Book.my_exporters.ExcelItemExporter'}
到此这个自定义的导出器就可用了。
说了内置导出器和自定义导出器,那么怎么使用这些导出器呢?
命令行:scrapy crawl spidername -o text.json
-o参数会根据传入文件名后缀来确定选择哪种导出器
命令行:scrapy crawl spidername -t json -o test.json
使用参数-o -t其中-t用于指定导出器,在自定义导出器中可使用该命令
直接在配置文件中配置导出器相关属性:
1.FEED_URI:导出文件路径
FEED_URI='export_data\%(name)s.data'
2.FEED_FORMAT:导出文件的格式
FEED_FORMAT='csv'
3.FEED_EXPORT_ENCODING:导出文件的编码格式(默认情况下,json使用数字编码,其他格式使用'utf-8'编码)
FEED_EXPORT_ENCODING='gbk'
4.FEED_EXPORT_FIELDS:默认导出全部字段,对字段进行排序:
FEED_EXPORT_FIELDS=['name','author','price']
其中关于导出文件路径中的%(name)s、%(time)s,这是两个特定字符会被自动替换成spider的名字和文件创建的时间。
到此关于scrapy中的两种数据处理方式介绍完毕,更多内容可参阅官方文档。
·END·