大数据上云那些事儿:(一)上云工具之爬虫(Scrapy)数据

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 在如今互联网环境下,网络上的各种业务数据,如新闻,社交网站,交易类数据等各种各样的数据越来越多被应用到企业的数据运营中,这些数据一般都数据量巨大,是最适合用MaxCompute来进行分析和加工的一类数据,尤其可以利用MaxCompute的机器学习能力来完成一些数据挖掘的业务场景,本文就介绍如何利用开源的Scrapy爬虫框架来爬取新闻网站的数据到MaxCompute中。

在如今互联网环境下,网络上的各种业务数据,如新闻,社交网站,交易,政府公开数据,气象数据等各种各样的数据越来越多被应用到企业的数据运营中, 以打通外部数据与内部数据的通道,使得两者激情碰撞出热烈的火花。这些数据一般都数据量巨大,是最适合用MaxCompute来进行分析和加工的一类数据,尤其可以利用MaxCompute的机器学习能力来完成一些数据挖掘的业务场景,本文就介绍如何利用开源的Scrapy爬虫框架来爬取新闻网站的数据到MaxCompute中。
f8dcff02ba0a86acec2a9aaea48ab9cbdbbf22cc

一、 Scrapy简单介绍

Scrapy是一个用 Python 写的 Crawler Framework ,简单轻巧,并且非常方便。
Scrapy 使用 Twisted 这个异步网络库来处理网络通讯,架构清晰,并且包含了各种中间件接口,可以灵活的完成各种需求。整体架构如下图所示:
scrapy

绿线是数据流向,首先从初始 URL 开始,Scheduler 会将其交给 Downloader 进行下载,下载之后会交给 Spider 进行分析,Spider 分析出来的结果有两种:一种是需要进一步抓取的链接,例如之前分析的“下一页”的链接,这些东西会被传回 Scheduler ;另一种是需要保存的数据,它们则被送到 Item Pipeline 那里,那是对数据进行后期处理(详细分析、过滤、存储等)的地方。另外,在数据流动的通道里还可以安装各种中间件,进行必要的处理。

二、Scrapy环境安装

系统环境要求:

Linux

软件环境要求:

  1. 已安装:Python 2.7 ( 下载地址:https://www.python.org/ftp/python/2.7.13/Python-2.7.13.tgz)
  2. 已安装:pip (可参考:https://pip.pypa.io/en/stable/installing/ 进行安装

Scrapy安装

执行安装命令:

pip install Scrapy

Scrapy校验

执行命令:

scrapy

执行结果:
scrapy2

ODPS Python安装

执行安装命令:

pip install pyodps

ODPS Python校验

执行命令:

python -c "from odps import ODPS"

执行结果:无报错,即为安装成功

三、 创建Scrapy项目

在你想要创建Scrapy项目的目录下,执行:

scrapy startproject hr_scrapy_demo

看一下Scrapy创建项目后的目录结构:

hr_scrapy_demo /
    scrapy.cfg                # 全局配置文件
    hr_scrapy_demo /                # 项目下的Python模块,你可以从这里引用该Python模块
        __init__.py
        items.py              # 自定义的Items
        pipelines.py          # 自定义的Pipelines
        settings.py           # 自定义的项目级配置信息
        spiders/              # 自定义的spiders
            __init__.py

四、 创建OdpsPipelines

在hr_scrapy_demo/pipelines.py中,我们可以自定义我们的数据处理pipelines,以下是我之前编写好的一个OdpsPipeline,该Pipeline可以用于将我们采集到的item保存到ODPS中,但也有几点需要说明:

  1. ODPS中的表必须已经提前创建好。
  2. Spider中采集到的item必须包含该表的所有字段,且名字必须一致,否则会抛出异常。
  3. 支持分区表和无分区表。

将下面代码替换掉你项目中的pipelines.py


# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html

from odps import ODPS
import logging

logger = logging.getLogger('OdpsPipeline')
class OdpsPipeline(object):
    collection_name = 'odps'
    records = []

    def __init__(self, odps_endpoint, odps_project,accessid,accesskey,odps_table,odps_partition=None,buffer=1000):
        self.odps_endpoint = odps_endpoint
        self.odps_project = odps_project
        self.accessid = accessid
        self.accesskey = accesskey
        self.odps_table = odps_table
        self.odps_partition = odps_partition
        self.buffer = buffer

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            odps_endpoint=crawler.settings.get('ODPS_ENDPOINT'),
            odps_project=crawler.settings.get('ODPS_PROJECT'),
            accessid=crawler.settings.get('ODPS_ACCESSID'),
            accesskey=crawler.settings.get('ODPS_ACCESSKEY'),
            odps_table=crawler.settings.get('ODPS_TABLE'),
            odps_partition=crawler.settings.get('ODPS_PARTITION'),
            buffer=crawler.settings.get('WRITE_BUFFER')
        )

    def open_spider(self, spider):
        self.odps = ODPS(self.accessid,self.accesskey,project=self.odps_project,endpoint=self.odps_endpoint)
        self.table = self.odps.get_table(self.odps_table)
        if(self.odps_partition is not None and self.odps_partition != ""):
            self.table.create_partition(self.odps_partition,if_not_exists=True)
    
    def close_spider(self, spider):
        self.write_to_odps()
    
    '''
        将数据写入odps
    '''
    def write_to_odps(self):
        if(len(self.records) is None or len(self.records) == 0):
            return
        if(self.odps_partition is None or self.odps_partition == ""):
            with self.table.open_writer() as writer:
                writer.write(self.records)
                logger.info("write to odps {0} records. ".format(len(self.records)))
                self.records = []
        else:
            with self.table.open_writer(partition=self.odps_partition) as writer:
                writer.write(self.records)
                logger.info("write to odps {0} records. ".format(len(self.records)))
                self.records = []
    
    def isPartition(self,name):
        for pt in self.table.schema.partitions:
            if(pt.name == name):
                return True
        return False
        
    def process_item(self, item, spider):
        cols = []
        for col in self.table.schema.columns:
            if(self.isPartition(col.name)):
                continue
            c = None
            for key in item.keys():
                if(col.name == key):
                    c = item[key]
                    break
            if(c is None):
                raise Exception("{0} column not found in item.".format(col.name))
            cols.append(c)
        self.records.append(self.table.new_record(cols))
        #logger.info("records={0} : buffer={1}".format(len(self.records),self.buffer))
        if( len(self.records) >= int(self.buffer)):
            self.write_to_odps()
        return item

注册Pipeline 到hr_scrapy_demo/setting.py,修改ITEM_PIPELINES的值为:

# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    'hr_scrapy_demo.pipelines.OdpsPipeline': 300,
}
#300代表Pipeline的优先级,可以同时存在多个pipeline,依据该数值从小到大依次执行pipeline

