SPARK的计算向量化-spark本身的向量化

简介: SPARK的计算向量化-spark本身的向量化

背景


我们知道,随着计算引擎战争的结束(SPARK赢得了离线处理的霸权),越来越多的公司致力于性能的优化,而引擎的优化,目前直指计算的向量化,

这片文章来说说spark本身对于向量化的实现。


spark本身的优化


我们都知道spark的Tungsten项目,这个项目中有一点就是Code Generation(代码生成)。代码生成除了消除虚函数的调用等功能外,其实在向量化这块也是做了处理的。

直接跳到ColumnarToRowExec代码:

val columnarBatchClz = classOf[ColumnarBatch].getName
    val batch = ctx.addMutableState(columnarBatchClz, "batch")
  ...
 val localIdx = ctx.freshName("localIdx")
    val localEnd = ctx.freshName("localEnd")
    val numRows = ctx.freshName("numRows")
    val shouldStop = if (parent.needStopCheck) {
      s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
    } else {
      "// shouldStop check is eliminated"
    }
    s"""
       |if ($batch == null) {
       |  $nextBatchFuncName();
       |}
       |while ($limitNotReachedCond $batch != null) {
       |  int $numRows = $batch.numRows();
       |  int $localEnd = $numRows - $idx;
       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
       |    int $rowidx = $idx + $localIdx;
       |    ${consume(ctx, columnsBatchInput).trim}
       |    $shouldStop
       |  }
       |  $idx = $numRows;
       |  $batch = null;
       |  $nextBatchFuncName();
       |}
     """.stripMargin

spark中向量化的核心就在于这块代码中,这块代码主要的就是ColumnarBatch,也就是列批,这种列批的数据结构,用FOR循环这种方式进行数据的访问,

这在JIT中会进行优化(优化成向量化)。

而这里还有一个重点就是:Parquet或者ORC这种列式存储,读取出来的时候,天然就是一个列批的数据结构,很方便做向量化操作。


但是,利用JIT进行向量化是有缺点的:

利用了JIT进行优化,这个是需要编译器追踪循环的次数的,如果循环次数不够,就不会进行进行JIT,也就无法做到向量化。

所以好多公司把这种着力于用其他语句实现来进行真正意义上的向量化。


参考


本文参考了


深度解读|Spark 中 CodeGen 与向量化技术的研究

Velox: 现代化的向量化执行引擎


相关文章
|
JavaScript CDN
js:spark-md5分片计算文件的md5值
js:spark-md5分片计算文件的md5值
1099 0
|
7天前
|
分布式计算 安全 OLAP
7倍性能提升|阿里云AnalyticDB Spark向量化能力解析
AnalyticDB Spark如何通过向量化引擎提升性能?
|
4天前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
12 1
|
2月前
|
分布式计算 Serverless 数据处理
Serverless Spark计算服务
Serverless Spark计算服务
|
3月前
|
SQL 分布式计算 关系型数据库
Spark 分析计算连续三周登录的用户数
本文介绍了如何使用窗口函数`range between`来查询`login_time`为2022-03-10的用户最近连续三周的登录数。首先在MySQL中创建`log_data`表并插入数据,接着定义需求为找出该日期前连续三周活跃的用户数。通过Spark SQL,分步骤实现:1)确定统计周期,2)筛选符合条件的数据,3)计算用户连续登录状态。在初始实现中出现错误,因未考虑日期在周中的位置,修正后正确计算出活跃用户数。
|
3月前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
|
3月前
|
SQL 分布式计算 Spark
【指标计算】Spark 计算指定用户与其他用户购买的相同商品
该代码示例使用Spark SQL解决查找指定用户(user01)与其他用户共同购买商品的问题。首先,创建SparkSession和模拟购买数据,然后通过SQL查询获取user01购买的商品集合。接着,对比所有用户购买记录,筛选出购买过相同商品且非user01的用户。输出显示了这些匹配用户的商品ID。关键在于使用`array_contains`函数检查商品是否在指定用户的购买列表中。遇到类似需求时,可参考Spark SQL官方函数文档。欢迎讨论复杂指标计算问题。
|
3月前
|
SQL 分布式计算 Hadoop
Spark分布式内存计算框架
Spark分布式内存计算框架
99 0
|
3月前
|
分布式计算 算法 数据挖掘
Spark中的图计算库GraphX是什么?请解释其作用和常用操作。
Spark中的图计算库GraphX是什么?请解释其作用和常用操作。
58 1
|
3月前
|
分布式计算 数据处理 Apache
Spark RDD的行动操作与延迟计算
Spark RDD的行动操作与延迟计算
Spark RDD的行动操作与延迟计算