大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)

简介: 大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)

接上篇:https://developer.aliyun.com/article/1622537?spm=a2c6h.13148508.setting.20.27ab4f0eUI7v7p

分区器作用与分类

在PairRDD(key,value)中,很多操作都是基于Key的,系统会按照Key对数据进行重组,如 GroupByKey

数据重组需要规则,最常见的就是基于Hash的分区,此外还有一种复杂的基于抽样Range分区方法:

HashPartitioner

最简单、最常用,也是默认提供的分区器。

对于给定的Key,计算HashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个Key所属的分区ID。

该分区方法可以保证Key相同的数据出现在同一个分区中。

用户可以通过 partitionBy主动使用分区器,通过 partitions参数指定想要分区的数量。

默认情况下的分区情况是:

val rdd1 = sc.makeRDD(1 to 100).map((_, 1))
rdd1.getNumPartitions

执行结果如下图所示:

执行结果如下图所示,分区已经让我们手动控制成10个了:


val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))
rdd2.getNumPartitions
rdd2.glom.collect.foreach(x => println(x.toBuffer))

RangePartitioner

简单来说就是将一定范围内的数映射到某个分区内,在实现中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey会使用RangePartitioner。

进行代码的测试:

val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10, rdd1))
rdd3.glom.collect.foreach(x => println(x.toBuffer))

执行结果如下图所示:

但是现在的问题是:在执行分区之前其实并不知道数据的分布情况,如果想知道数据的分区就需要对数据进行采样。


Spark中的RangePartitioner在对数据采样的过程中使用了 “水塘采样法”

水塘采样法是:在包含N个项目的集合S中选取K个样本,其中N为1或者很大的未知的数量,尤其适用于不能把所有N个项目都存放到主内存的情况。

在采样过程中执行了 collect() 操作,引发了 Action 操作。

自定义分区器

Spark允许用户通过自定义的Partitioner对象,灵活的来控制RDD的分区方式。

我们需要实现自定义分区器,按照以下的规则进行分区:


分区 0 < 100

100 <= 分区1 < 200

200 <= 分区2 < 300

300 <= 分区3 < 400

900 <= 分区9 < 1000

编写代码

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.immutable


class MyPartitioner(n: Int) extends Partitioner {

  override def numPartitions: Int = n

  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    k / 100
  }
}

object UserDefinedPartitioner {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("UserDefinedPartitioner")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val random = scala.util.Random
    val arr: immutable.IndexedSeq[Int] = (1 to  100)
      .map(idx => random.nextInt(1000))

    val rdd1: RDD[(Int, Int)] = sc.makeRDD(arr).map((_, 1))
    rdd1.glom.collect.foreach(x => println(x.toBuffer))

    println("=========================================")

    val rdd2 = rdd1.partitionBy(new MyPartitioner(10))
    rdd2.glom.collect().foreach(x => println(x.toBuffer))
    
    sc.stop()
    
  }

}

打包上传

这里之前已经重复过多次,就跳过了

mvn clean package

运行测试

spark-submit --master local[*] --class icu.wzk.UserDefinedPartitioner spark-wordcount

可以看到如下的运行结果:

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
存储 SQL 分布式计算
大数据散列分区映射到分区
大数据散列分区映射到分区
222 4
|
8月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
431 0
|
11月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
579 79
|
存储 负载均衡 算法
大数据散列分区计算哈希值
大数据散列分区计算哈希值
236 4
|
大数据 数据管理 定位技术
大数据散列分区选择分区键
大数据散列分区选择分区键
184 2
|
负载均衡 大数据
大数据散列分区查询频率
大数据散列分区查询频率
173 5
|
存储 大数据 数据处理
大数据散列分区数据分布
大数据散列分区数据分布
176 2
|
存储 负载均衡 监控
大数据散列分区数据分布
大数据散列分区数据分布
210 1
|
存储 负载均衡 NoSQL
大数据散列分区
大数据散列分区
212 2
|
存储 大数据 数据管理
大数据列表分区
大数据列表分区
190 1