如何在MaxCompute上处理存储在OSS上的开源格式数据

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介:

0. 前言

MaxCompute作为使用最广泛的大数据平台,内部存储的数据以EB量级计算。巨大的数据存储量以及大规模计算下高性能数据读写的需求,对于MaxCompute提出了各种高要求及挑战。处在大数据时代,数据的来源多种多样,开源社区经过十几年的发展,百花齐放,各种各样的数据格式不断的出现。 我们的用户也在各个场景上,通过各种计算框架,积累了各种不同格式的数据。怎样将MaxCompute强大的计算能力开放给这些使用开源格式存储沉淀下来的数据,在MaxCompute上挖掘这些数据中的信息,是MaxCompute团队希望解决的问题。

MaxCompute 2.0最近推出的非结构化计算框架【公测阶段】,旨在从存储介质存储格式两个维度,打通计算与存储的通道。 在之前的文章中,我们已经介绍过怎样在MaxCompute上对存储在OSS上的文本,音频图像等格式的数据,以及TableStore(OTS)的KV数据进行计算处理。在这里,则将介绍对于各种流行的开源数据格式(ORC, PARQUET, SEQUENCEFILE, RCFILE, AVRO, TEXTFILE等等),怎样将其存储在OSS上面,并通过非结构化框架在MaxCompute进行处理。

本着不重造轮子的原则,对于绝大部分这些开源数据格式的解析工作,在非结构化框架中会直接调用开源社区的实现,并且无缝的与MaxCompute系统做对接。

1. 创建EXTERNAL TABLE来绑定OSS外部数据

MaxCompute非结构化数据框架通过EXTERNAL TABLE的概念来提供MaxCompute与各种数据的联通,与读取OSS数据的使用方法类似,对OSS数据进行写操作,首先要通过CREATE EXTERNAL TABLE语句创建出一个外部表,而在读取开源数据格式时,创建外表的DDL语句格式如下:

DROP TABLE [IF EXISTS] <external_table>;

CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
[ROW FORMAT SERDE '<serde class>']
STORED AS <file format>
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'
AI 代码解读

可以看到,这个语法与HIVE的语法是相当接近的,而在这个CREATE EXTERNAL TABLE的ddl语句中,有如下几点要说明:

  1. 首先要特别说明的是这里使用的是STORED AS的关键字,而不是普通非结构化外表用的STORED BY关键字,这也是目前在读取开源兼容数据时独有的。
  2. 外部表的<column schemas> 必须与具体OSS上存储存储数据的schema相符合。
  3. ROW FORMAT SERDE 并非必选选项,只有在使用一些特殊的格式上,比如TEXTFILE时才需要使用。
  4. STORED AS后面接的是文件格式名字, 比如 ORC/PARQUET/RCFILE/SEQUENCEFILE/TEXTFILE 等等。
  5. 最后还要提到的是,在上面这个例子中,我们在LOCATION上使用了OSS明文AK,这只适用于在用户对于AK的保密性不敏感情况下使用。 对于数据安全比较敏感的场景,比如在多用户场景或者弹外集群上,则推荐使用通过STS/RAM体系事先进行鉴权,从而避免使用明文AK。

1.1 范例1: 关联OSS上存储的PARQUET数据

现在再来看一个具体的例子,假设我们有一些PARQUET文件存放在一个OSS路径上,每个文件都是PARQUET格式,存放着schema为16列(4列BINGINT, 4列DOUBLE, 8列STRING)的数据,那么可以通过如下DDL语句来描述:

CREATE EXTERNAL TABLE tpch_lineitem_parquet
(
  l_orderkey bigint,
  l_partkey bigint,
  l_suppkey bigint,
  l_linenumber bigint,
  l_quantity double,
  l_extendedprice double,
  l_discount double,
  l_tax double,
  l_returnflag string,
  l_linestatus string,
  l_shipdate string,
  l_commitdate string,
  l_receiptdate string,
  l_shipinstruct string,
  l_shipmode string,
  l_comment string
)
STORED AS PARQUET
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';
AI 代码解读

1.2 范例2:分区表关联OSS上存储的TEXTFILE数据

同样的数据,如果是每行以JSON格式,存储成OSS上TEXTFILE文件;同时,数据在OSS通过多个目录组织,这时是可以使用MaxCompute分区表和数据关联,则可以通过如下DDL语句来描述:

CREATE EXTERNAL TABLE tpch_lineitem_textfile
(
  l_orderkey bigint,
  l_partkey bigint,
  l_suppkey bigint,
  l_linenumber bigint,
  l_quantity double,
  l_extendedprice double,
  l_discount double,
  l_tax double,
  l_returnflag string,
  l_linestatus string,
  l_shipdate string,
  l_commitdate string,
  l_receiptdate string,
  l_shipinstruct string,
  l_shipmode string,
  l_comment string
)
PARTITIONED BY (ds string)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';
AI 代码解读