五、 配置ODPS 基本信息

hr_scrapy_demo/setting.py中,添加参数如下:

ODPS_PROJECT = 'your odps project name'
ODPS_ACCESSID = 'accessid'
ODPS_ACCESSKEY = 'accesskey'
ODPS_ENDPOINT = 'http://service.odps.aliyun.com/api'
#注:如果爬虫运行在ECS上,可将ODPS_ENDPOINT修改为内网地址:
#ODPS_ENDPOINT = 'http:// odps-ext.aliyun-inc.com/api'

六、创建自己的Spiders

Spider主要用于采集网站数据,并解析网站数据转换为相应的items,再交由Pipelines进行处理。针对每个需要采集的网站,我们都需要单独创建对应的Spider。
以下是一个Spider示例,以采集南方新闻网的要闻信息为依据。


# -*- coding:utf-8 -*-  
import scrapy
import logging

logger = logging.getLogger('NanfangSpider')

class NanfangSpider(scrapy.Spider):
    name = "nanfang"
    
    '''
        设置你要采集的其实网址,可以是多个.
        此处以南方新闻网-要闻-首页为例.
    '''
    start_urls = [
            'http://www.southcn.com/pc2016/yw/node_346416.htm'
            ]
    
    '''
        [ODPS配置信息]
        ODPS_TABLE:ODPS表名
        ODPS_PARTITION:ODPS表的分区值(可选)
        WRITE_BUFFER:写入缓存(默认1000条)
    '''
    custom_settings = {
        'ODPS_TABLE':'hr_scrapy_nanfang_news',
        #'ODPS_PARTITION':'pt=20170209',
        'WRITE_BUFFER':'1000'
    }
    
    '''
        ODPS Demo DDL:
        drop table if exists hr_scrapy_nanfang_news;
        create table hr_scrapy_nanfang_news
        (
            title string,
            source string,
            times string,
            url string,
            editor string,
            content string
        );
    '''
    
    '''
        对start_urls的url的解析方法,返回结果为item.
        关于具体解析API可参考:https://doc.scrapy.org/en/latest/intro/tutorial.html
    '''
    def parse(self, response):
    
        #查找网页中DIV元素,且其class=j-link,并对其进行遍历
        for quote in response.css("div.j-link"):
            #查找该DIV中的所有<a>超链接,并获取其href
            href = quote.css("a::attr('href')").extract_first()
            
            #进入该href链接,此处跳转到方法:parse_details,对其返回HTML进行再次处理。
            yield scrapy.Request(response.urljoin(href),callback=self.parse_details)
        
        #查找下一页的连接,此处用xpath方式获取,因css语法简单,无法获取
        nexthref = response.xpath(u'//div[@id="displaypagenum"]//center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()
        
        #如找到下一页,则跳转到下一页,并继续由parse对返回HTML进行处理。
        if(nexthref is not None):
            yield scrapy.Request(response.urljoin(nexthref),callback=self.parse)
        
    '''
        新闻详情页处理方法
    '''
    def parse_details(self, response):
        #找到正文
        main_div = response.css("div.main")
        
        #因新闻详情也可能有分页,获取下一页的链接
        next_href = main_div.xpath(u'//div[@id="displaypagenum"]/center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()
        
        #获取正文内容,仅取DIV内所有<p>元素下的文本。
        content = main_div.xpath('//div[@class="content"]//p//text()').extract()
        content = "\n".join(content)
        
        if(next_href is None):
            #最后一页,则获取所有内容,返回item
            title = main_div.css('div.m-article h2::text').extract_first()
            source = main_div.css('div.meta span[id="pubtime_baidu"]::text').extract_first()
            times = main_div.css('div.meta span[id="source_baidu"]::text').extract_first()
            url = response.url
            editor = main_div.css('div.m-editor::text').extract_first()
            item = {}
            if('item' in response.meta):
                item = response.meta['item']
            item['title'] = title
            item['source'] = source
            item['times'] = times
            item['url'] = url
            item['editor'] = editor
            if('content' in item):
                item['content'] += '\n'+content
            else:
                item['content'] = content
            yield item
            
        else:
            #非最后一页 ,则取出当前页content,并拼接,然后跳转到下一页
            request = scrapy.Request(response.urljoin(next_href),
                                 callback=self.parse_details)
            item = {}
            if('item' in response.meta and 'content' in response.meta['item']):
                item = response.meta['item']
                item['content'] += '\n'+content
            else:
                item['content'] = content
            request.meta['item'] = item
            yield request

七、 运行Scrapy

切换到你的工程目录下,执行以下命令:

Scrapy crawl nanfang –loglevel INFO
执行结果如下图所示:

image

八、 验证爬取结果

待数据采集完成之后,登陆DATA IDE查看采集内容:
image

本文演示仅为一个简单的案例,实际生产还需考虑多线程处理,网站校验,分布式爬取等。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
7天前
|
分布式计算 DataWorks NoSQL
DataWorks产品使用合集之同步Holo数据到ODPS的过程中,出现部分数据的值变为星号(),是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
DataWorks产品使用合集之同步Holo数据到ODPS的过程中,出现部分数据的值变为星号(),是什么原因
|
2天前
|
数据采集 大数据 Python
FFmpeg 在爬虫中的应用案例:流数据解码详解
在大数据背景下,网络爬虫与FFmpeg结合,高效采集小红书短视频。需准备FFmpeg、Python及库如Requests和BeautifulSoup。通过设置User-Agent、Cookie及代理IP增强隐蔽性,解析HTML提取视频链接,利用FFmpeg下载并解码视频流。示例代码展示完整流程,强调代理IP对避免封禁的关键作用,助你掌握视频数据采集技巧。
FFmpeg 在爬虫中的应用案例:流数据解码详解
|
17天前
|
数据采集 自然语言处理 大数据
​「Python大数据」词频数据渲染词云图导出HTML
使用Python,本文展示数据聚类和办公自动化,焦点在于通过jieba分词处理VOC数据,构建词云图并以HTML保存。`wordCloud.py`脚本中,借助pyecharts生成词云,如图所示,关键词如&quot;Python&quot;、&quot;词云&quot;等。示例代码创建了词云图实例,添加词频数据,并输出到&quot;wordCloud.html&quot;。
37 1
​「Python大数据」词频数据渲染词云图导出HTML
|
2天前
|
数据采集 Web App开发 存储
Python-数据爬取(爬虫)
【7月更文挑战第24天】
27 7
|
2天前
|
数据采集 机器学习/深度学习 算法
Python-数据爬取(爬虫)
【7月更文挑战第23天】
16 5
|
6天前
|
SQL 机器学习/深度学习 分布式计算
MaxCompute产品使用合集之数据删除之后,是否支持回滚
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
7天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之同样的表和数据,在PolarDB执行LEFT JOIN查询可以得到结果,但在MaxCompute中却返回为空,是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
7天前
|
分布式计算 DataWorks API
DataWorks产品使用合集之使用REST API Reader往ODPS写数据时,如何获取入库时间
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之整库离线同步至MC的配置中,是否可以清除原表所有分区数据的功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用合集之如何将数据映射成Holo表的语句
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute