在如今互联网环境下,网络上的各种业务数据,如新闻,社交网站,交易,政府公开数据,气象数据等各种各样的数据越来越多被应用到企业的数据运营中, 以打通外部数据与内部数据的通道,使得两者激情碰撞出热烈的火花。这些数据一般都数据量巨大,是最适合用MaxCompute来进行分析和加工的一类数据,尤其可以利用MaxCompute的机器学习能力来完成一些数据挖掘的业务场景,本文就介绍如何利用开源的Scrapy爬虫框架来爬取新闻网站的数据到MaxCompute中。
一、 Scrapy简单介绍
Scrapy是一个用 Python 写的 Crawler Framework ,简单轻巧,并且非常方便。
Scrapy 使用 Twisted 这个异步网络库来处理网络通讯,架构清晰,并且包含了各种中间件接口,可以灵活的完成各种需求。整体架构如下图所示:
绿线是数据流向,首先从初始 URL 开始,Scheduler 会将其交给 Downloader 进行下载,下载之后会交给 Spider 进行分析,Spider 分析出来的结果有两种:一种是需要进一步抓取的链接,例如之前分析的“下一页”的链接,这些东西会被传回 Scheduler ;另一种是需要保存的数据,它们则被送到 Item Pipeline 那里,那是对数据进行后期处理(详细分析、过滤、存储等)的地方。另外,在数据流动的通道里还可以安装各种中间件,进行必要的处理。
二、Scrapy环境安装
系统环境要求:
Linux
软件环境要求:
- 已安装:Python 2.7 ( 下载地址:https://www.python.org/ftp/python/2.7.13/Python-2.7.13.tgz)
- 已安装:pip (可参考:https://pip.pypa.io/en/stable/installing/ 进行安装
Scrapy安装
执行安装命令:
pip install Scrapy
Scrapy校验
执行命令:
scrapy
执行结果:
ODPS Python安装
执行安装命令:
pip install pyodps
ODPS Python校验
执行命令:
python -c "from odps import ODPS"
执行结果:无报错,即为安装成功
三、 创建Scrapy项目
在你想要创建Scrapy项目的目录下,执行:
scrapy startproject hr_scrapy_demo
看一下Scrapy创建项目后的目录结构:
hr_scrapy_demo /
scrapy.cfg # 全局配置文件
hr_scrapy_demo / # 项目下的Python模块,你可以从这里引用该Python模块
__init__.py
items.py # 自定义的Items
pipelines.py # 自定义的Pipelines
settings.py # 自定义的项目级配置信息
spiders/ # 自定义的spiders
__init__.py
四、 创建OdpsPipelines
在hr_scrapy_demo/pipelines.py中,我们可以自定义我们的数据处理pipelines,以下是我之前编写好的一个OdpsPipeline,该Pipeline可以用于将我们采集到的item保存到ODPS中,但也有几点需要说明:
- ODPS中的表必须已经提前创建好。
- Spider中采集到的item必须包含该表的所有字段,且名字必须一致,否则会抛出异常。
- 支持分区表和无分区表。
将下面代码替换掉你项目中的pipelines.py
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
from odps import ODPS
import logging
logger = logging.getLogger('OdpsPipeline')
class OdpsPipeline(object):
collection_name = 'odps'
records = []
def __init__(self, odps_endpoint, odps_project,accessid,accesskey,odps_table,odps_partition=None,buffer=1000):
self.odps_endpoint = odps_endpoint
self.odps_project = odps_project
self.accessid = accessid
self.accesskey = accesskey
self.odps_table = odps_table
self.odps_partition = odps_partition
self.buffer = buffer
@classmethod
def from_crawler(cls, crawler):
return cls(
odps_endpoint=crawler.settings.get('ODPS_ENDPOINT'),
odps_project=crawler.settings.get('ODPS_PROJECT'),
accessid=crawler.settings.get('ODPS_ACCESSID'),
accesskey=crawler.settings.get('ODPS_ACCESSKEY'),
odps_table=crawler.settings.get('ODPS_TABLE'),
odps_partition=crawler.settings.get('ODPS_PARTITION'),
buffer=crawler.settings.get('WRITE_BUFFER')
)
def open_spider(self, spider):
self.odps = ODPS(self.accessid,self.accesskey,project=self.odps_project,endpoint=self.odps_endpoint)
self.table = self.odps.get_table(self.odps_table)
if(self.odps_partition is not None and self.odps_partition != ""):
self.table.create_partition(self.odps_partition,if_not_exists=True)
def close_spider(self, spider):
self.write_to_odps()
'''
将数据写入odps
'''
def write_to_odps(self):
if(len(self.records) is None or len(self.records) == 0):
return
if(self.odps_partition is None or self.odps_partition == ""):
with self.table.open_writer() as writer:
writer.write(self.records)
logger.info("write to odps {0} records. ".format(len(self.records)))
self.records = []
else:
with self.table.open_writer(partition=self.odps_partition) as writer:
writer.write(self.records)
logger.info("write to odps {0} records. ".format(len(self.records)))
self.records = []
def isPartition(self,name):
for pt in self.table.schema.partitions:
if(pt.name == name):
return True
return False
def process_item(self, item, spider):
cols = []
for col in self.table.schema.columns:
if(self.isPartition(col.name)):
continue
c = None
for key in item.keys():
if(col.name == key):
c = item[key]
break
if(c is None):
raise Exception("{0} column not found in item.".format(col.name))
cols.append(c)
self.records.append(self.table.new_record(cols))
#logger.info("records={0} : buffer={1}".format(len(self.records),self.buffer))
if( len(self.records) >= int(self.buffer)):
self.write_to_odps()
return item
注册Pipeline 到hr_scrapy_demo/setting.py,修改ITEM_PIPELINES的值为:
# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
'hr_scrapy_demo.pipelines.OdpsPipeline': 300,
}
#300代表Pipeline的优先级,可以同时存在多个pipeline,依据该数值从小到大依次执行pipeline
五、 配置ODPS 基本信息
hr_scrapy_demo/setting.py中,添加参数如下:
ODPS_PROJECT = 'your odps project name'
ODPS_ACCESSID = 'accessid'
ODPS_ACCESSKEY = 'accesskey'
ODPS_ENDPOINT = 'http://service.odps.aliyun.com/api'
#注:如果爬虫运行在ECS上,可将ODPS_ENDPOINT修改为内网地址:
#ODPS_ENDPOINT = 'http:// odps-ext.aliyun-inc.com/api'
六、创建自己的Spiders
Spider主要用于采集网站数据,并解析网站数据转换为相应的items,再交由Pipelines进行处理。针对每个需要采集的网站,我们都需要单独创建对应的Spider。
以下是一个Spider示例,以采集南方新闻网的要闻信息为依据。
# -*- coding:utf-8 -*-
import scrapy
import logging
logger = logging.getLogger('NanfangSpider')
class NanfangSpider(scrapy.Spider):
name = "nanfang"
'''
设置你要采集的其实网址,可以是多个.
此处以南方新闻网-要闻-首页为例.
'''
start_urls = [
'http://www.southcn.com/pc2016/yw/node_346416.htm'
]
'''
[ODPS配置信息]
ODPS_TABLE:ODPS表名
ODPS_PARTITION:ODPS表的分区值(可选)
WRITE_BUFFER:写入缓存(默认1000条)
'''
custom_settings = {
'ODPS_TABLE':'hr_scrapy_nanfang_news',
#'ODPS_PARTITION':'pt=20170209',
'WRITE_BUFFER':'1000'
}
'''
ODPS Demo DDL:
drop table if exists hr_scrapy_nanfang_news;
create table hr_scrapy_nanfang_news
(
title string,
source string,
times string,
url string,
editor string,
content string
);
'''
'''
对start_urls的url的解析方法,返回结果为item.
关于具体解析API可参考:https://doc.scrapy.org/en/latest/intro/tutorial.html
'''
def parse(self, response):
#查找网页中DIV元素,且其class=j-link,并对其进行遍历
for quote in response.css("div.j-link"):
#查找该DIV中的所有<a>超链接,并获取其href
href = quote.css("a::attr('href')").extract_first()
#进入该href链接,此处跳转到方法:parse_details,对其返回HTML进行再次处理。
yield scrapy.Request(response.urljoin(href),callback=self.parse_details)
#查找下一页的连接,此处用xpath方式获取,因css语法简单,无法获取
nexthref = response.xpath(u'//div[@id="displaypagenum"]//center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()
#如找到下一页,则跳转到下一页,并继续由parse对返回HTML进行处理。
if(nexthref is not None):
yield scrapy.Request(response.urljoin(nexthref),callback=self.parse)
'''
新闻详情页处理方法
'''
def parse_details(self, response):
#找到正文
main_div = response.css("div.main")
#因新闻详情也可能有分页,获取下一页的链接
next_href = main_div.xpath(u'//div[@id="displaypagenum"]/center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()
#获取正文内容,仅取DIV内所有<p>元素下的文本。
content = main_div.xpath('//div[@class="content"]//p//text()').extract()
content = "\n".join(content)
if(next_href is None):
#最后一页,则获取所有内容,返回item
title = main_div.css('div.m-article h2::text').extract_first()
source = main_div.css('div.meta span[id="pubtime_baidu"]::text').extract_first()
times = main_div.css('div.meta span[id="source_baidu"]::text').extract_first()
url = response.url
editor = main_div.css('div.m-editor::text').extract_first()
item = {}
if('item' in response.meta):
item = response.meta['item']
item['title'] = title
item['source'] = source
item['times'] = times
item['url'] = url
item['editor'] = editor
if('content' in item):
item['content'] += '\n'+content
else:
item['content'] = content
yield item
else:
#非最后一页 ,则取出当前页content,并拼接,然后跳转到下一页
request = scrapy.Request(response.urljoin(next_href),
callback=self.parse_details)
item = {}
if('item' in response.meta and 'content' in response.meta['item']):
item = response.meta['item']
item['content'] += '\n'+content
else:
item['content'] = content
request.meta['item'] = item
yield request
七、 运行Scrapy
切换到你的工程目录下,执行以下命令:
Scrapy crawl nanfang –loglevel INFO
执行结果如下图所示:
八、 验证爬取结果
待数据采集完成之后,登陆DATA IDE查看采集内容:
本文演示仅为一个简单的案例,实际生产还需考虑多线程处理,网站校验,分布式爬取等。