如果OSS表目录下面的子目录是以Partition Name方式组织,比如:

oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/'
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/'
...
AI 代码解读

则可以使用以下DDL语句ADD PARTITION:

ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102");
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");
AI 代码解读

如果OSS分区目录不是按这种方式组织,或者根本不在表目录下,比如:

oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/;
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/;
...
AI 代码解读

则可以使用以下DDL语句ADD PARTITION:

ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/';
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/';
...
AI 代码解读

2. 读取以及处理 OSS 上面的开源格式数据

对比上面的两个范例,可以看出对于不同文件类型,只要简单修改STORED AS后的格式名。在接下来的例子中,我们将只集中描述对上面PARQUET数据对应的外表(tpch_lineitem_parquet)的处理,如果要处理不同的文件类型,只要在DDL创建外表时指定是PARQUET/ORC/TEXTFILE/RCFILE/TEXTFILE即可,处理数据的语句则是一样的。

2.1 直接读取以及处理OSS上面的开源数据

在创建数据外表后,直接对外表就可以进行与普通MaxCompute表的操作,直接对存储在OSS上的数据进行处理,比如:

SELECT l_returnflag,
    l_linestatus,
    SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
    AVG(l_quantity) AS avg_qty,
    COUNT(*) AS count_order
FROM tpch_lineitem_parquet
WHERE l_shipdate <= '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus;
AI 代码解读

可以看到,在这里tpch_lineitem_parquet这个外表被当作一个普通的内部表一样使用。唯一不同的只是在MaxCompute内部计算引擎将从OSS上去读取对应的PARQUET数据来进行处理。

但是我们应该强调的是,在这里直接使用外表,每次读取的时候都需要涉及外部OSS的IO操作,并且MaxCompute系统本身针对内部存储做的许多高性能优化都用不上了,所以性能上会有所损失。 所以如果是需要对数据进行反复计算以及对计算的高效性比较敏感的场景上,我们推荐下面这种用法:先将数据导入MaxCompute内部,再进行计算。

注意,上面例子中的tpch_lineitem_textfile表,因为使用了ROW FORMAT + STORED AS,需要手动设置flag(只使用STORED AS,odps.sql.hive.compatible默认为TRUE),再进行读取,否则会有报错。

SELECT * FROM tpch_lineitem_textfile LIMIT 1;
FAILED: ODPS-0123131:User defined function exception - Traceback:
com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper

--需要手动设置hive兼容flag
set odps.sql.hive.compatible=true;
SELECT * FROM tpch_lineitem_textfile LIMIT 1;
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| l_orderkey | l_partkey  | l_suppkey  | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax      | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment |
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| 5640000001 | 174458698  | 9458733    | 1            | 14.0       | 23071.58        | 0.08       | 0.06       | N            | O            | 1998-01-26 | 1997-11-16   | 1998-02-18    | TAKE BACK RETURN | SHIP       | cuses nag silently. quick |
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
AI 代码解读

2.2 将OSS上的开源数据导入MaxCompute,再进行计算

  • 首先创建一个与外部表schema一样的内部表tpch_lineitem_internal,然后将OSS上的开源数据导入MaxCompute内部表,以cFile格式存储在MaxCompute内部:
CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet;

INSERT OVERWRITE TABLE tpch_lineitem_internal
SELECT * FROM tpch_lineitem_parquet;
AI 代码解读
  • 直接就可以对内部表进行同样的操作:
SELECT l_returnflag,
    l_linestatus,
    SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
    AVG(l_quantity) AS avg_qty,
    COUNT(*) AS count_order
FROM tpch_lineitem_internal
WHERE l_shipdate <= '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus;
AI 代码解读

通过这样子将数据先导入系统的情况下,对同样数据的计算就会更高效得多。

4. 结语

开源的种种数据格式往往由各种数据处理生态产生,而MaxCompute非结构化数据处理框架通过实现计算与存储的互联,希望打通阿里云核心计算平台与各种数据的通路。在这个基础上,各种各样依赖于不同数据格式的应用,将能在MaxCompute计算平台上实现,后继我们会对一些具体的这种应用,比如基因计算等,再做一些具体的case study以及介绍。我们也欢迎有对开源数据进行处理分析的更多应用,能在MaxCompute强大计算能力的基础上开花结果。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
打赏
0
0
0
0
82908
分享
相关文章
解密大数据:从零开始了解数据海洋
解密大数据:从零开始了解数据海洋
38 17
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
137 4
大数据分区简化数据维护
大数据分区简化数据维护
35 4
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
阿里云OSS的优势
【7月更文挑战第19天】阿里云OSS的优势
256 2
阿里云OSS
【7月更文挑战第19天】阿里云OSS
230 1

相关产品

  • 云原生大数据计算服务 MaxCompute