VectorizedReader 和 ORC

简介: spark SQL not only SQL1.SparkSession/DataFrame/Datasets API2.

spark SQL not only SQL

1.SparkSession/DataFrame/Datasets API
2.Catalyst Optimization & Tungsten Execution
3.DataSource Connectors/  Spark Core(RDD API)

优化尽可能的发生晚些,因为spark SQL,可以通过函数和库优化
整体的优化使用库和sql/dataframe

RUN EXPLAIN plan
Interpret  plan
tune plan

https://dbricks.co/2rR8vAr

optimizer:
使用启发式和代价重写查询计划

        column pruning:列裁剪, outer join elimination:消除outer join
        Predicate push down:谓词下推, constraint propagation:约束传播(broadcast)
        constant floding:常量累加: join reordering: join重排序
        .....

    spark.sql.autoBroadcastJoin.threshold
    keep the statistics updated 
    broadcastJoin Hint

memory manager:
    跟踪内存的使用,有效的分配内存在task和算子
    code generator: 编译物理计划到优化后的java代码

Tungsten Engine: 高效的二进制数据格式和数据结构对cpu和内存的高效。

调整spark.sql.codegen.hugeMethodLimit去避免较大的方法(>8k), 因为这不能被JIT编译器
所编译。

spark分为计算和存储:

完整的数据流:
    外部存储给spark 喂数据
    spark处理数据
如果spark处理数据很快,数据源就可能称为瓶颈。

更高效的去读取柱状的向量化数据
更高效的使用jvm生成simd 说明


指定的文件系统可以完成跳过不必要的数据和预shuffle,可以通过不必要的shuffle和IO来加速查询

选择支持向量化读取的数据源(parquet,orc)
基于文件的数据源,尽可能的创建分区,桶。

Spark 2.3.0支持ORC Vectorized矢量化源码分析

在Spark2.3.0的release文档中,提到ORC Vectored带来的性能提升:

提高scan吞吐2-5倍;
开启条件:spark.sql.orc.impl=native;

ORC 文件类型

当然该ISSUE的提出还是有些背景的(https://issues.apache.org/jira/browse/SPARK-16060),ORC文件格式本身是Hortonworks提出的针对Hive查询的一种列式存储方案,ORC是在一定程度上扩展了RCFile,是对RCFile的优化。有别于Facebook的RCFile类型,ORC有如下优点:

  • ORCFile在RCFile基础上引申出来Stripe和Footer等。每个ORC文件首先会被横向切分成多个Stripe,而每个Stripe内部以列存储,所有的列存储在一个文件中,而且每个stripe默认的大小是250MB,相对于RCFile默认的行组大小是4MB,所以比RCFile更高效;
  • ORCFile扩展了RCFile的压缩,除了Run-length(游程编码),引入了字典编码和Bit编码;
  • ORCFile保存了文件更多的元信息;

其存储格式如下:

IndexData中保存了该stripe上数据的位置信息,总行数等信息
RowData以stream的形式保存了数据的具体信息
Stripe Footer中包含该stripe的统计结果,包括Max,Min,count等信息

IndexData
RowData
StripeFooter

...

FileFooter中包含该表的统计结果,以及各个Stripe的位置信息
Postscripts中存储该表的行数,压缩参数,压缩大小,列等信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

ORC Vectored使用场景

spark.sql.orc.impl=native的判断

其中spark.sql.orc.impl有native和hive两种选择,如果针对orc类型选择hive格式直接调用org.apache.spark.sql.hive.orc.OrcFileFormat类实现类的加载,而如果为native则会基于org.apache.spark.sql.execution.datasources.orc.OrcFileFormat类进行加载。

 /** Given a provider name, look up the data source class definition. */
  def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case name => name
    }

    ...
  }  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

什么时候支持ORC Vectored?

  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
    val conf = sparkSession.sessionState.conf
    conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
      schema.length <= conf.wholeStageMaxNumFields &&
      schema.forall(_.dataType.isInstanceOf[AtomicType])
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

需要满足以下条件:
* 开启spark.sql.orc.enableVectorizedReader: 默认true;
* 开启spark.sql.codegen.wholeStage: 默认true并且其scheme的长度不大于wholeStageMaxNumFields(默认100列);
* [关键]所有列数据类型需要为AtomicType类型的;

