背景
我们知道,随着计算引擎战争的结束(SPARK赢得了离线处理的霸权),越来越多的公司致力于性能的优化,而引擎的优化,目前直指计算的向量化,
这片文章来说说spark本身对于向量化的实现。
spark本身的优化
我们都知道spark的Tungsten项目,这个项目中有一点就是Code Generation(代码生成)。代码生成除了消除虚函数的调用等功能外,其实在向量化这块也是做了处理的。
直接跳到ColumnarToRowExec代码:
val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.addMutableState(columnarBatchClz, "batch") ... val localIdx = ctx.freshName("localIdx") val localEnd = ctx.freshName("localEnd") val numRows = ctx.freshName("numRows") val shouldStop = if (parent.needStopCheck) { s"if (shouldStop()) { $idx = $rowidx + 1; return; }" } else { "// shouldStop check is eliminated" } s""" |if ($batch == null) { | $nextBatchFuncName(); |} |while ($limitNotReachedCond $batch != null) { | int $numRows = $batch.numRows(); | int $localEnd = $numRows - $idx; | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { | int $rowidx = $idx + $localIdx; | ${consume(ctx, columnsBatchInput).trim} | $shouldStop | } | $idx = $numRows; | $batch = null; | $nextBatchFuncName(); |} """.stripMargin
spark中向量化的核心就在于这块代码中,这块代码主要的就是ColumnarBatch,也就是列批,这种列批的数据结构,用FOR循环这种方式进行数据的访问,
这在JIT中会进行优化(优化成向量化)。
而这里还有一个重点就是:Parquet或者ORC这种列式存储,读取出来的时候,天然就是一个列批的数据结构,很方便做向量化操作。
但是,利用JIT进行向量化是有缺点的:
利用了JIT进行优化,这个是需要编译器追踪循环的次数的,如果循环次数不够,就不会进行进行JIT,也就无法做到向量化。
所以好多公司把这种着力于用其他语句实现来进行真正意义上的向量化。
参考
本文参考了
深度解读|Spark 中 CodeGen 与向量化技术的研究
Velox: 现代化的向量化执行引擎