spark在生产中是否要禁止掉BHJ(BroadcastHashJoin)

简介: spark在生产中是否要禁止掉BHJ(BroadcastHashJoin)

背景


本文基于spark 3.2

driver内存 2G

问题描述


在基于复杂的sql运行中,或者说是存在多个join操作的sql中,如果说driver内存不是很大的情况下,我们经常会遇到如下报错:

Caused by: org.apache.spark.SparkException: Could not execute broadcast in 800 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
  at org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec$$anon$1.run(QueryStageExec.scala:217)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

其实从字面上就可以理解:broadcast的数据超时了,这种经常是由于广播的数据量太大引起的(还有部分是由于SPARK-33933所说的问题引起的),spark中默认的广播大小不是只有10M才会进行广播么?

spark.sql.autoBroadcastJoinThreshold默认为10M

为什么还会存在广播的数据量很大呢?


问题分析


直接说重点:

在spark中 SMJ转BHJ 在两个阶段会发生:


正常的物理计划的生成阶段,也就是SparkPlanner中的JoinSelection规则中

AQE阶段,也就是AdaptiveSparkPlanExec中的reOptimize方法

其实这两个阶段调用的方法都是一样的都是调用了sparkPlanner的JoinSelection规则

我们说说第一阶段,也就是正常的物理计划的生成阶段,即JoinSelection规则


这里的重要的方法是canBroadcastBySize:

def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
  }

具体的逻辑阶段的统计信息,可以参考spark logicalPlan Statistics (逻辑计划阶段的统计信息),这里如果我们是基于文件读取的话(大部分就是基于文件的读取),如果说我们的的sql是

##其中tableA有很多字段,我们只选取a,b两个字段 
select a,b from tableA

这里的逻辑阶段的数据统计就是一个大概的计算:

private def visitUnaryNode(p: UnaryNode): Statistics = {
    // There should be some overhead in Row object, the size should not be zero when there is
    // no columns, this help to prevent divide-by-zero error.
    val childRowSize = EstimationUtils.getSizePerRow(p.child.output)
    val outputRowSize = EstimationUtils.getSizePerRow(p.output)
    // Assume there will be the same number of rows as child has.
    var sizeInBytes = (p.child.stats.sizeInBytes * outputRowSize) / childRowSize
    if (sizeInBytes == 0) {
      // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
      // (product of children).
      sizeInBytes = 1
    }
    // Don't propagate rowCount and attributeStats, since they are not estimated here.
    Statistics(sizeInBytes = sizeInBytes)
}
...
def getSizePerRow(
      attributes: Seq[Attribute],
      attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt = {
    // We assign a generic overhead for a Row object, the actual overhead is different for different
    // Row format.
    8 + attributes.map { attr =>
      if (attrStats.get(attr).map(_.avgLen.isDefined).getOrElse(false)) {
        attr.dataType match {
          case StringType =>
            // UTF8String: base + offset + numBytes
            attrStats(attr).avgLen.get + 8 + 4
          case _ =>
            attrStats(attr).avgLen.get
        }
      } else {
        attr.dataType.defaultSize
      }
    }.sum
  }

我们看到其实就是对于从一个表中读取一个字段的大小是基于改字段类型所占avgLen的大小除以该表中所有字段总的类型的avgLen的一个比值,或者是默认值:

如: select a from tableA, 假如tableA是300M 有a,b,c,d,e等30个字段,其中a,b,c,d,e等30字段都是string类型,且defaultSize是20。

则该sql算出来的size就是 20/(30*20)*300M=10M 这样这个sql所对应的临时表就能进行广播。

但是正如spark里说的:

Statistics collected for a column.
1. The JVM data type stored in min/max is the internal data type for the corresponding Catalyst data type. For example, the internal type of DateType is Int, and that the internal type of TimestampType is Long. 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms (sketches) might have been used, and the data collected can also be stale.
Params:
distinctCount – number of distinct values
min – minimum value
max – maximum value
nullCount – number of nulls
avgLen – average length of the values. For fixed-length types, this should be a constant.
maxLen – maximum length of the values. For fixed-length types, this should be a constant.
histogram – histogram of the values
version – version of statistics saved to or retrieved from the catalog

这些统计信息有可能是不准确的(实际上String是变长类型),所以在计算的时候,有可能broadcast的数据就相对比较大,如果存在这种join有很多的情况下,就会导致driver端很卡,甚至OOM


我们再说说第二阶段,也就是AQE阶段


其实这个阶段核心还是getFinalPhysicalPlan方法中的createQueryStages方法和reOptimize方法,

在createQueryStages方法中如果方法已经是BroadcastExchangeExec的话,就会直接包装成ShuffleQueryStageExec,

如果是ShuffleExchangeExec的话,在下一个阶段会经过reOptimize方法根据运行时的统计信息大小,来进行是否可以进行SMJ到BHJ的转换

这里在的阈值判断是通过spark.sql.adaptive.autoBroadcastJoinThreshold来判断的,默认也是10M,


所以在spark UI上有时候能看到broadcast 的datasize有50M甚至100多M,而明明broadcast的阈值是10M,却变成了BroadCastHashJoin。

如下如所示:

image.png

结论


所以在大数据量,以及在复杂的sql情况下,禁止broadcasthashjoin是明确的选择,毕竟稳是一切运行的条件,但是也是可以根据单个任务个别开启。

相关文章
|
分布式计算 大数据 Spark
【Spark Summit East 2017】Spark中的容错:从生产实践中获取的经验
本讲义出自Jose Soltren在Spark Summit East 2017上的演讲,主要介绍了Spark容错中的螺母和螺栓,他首先简述了Spark中的各种容错机制,然后讨论了YARN上的Spark、调度与资源分配,在演讲中还对于一些用户案例进行了探讨研究并给出了一些需要的工具,最后分享了未来Spark中容错未来的发展方向比如调度器和检查点的变化。
1993 0
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
106 1
Spark快速大数据分析PDF下载读书分享推荐
|
1月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
116 3
|
11天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
24 3
|
15天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
33 3
|
20天前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
11天前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
28 0
|
3月前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
161 59
|
1月前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
61 2
|
2月前
|
分布式计算 Hadoop 大数据
Hadoop与Spark在大数据处理中的对比
【7月更文挑战第30天】Hadoop和Spark在大数据处理中各有优势,选择哪个框架取决于具体的应用场景和需求。Hadoop适合处理大规模数据的离线分析,而Spark则更适合需要快速响应和迭代计算的应用场景。在实际应用中,可以根据数据处理的需求、系统的可扩展性、成本效益等因素综合考虑,选择适合的框架进行大数据处理。