数据组织形式、存储格式及Parquet格式介绍
在介绍parquet数据格式之前,我们先介绍数据的几种组织形式以及存储形式。
结构化、半结构化、非结构化数据
结构化数据
结构化数据源对数据定义了一种模式。通过这些关于底层数据的额外信息,结构化数据源提供高效的存储和性能。例如,列式数据存储Parquet和ORC,使得从一个列子集中提取数据更加容易。当数据查询只需要获取一少部分列的数据时,通过遍历每行数据的方式需要查询出过多的数据。基于行的存储格式,如Avro通过高效的序列化存储数据提供了存储优势。但是,这种优势是以复杂性为代价的。例如,由于结构不够灵活,模式转换将成为挑战。
半结构化数据
半结构化数据源是每行记录一个结构,但不需要对整体记录有一个全局的模式定义。因此,每行记录是通过其自身的模式信息对其进行扩充。JSON和XML就是其中最流行的例子。半结构化数据的优势在于通过每行记录自身的描述信息,增强了展示数据信息的灵活性。由于有很多轻量级的解析器用于处理这些记录,因此半结构化数据格式在很多应用中普遍被使用,并且在可读性上存在优势。但是,它的主要缺陷也在于会产生额外的解析开销,不能专门应用于即席查询。
非结构化数据
相比之下,非结构化数据源是任意格式的文本或不包含标记或元数据的二进制对象(例如以逗号分隔的CSV文件)来组织数据。新闻文章,医疗记录,图像斑点,应用日志经常被当成是非结构化数据。这些数据源分类一般需要根据数据的上下文才能解析。因此,需要清楚知道某个文件是图片还是新闻,才能正确进行解析。大多数数据源都是非结构化的,要从这些非结构化的数据中获取数据价值,由于其格式本身的笨重,需要经过大量转换和特征提取技术去解释这些数据集,成本较高。
列式存储、行式存储
列式存储
列式存储(Column-oriented Storage)并不是一项新技术,最早可以追溯到 1983 年的论文 Cantor。然而,受限于早期的硬件条件和使用场景,主流的事务型数据库(OLTP)大多采用行式存储,直到近几年分析型数据库(OLAP)的兴起,列式存储这一概念又变得流行。
总的来说,列式存储的优势一方面体现在存储上能节约空间、减少 IO,另一方面依靠列式数据结构做了计算上的优化。
行式存储
行式存储通过逐行组织数据,所有的数据在存储介质上通过首位相连、逐条存储,行式存储是一种传统的组织数据的方法。
parquet格式介绍
Apache Parquet 是Hadoop生态系统中通用的列式存储格式,独立于数据处理框架、数据模型、编程语言;
Parquet的灵感来自于2010年Google发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能。
Parquet跟Json对比
对比特征 | parquet | json |
---|---|---|
存储形式 | 列式 | 行式 |
数据组织形式 | 结构化 | 半结构化 |
文件大小 | 小 | 大 |
读取速度 | 快 | 慢 |
Json转Parquet代码
如果有大批量的Json格式数据需要转为Parquet格式数据,参考以下代码;
import os
import multiprocessing
from json2parquet import convert_json
def split_file(file_name, path):
result_path = "parquet/"
file_path = path + file_name
res_path = result_path + file_name + ".parquet"
convert_json(file_path, res_path)
def main():
path = "data/"
file_list = os.listdir(path)
pool = multiprocessing.Pool(processes=20)
for file_name in file_list:
pool.apply_async(split_file, (file_name, path,))
pool.close()
pool.join()
if __name__ == '__main__':
main()
Parquet格式运行任务
使用parquet数据格式,来运行作业,使用spark read api中的parquet接口;
其中包括可以读指定的单个文件,或者一组文件;
spark.read.parquet("your parquet file or files")
读取单个parquet 文件方法
/**
* Loads a Parquet file, returning the result as a `DataFrame`. See the documentation
* on the other overloaded `parquet()` method for more details.
*
* @since 2.0.0
*/
def parquet(path: String): DataFrame = {
// This method ensures that calls that explicit need single argument works, see SPARK-16009
parquet(Seq(path): _*)
}
读取一组paruqet 文件方法
/**
* Loads a Parquet file, returning the result as a `DataFrame`.
*
* You can set the following Parquet-specific option(s) for reading Parquet files:
* <ul>
* <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
* whether we should merge schemas collected from all Parquet part-files. This will override
* `spark.sql.parquet.mergeSchema`.</li>
* </ul>
* @since 1.4.0
*/
@scala.annotation.varargs
def parquet(paths: String*): DataFrame = {
format("parquet").load(paths: _*)
}
简单作业使用parquet数据源示例
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
object OSSExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("OSSExample")
.getOrCreate()
val data=spark.read.parquet.load("oss://your-bucket-name/parquet file")
val data1 = data.groupBy("subject", "level").count()
val window = Window.partitionBy("subject").orderBy(org.apache.spark.sql.functions.col("count").desc)
val data2 = data1.withColumn("topn", row_number().over(window)).where("topn <= 1" )
data2.write.format("parquet").save("your store path")
}
}
作业性能对比
数据格式 | 计算用时 | 读OSS流量 | 读OSS次数 | 写OSS流量 |
---|---|---|---|---|
json | 15min | 1384G | 1387458 | 12M |
parquet | 9min | 104G | 286029 | 12M |
Parquet的优势
1、可以跳过不符合条件的数据,只读取需要的数据,降低 IO 数据量,提升作业运行性能;
2、压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如 Run Length Encoding 和 Delta Encoding)进一步节约存储空间;这样能够更少的使用OSS存储空间,减少数据存储成本;
3、只读取需要的列,支持向量运算,能够获取更好的扫描性能。
参考资料
Sql Or NoSql,看完这一篇你就懂了
一分钟搞懂列式与行式数据库
列式存储和行式存储的区别
spark sql加载parquet格式和json格式数据
Spark2.1中用结构化流处理复杂的数据格式(译)
处理海量数据:列式存储综述(存储篇)
Row vs Column Oriented Databases