OSS数据湖实践——parquet格式

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 通过对parquet格式及json格式的对比,了解两种常用格式之间存在的异同,了解parquet 能够提高作业性能的内在机制,并且阐述其能够带来的优势。

数据组织形式、存储格式及Parquet格式介绍

在介绍parquet数据格式之前,我们先介绍数据的几种组织形式以及存储形式。
1654657_7465423457acb89f

结构化、半结构化、非结构化数据

结构化数据

结构化数据源对数据定义了一种模式。通过这些关于底层数据的额外信息,结构化数据源提供高效的存储和性能。例如,列式数据存储Parquet和ORC,使得从一个列子集中提取数据更加容易。当数据查询只需要获取一少部分列的数据时,通过遍历每行数据的方式需要查询出过多的数据。基于行的存储格式,如Avro通过高效的序列化存储数据提供了存储优势。但是,这种优势是以复杂性为代价的。例如,由于结构不够灵活,模式转换将成为挑战。

半结构化数据

半结构化数据源是每行记录一个结构,但不需要对整体记录有一个全局的模式定义。因此,每行记录是通过其自身的模式信息对其进行扩充。JSON和XML就是其中最流行的例子。半结构化数据的优势在于通过每行记录自身的描述信息,增强了展示数据信息的灵活性。由于有很多轻量级的解析器用于处理这些记录,因此半结构化数据格式在很多应用中普遍被使用,并且在可读性上存在优势。但是,它的主要缺陷也在于会产生额外的解析开销,不能专门应用于即席查询。

非结构化数据

相比之下,非结构化数据源是任意格式的文本或不包含标记或元数据的二进制对象(例如以逗号分隔的CSV文件)来组织数据。新闻文章,医疗记录,图像斑点,应用日志经常被当成是非结构化数据。这些数据源分类一般需要根据数据的上下文才能解析。因此,需要清楚知道某个文件是图片还是新闻,才能正确进行解析。大多数数据源都是非结构化的,要从这些非结构化的数据中获取数据价值,由于其格式本身的笨重,需要经过大量转换和特征提取技术去解释这些数据集,成本较高。

列式存储、行式存储

列式存储

列式存储(Column-oriented Storage)并不是一项新技术,最早可以追溯到 1983 年的论文 Cantor。然而,受限于早期的硬件条件和使用场景,主流的事务型数据库(OLTP)大多采用行式存储,直到近几年分析型数据库(OLAP)的兴起,列式存储这一概念又变得流行。
总的来说,列式存储的优势一方面体现在存储上能节约空间、减少 IO,另一方面依靠列式数据结构做了计算上的优化。

行式存储

行式存储通过逐行组织数据,所有的数据在存储介质上通过首位相连、逐条存储,行式存储是一种传统的组织数据的方法。

parquet格式介绍

Apache Parquet 是Hadoop生态系统中通用的列式存储格式,独立于数据处理框架、数据模型、编程语言;
Parquet的灵感来自于2010年Google发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能。

Parquet跟Json对比

对比特征 parquet json
存储形式 列式 行式
数据组织形式 结构化 半结构化
文件大小
读取速度

Json转Parquet代码

如果有大批量的Json格式数据需要转为Parquet格式数据,参考以下代码;

import os
import multiprocessing
from json2parquet import convert_json
def split_file(file_name, path):
    result_path = "parquet/"
    file_path = path + file_name
    res_path = result_path + file_name + ".parquet"
    convert_json(file_path, res_path)
def main():
    path = "data/"
    file_list = os.listdir(path)
    pool = multiprocessing.Pool(processes=20)
    for file_name in file_list:
        pool.apply_async(split_file, (file_name, path,))
    pool.close()
    pool.join()
    
if __name__ == '__main__':
    main()

Parquet格式运行任务

使用parquet数据格式,来运行作业,使用spark read api中的parquet接口;
其中包括可以读指定的单个文件,或者一组文件;

spark.read.parquet("your parquet file or files")

读取单个parquet 文件方法

/**
   * Loads a Parquet file, returning the result as a `DataFrame`. See the documentation
   * on the other overloaded `parquet()` method for more details.
   *
   * @since 2.0.0
   */
  def parquet(path: String): DataFrame = {
    // This method ensures that calls that explicit need single argument works, see SPARK-16009
    parquet(Seq(path): _*)
  }

读取一组paruqet 文件方法