AtomicType类型,可根据定义查看:

/**
 * An internal type used to represent everything that is not null, UDTs, arrays, structs, and maps.
 */
protected[sql] abstract class AtomicType extends DataType {
  private[sql] type InternalType
  private[sql] val tag: TypeTag[InternalType]
  private[sql] val ordering: Ordering[InternalType]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

AtomicType代表了非 null/UDTs/arrays/structs/maps类型。所以如果所含列中如果包含null/UDTs/arrays/structs/maps类型,依然无法收到该ISSUE的便利。

ORC Vectored实现

OrcColumnarBatchReader的使用

在OrcFileFormat.buildReaderWithPartitionValues中:

    if (enableVectorizedReader) {
          val batchReader = new OrcColumnarBatchReader(
            enableOffHeapColumnVector && taskContext.isDefined, copyToSpark)
          // SPARK-23399 Register a task completion listener first to call `close()` in all cases.
          // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM)
          // after opening a file.
          val iter = new RecordReaderIterator(batchReader)
          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))

          // 调用initialize函数
          batchReader.initialize(fileSplit, taskAttemptContext)
          // 调用initBatch
          batchReader.initBatch(
            reader.getSchema,
            requestedColIds,
            requiredSchema.fields,
            partitionSchema,
            file.partitionValues)
          // 生成iter
          iter.asInstanceOf[Iterator[InternalRow]]
    } else {
          val orcRecordReader = new OrcInputFormat[OrcStruct]
            .createRecordReader(fileSplit, taskAttemptContext)
          val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))

          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
          val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds)

          if (partitionSchema.length == 0) {
            iter.map(value => unsafeProjection(deserializer.deserialize(value)))
          } else {
            val joinedRow = new JoinedRow()
            iter.map(value =>
              unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues)))
          }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

OrcColumnarBatchReader的实现

initialize(): 初始化OrcFile Reader及Hadoop环境配置;
initBatch(): 初始化batch变量和columnarBatch变量(其中batch为ORC Reader矢量化每次读取的结果存储变量,columnarBatch为codegen转换为Spark定义类型存储变量 );
nextBatch(): 迭代器,其核心还是调用ORC自定义的vectored函数,需要根据类型转换Spark定义type;

单元测试

参考: org.apache.spark.sql.hive.orc.OrcReadBenchmark

结果分析:
针对数字、String类型测试

Native ORC Vectorized > Native ORC Vectorized with copy > Native ORC MR > Hive built-in ORC

针对分区、不分区测试

Partition性能远远>不分区性能

参考:




相关文章
|
Java Maven
Maven常用镜像配置
Maven常用镜像配置
2106 0
|
Java
Pytest----Windows10系统安装配置allure
Pytest----Windows10系统安装配置allure
1645 0
Pytest----Windows10系统安装配置allure
|
Linux
阿里云官方yum源
阿里云官方yum源
75149 0
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
2112 0
|
7月前
|
人工智能 搜索推荐 机器人
07_大模型未来趋势:2025年AI技术前沿展望
2025年,人工智能技术正站在一个新的历史节点上。经过过去几年的爆发式发展,大语言模型(LLM)已从实验室走向各行各业,成为推动数字化转型的核心力量
1100 0
|
机器学习/深度学习 人工智能 开发工具
Clone-voice:开源的声音克隆工具,支持文本转语音或改变声音风格,支持16种语言
Clone-voice是一款开源的声音克隆工具,支持16种语言,能够将文本转换为语音或将一种声音风格转换为另一种。该工具基于深度学习技术,界面友好,操作简单,适用于多种应用场景,如视频制作、语言学习和广告配音等。
3214 9
Clone-voice:开源的声音克隆工具,支持文本转语音或改变声音风格,支持16种语言
|
消息中间件 存储 Apache
Apache Paimon 表模式最佳实践
Apache Paimon 表模式最佳实践
4750 57
|
开发框架 前端开发 JavaScript
基于SqlSugar的开发框架循序渐进介绍(15)-- 整合代码生成工具进行前端界面的生成
基于SqlSugar的开发框架循序渐进介绍(15)-- 整合代码生成工具进行前端界面的生成
|
SQL 分布式计算 HIVE
sparksql 参数调优
sparksql 参数调优