开发者社区> 问答> 正文

如何让你的spark sql运行的更快一些?

如何让你的spark sql运行的更快一些?

展开
收起
茶什i 2020-01-09 19:12:15 4508 0
3 条回答
写回答
取消 提交回答
  • 在spark设置并行度一般通过两种方式来设置
    1.spark.default.parrallelism
    2.textFile()传入第二个参数,指定partition数量

    使用spark sql的时候会出现什么问题?
    但是如果使用来spark sql,用spark sql的那个stage的并行度,你没办法自己指定,因为spark sql 自己会默认根据hive表对应的hdfs的block,自动设置spark sql查询所在的那个stage的并行度。
    你自己通过spark.default.parallelism参数指定的并行度,只会在没有spark sql的stage中生效。
    比如:你的第一个stage,用spark sql从hive表中查询了一些数据,然后做了一些transformation操作,接着做了一个shuffle操作(例如groupByKey);下一个stage,在stage之后,做了一些transformation操作。
    hive表,对应了一个hdfs文件,有20个block;你自己设置了 spark.default.parallelish参数为100;
    你的第一个stage的并行度,是不受你设置的参数控制的,就只有20task;第二个stage的并行度,才是你自己设置的100;
    这样会产生的问题就是:在第一个stage中,可能有非常复杂的业务逻辑或者算法,如果只有默认的20个并行度的话,每个task要处理很大的数据量,这就会导致第一个stage执行的速度特别慢。而第二个就很快。

    解决方法
    直接对spark sql查询出来的rdd使用repartition,进行重新分区。
    三种设置方式:
    - 直接设置分区数量
    dataFrame.repartition(10)

    • 根据字段进行分区,分区数量由 spark.sql.shuffle.partition 决定
      dataFrame.repartition($"name")

    • 根据字段进行分区,将获得100个分区的DataFrame,这种方式可以在join的时候极大的提高效率,但是同时得注意出现数据倾斜的问题
      dataFrame.repartition(100,$"name")

    2020-01-10 11:29:29
    赞同 展开评论 打赏
  • 通过设置spark并行度

    2020-01-09 19:28:03
    赞同 展开评论 打赏
  • 目前从事大数据开发,兼顾平台搭建以及实时数据分析,主要是用scala编写程序,涉及社区开源hadoop集群,emr集群,欢迎有相同兴趣的小伙伴来交流,共同进步。最近在参与flink-java实时开发。

    首先遵循sql规范,然后可以提高你的并行度,最后,聚合的sql肯定会遇到shuffle,这就需要你解决好shuffle的问题,下面是我这你的一些技巧,希望对你有帮助
    /**
    * @author BlueCat丶懒猫
    * @title: SparkShuffleSolutions
    * @date 2019/11/18 12:37
    * @desc:
    * 2.1 数据倾斜原理
    *    在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜
    * 2.2 数据倾斜问题发现与定位
    *    通过Spark Web UI来查看当前运行的stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
    * 知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,
    * 这部分代码中肯定会有一个shuffle类算子。通过countByKey查看各个key的分布。
    * 2.3 数据倾斜解决方案
    *     2.3.1 过滤少数导致倾斜的key
    *     2.3.2 提高shuffle操作的并行度
    *     2.3.3 局部聚合和全局聚合 => solution1
    * 2.3.4 将reduce join转为map join((小表几百M或者一两G))  => solution2
    * 2.3.5 采样倾斜key并分拆join操作(join的两表都很大,但仅一个RDD的几个key的数据量过大) => solution3
    * 2.3.6 使用随机前缀和扩容RDD进行join(RDD中有大量的key导致数据倾斜) => solution4
    * 4 spark shuffle参数调优
    * spark.shuffle.file.buffer
    * 默认值:32k
    * 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
    * 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
    * spark.reducer.maxSizeInFlight
    * 默认值:48m
    * 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
    * 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 * spark.shuffle.io.maxRetries
    * 默认值:3
    * 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
    * 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
    * spark.shuffle.io.retryWait
    * 默认值:5s
    * 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
    * 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
    * spark.shuffle.memoryFraction
    * 默认值:0.2
    * 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
    * 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
    * spark.shuffle.manager
    * 默认值:sort
    * 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。 * 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。 * spark.shuffle.sort.bypassMergeThreshold * 默认值:200 * 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。 * 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。 * spark.shuffle.consolidateFiles * 默认值:false * 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。 * 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

    */

    2020-01-09 19:27:54
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载