大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)

接上篇:https://developer.aliyun.com/article/1622537?spm=a2c6h.13148508.setting.20.27ab4f0eUI7v7p

分区器作用与分类

在PairRDD(key,value)中,很多操作都是基于Key的,系统会按照Key对数据进行重组,如 GroupByKey

数据重组需要规则,最常见的就是基于Hash的分区,此外还有一种复杂的基于抽样Range分区方法:

HashPartitioner

最简单、最常用,也是默认提供的分区器。

对于给定的Key,计算HashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个Key所属的分区ID。

该分区方法可以保证Key相同的数据出现在同一个分区中。

用户可以通过 partitionBy主动使用分区器,通过 partitions参数指定想要分区的数量。

默认情况下的分区情况是:

val rdd1 = sc.makeRDD(1 to 100).map((_, 1))
rdd1.getNumPartitions

执行结果如下图所示:

执行结果如下图所示,分区已经让我们手动控制成10个了:


val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))
rdd2.getNumPartitions
rdd2.glom.collect.foreach(x => println(x.toBuffer))

RangePartitioner

简单来说就是将一定范围内的数映射到某个分区内,在实现中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey会使用RangePartitioner。

进行代码的测试:

val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10, rdd1))
rdd3.glom.collect.foreach(x => println(x.toBuffer))

执行结果如下图所示:

但是现在的问题是:在执行分区之前其实并不知道数据的分布情况,如果想知道数据的分区就需要对数据进行采样。


Spark中的RangePartitioner在对数据采样的过程中使用了 “水塘采样法”

水塘采样法是:在包含N个项目的集合S中选取K个样本,其中N为1或者很大的未知的数量,尤其适用于不能把所有N个项目都存放到主内存的情况。

在采样过程中执行了 collect() 操作,引发了 Action 操作。

自定义分区器

Spark允许用户通过自定义的Partitioner对象,灵活的来控制RDD的分区方式。

我们需要实现自定义分区器,按照以下的规则进行分区:


分区 0 < 100

100 <= 分区1 < 200

200 <= 分区2 < 300

300 <= 分区3 < 400

900 <= 分区9 < 1000

编写代码

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.immutable


class MyPartitioner(n: Int) extends Partitioner {

  override def numPartitions: Int = n

  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    k / 100
  }
}

object UserDefinedPartitioner {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("UserDefinedPartitioner")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val random = scala.util.Random
    val arr: immutable.IndexedSeq[Int] = (1 to  100)
      .map(idx => random.nextInt(1000))

    val rdd1: RDD[(Int, Int)] = sc.makeRDD(arr).map((_, 1))
    rdd1.glom.collect.foreach(x => println(x.toBuffer))

    println("=========================================")

    val rdd2 = rdd1.partitionBy(new MyPartitioner(10))
    rdd2.glom.collect().foreach(x => println(x.toBuffer))
    
    sc.stop()
    
  }

}

打包上传

这里之前已经重复过多次,就跳过了

mvn clean package

运行测试

spark-submit --master local[*] --class icu.wzk.UserDefinedPartitioner spark-wordcount

可以看到如下的运行结果:

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
28天前
|
存储 SQL 分布式计算
大数据散列分区映射到分区
大数据散列分区映射到分区
33 4
|
28天前
|
存储 负载均衡 算法
大数据散列分区计算哈希值
大数据散列分区计算哈希值
42 4
|
28天前
|
大数据 数据管理 定位技术
大数据散列分区选择分区键
大数据散列分区选择分区键
27 2
|
27天前
|
负载均衡 大数据
大数据散列分区查询频率
大数据散列分区查询频率
22 5
|
27天前
|
存储 大数据 数据处理
大数据散列分区数据分布
大数据散列分区数据分布
28 2
|
27天前
|
存储 负载均衡 监控
大数据散列分区数据分布
大数据散列分区数据分布
24 1
|
1月前
|
存储 大数据 数据管理
大数据分区简化数据维护
大数据分区简化数据维护
24 4
|
1月前
|
存储 负载均衡 NoSQL
大数据散列分区
大数据散列分区
27 2
|
1月前
|
存储 大数据 数据管理
大数据列表分区
大数据列表分区
32 1
|
1月前
|
存储 负载均衡 大数据
大数据范围分区
大数据范围分区
25 1