/**
   * Loads a Parquet file, returning the result as a `DataFrame`.
   *
   * You can set the following Parquet-specific option(s) for reading Parquet files:
   * <ul>
   * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
   * whether we should merge schemas collected from all Parquet part-files. This will override
   * `spark.sql.parquet.mergeSchema`.</li>
   * </ul>
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def parquet(paths: String*): DataFrame = {
    format("parquet").load(paths: _*)
  }

简单作业使用parquet数据源示例

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
object OSSExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("OSSExample")
      .getOrCreate()
    val data=spark.read.parquet.load("oss://your-bucket-name/parquet file")
    val data1 = data.groupBy("subject", "level").count()
    val window = Window.partitionBy("subject").orderBy(org.apache.spark.sql.functions.col("count").desc)
    val data2 = data1.withColumn("topn", row_number().over(window)).where("topn <= 1" )
    data2.write.format("parquet").save("your store path")
  }
}

作业性能对比

数据格式 计算用时 读OSS流量 读OSS次数 写OSS流量
json 15min 1384G 1387458 12M
parquet 9min 104G 286029 12M

Parquet的优势

1、可以跳过不符合条件的数据,只读取需要的数据,降低 IO 数据量,提升作业运行性能;
2、压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如 Run Length Encoding 和 Delta Encoding)进一步节约存储空间;这样能够更少的使用OSS存储空间,减少数据存储成本;
3、只读取需要的列,支持向量运算,能够获取更好的扫描性能。

参考资料

Sql Or NoSql,看完这一篇你就懂了
一分钟搞懂列式与行式数据库
列式存储和行式存储的区别
spark sql加载parquet格式和json格式数据
Spark2.1中用结构化流处理复杂的数据格式(译)
处理海量数据:列式存储综述(存储篇)
Row vs Column Oriented Databases

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
23天前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
2月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之在读取OSS遇到格式报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
存储 运维 监控
飞书深诺基于Flink+Hudi+Hologres的实时数据湖建设实践
通过对各个业务线实时需求的调研了解到,当前实时数据处理场景是各个业务线基于Java服务独自处理的。各个业务线实时能力不能复用且存在计算资源的扩展性问题,而且实时处理的时效已不能满足业务需求。鉴于当前大数据团队数据架构主要解决离线场景,无法承接更多实时业务,因此我们需要重新设计整合,从架构合理性,复用性以及开发运维成本出发,建设一套通用的大数据实时数仓链路。本次实时数仓建设将以游戏运营业务为典型场景进行方案设计,综合业务时效性、资源成本和数仓开发运维成本等考虑,我们最终决定基于Flink + Hudi + Hologres来构建阿里云云原生实时湖仓,并在此文中探讨实时数据架构的具体落地实践。
飞书深诺基于Flink+Hudi+Hologres的实时数据湖建设实践
|
3月前
|
存储 分布式计算 大数据
MaxCompute产品使用问题之创建了oss外表,格式指定的parquet,然后执行的写入,发现不是标准parquet的格式,该怎么办
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
存储 数据管理 Java
基于OSS、NFS构建高性能可扩展的遥感智能解译系统实践之路
该文探讨了构建高性能、可扩展的遥感智能解译系统的架构演进过程。作者强调架构应根据业务场景而定,而非追求单一的“最佳”架构。文章分为五个部分,介绍了从初步的业务场景分析到逐步优化的架构设计。 1. 业务场景描述了服务于地理信息行业的遥感数据管理平台,包括数据湖和遥感智能解译系统的功能和架构设计。 2. 初代系统解决了数据管理和智能解译的基本需求,但存在数据同步效率低下的问题。 3. 自动化阶段通过消息推送和数据接收模块减少了人工干预,将处理时间减半,但仍存在效率和稳定性问题。 4. 高性能阶段引入数据订阅/推送和数据接收Agent,实现了稳定、高速的数据传输,性能提升了6倍。
48794 2
|
4月前
|
存储 人工智能 运维
数据湖建设实践:使用AWS S3与LakeFormation构建灵活数据存储
【4月更文挑战第8天】本文分享了使用AWS S3和LakeFormation构建数据湖的经验。选择S3作为数据湖存储,因其无限容量、高可用性和持久性,以及与多种系统的兼容性。LakeFormation则负责数据治理和权限管理,包括元数据管理、简化数据接入、细粒度权限控制和审计。通过这种方式,团队实现了敏捷开发、成本效益和数据安全。未来,数据湖将融合更多智能化元素,如AI和ML,以提升效能和体验。此实践为数据驱动决策和企业数字化转型提供了有力支持。
242 2
|
4月前
|
SQL 关系型数据库 HIVE
KLOOK客路旅行基于Apache Hudi的数据湖实践
KLOOK客路旅行基于Apache Hudi的数据湖实践
87 2
KLOOK客路旅行基于Apache Hudi的数据湖实践
|
4月前
|
存储 分布式计算 分布式数据库
字节跳动基于Apache Hudi构建EB级数据湖实践
字节跳动基于Apache Hudi构建EB级数据湖实践
67 2
|
4月前
|
消息中间件 监控 Kafka
Yotpo构建零延迟数据湖实践
Yotpo构建零延迟数据湖实践
115 0
|
4月前
|
消息中间件 存储 数据采集
在线房产公司Zillow数据迁移至数据湖实践
在线房产公司Zillow数据迁移至数据湖实践
84 0
下一篇
DDNS