[大数据之Spark]——Transformations转换入门经典实例

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Spark相比于Mapreduce的一大优势就是提供了很多的方法,可以直接使用;另一个优势就是执行速度快,这要得益于DAG的调度,想要理解这个调度规则,还要理解函数之间的依赖关系。本篇就着重描述下Spark提供的Transformations方法.依赖关系宽依赖和窄依赖窄依赖(narrow dependencies)窄依赖是指父RDD仅仅被一个子RDD所使用,子RDD的每个分区依赖于常数个父分区(O(1),与数据规模无关)。

Spark相比于Mapreduce的一大优势就是提供了很多的方法,可以直接使用;另一个优势就是执行速度快,这要得益于DAG的调度,想要理解这个调度规则,还要理解函数之间的依赖关系。

本篇就着重描述下Spark提供的Transformations方法.

依赖关系

宽依赖和窄依赖

窄依赖(narrow dependencies)

窄依赖是指父RDD仅仅被一个子RDD所使用,子RDD的每个分区依赖于常数个父分区(O(1),与数据规模无关)。

输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap

输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce

从输入中选择部分元素的算子,如filter、distinct、substract、sample

宽依赖(wide dependencies)

宽依赖是指父RDD被多个子分区使用,子RDD的每个分区依赖于所有的父RDD分区(O(n),与数据规模有关)

对单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey

对两个RDD基于key进行join和重组,如join(父RDD不是hash-partitioned )

需要进行分区,如partitionBy

Transformations转换方法实例

map(func)

map用于遍历rdd中的每个元素,可以针对每个元素做操作处理:

scala> var data = sc.parallelize(1 to 9,3)//内容为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> data.map(x=>x*2).collect()//输出内容 Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

filter(func)

filter用于过滤元素信息,仅仅返回满足过滤条件的元素

scala> var data = sc.parallelize(1 to 9,3)//内容为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> data.filter(x=> x%2==0).collect()//输出内容 Array[Int] = Array(2, 4, 6, 8)

flatMap(func)

flatMap与map相比,不同的是可以输出多个结果,比如

scala> var data = sc.parallelize(1 to 4,1)//输出内容为 Array[Int] = Array(1, 2, 3, 4)scala> data.flatMap(x=> 1 to x).collect()//输出内容为 Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

mapPartitions(func)

mapPartitions与map类似,只不过每个元素都是一个分区的迭代器,因此内部可以针对分区为单位进行处理。

比如,针对每个分区做和

//首先创建三个分区scala> var data = sc.parallelize(1 to 9,3)//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)//查看分区的个数scala> data.partitions.size//输出为 Int = 3//使用mapPartitionsscala> var result = data.mapPartitions{ x=> { | var res = List[Int]() | var i = 0 | while(x.hasNext){ | i+=x.next() | } | res.::(i).iterator | }}scala> result.collect//输出为 Array[Int] = Array(6, 15, 24)

mapPartitionsWithIndex(func)

这个方法与上面的mapPartitions相同,只不过多提供了一个Index参数。

//首先创建三个分区scala> var data = sc.parallelize(1 to 9,3)//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)//查看分区的个数scala> data.partitions.size//输出为 Int = 3scala> var result = data.mapPartitionsWithIndex{ | (x,iter) => { | var result = List[String]() | var i = 0 | while(iter.hasNext){ | i += iter.next() | } | result.::( x + "|" +i).iterator | }} result.collect//输出结果为 Array[String] = Array(0|6, 1|15, 2|24)

sample(withReplacement, fraction, seed)

这个方法可以用于对数据进行采样,比如从1000个数据里面随机5个数据。

第一个参数withReplacement代表是否进行替换,如果选true,上面的例子中,会出现重复的数据

第二个参数fraction 表示随机的比例

第三个参数seed 表示随机的种子

//创建数据var data = sc.parallelize(1 to 1000,1)//采用固定的种子seed随机data.sample(false,0.005,0).collect//输出为 Array[Int] = Array(53, 423, 433, 523, 956, 990)//采用随机种子data.sample(false,0.005,scala.util.Random.nextInt(1000)).collect//输出为 Array[Int] = Array(136, 158)

union(otherDataset)

union方法可以合并两个数据集,但是不会去重,仅仅合并而已。

//创建第一个数据集scala> var data1 = sc.parallelize(1 to 5,1)//创建第二个数据集scala> var data2 = sc.parallelize(3 to 7,1)//取并集scala> data1.union(data2).collect//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)

intersection(otherDataset)

这个方法用于取两个数据集的交集

//创建第一个数据集scala> var data1 = sc.parallelize(1 to 5,1)//创建第二个数据集scala> var data2 = sc.parallelize(3 to 7,1)//取交集scala> data1.intersection(data2).collect//输出为 Array[Int] = Array(4, 3, 5)

distinct([numTasks]))

这个方法用于对本身的数据集进行去重处理。

//创建数据集scala> var data = sc.parallelize(List(1,1,1,2,2,3,4),1)//执行去重scala> data.distinct.collect//输出为 Array[Int] = Array(4, 1, 3, 2)//如果是键值对的数据,kv都相同,才算是相同的元素scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))//执行去重scala> data.distinct.collect//输出为 Array[(String, Int)] = Array((A,1), (B,1), (A,2))

groupByKey([numTasks])

这个方法属于宽依赖的方法,针对所有的kv进行分组,可以把相同的k的聚合起来。如果要想计算sum等操作,最好使用reduceByKey或者combineByKey

//创建数据集scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))//分组输出scala> data.groupByKey.collect//输出为 Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(1, 1, 2)))

reduceByKey(func, [numTasks])

这个方法用于根据key作分组计算,但是它跟reduce不同,它还是属于transfomation的方法。

//创建数据集scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))scala> data.reduceByKey((x,y) => x+y).collect//输出为 Array[(String, Int)] = Array((B,1), (A,4))

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

aggregateByKey比较复杂,我也不是很熟练,不过试验了下,大概的意思是针对分区内部使用seqOp方法,针对最后的结果使用combOp方法。

比如,想要统计分区内的最大值,然后再全部统计加和:

scala> var data = sc.parallelize(List((1,1),(1,2),(1,3),(2,4)),2)data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[54] at parallelize at :27scala> def sum(a:Int,b:Int):Int = { a+b }sum: (a: Int, b: Int)Intscala> data.aggregateByKey(0)(sum,sum).collectres42: Array[(Int, Int)] = Array((2,4), (1,6))scala> def max(a:Int,b:Int):Int = { math.max(a,b) }max: (a: Int, b: Int)Intscala> data.aggregateByKey(0)(max,sum).collectres44: Array[(Int, Int)] = Array((2,4), (1,5))

sortByKey([ascending], [numTasks])

sortByKey用于针对Key做排序,默认是按照升序排序。

//创建数据集scala> var data = sc.parallelize(List(("A",2),("B",2),("A",1),("B",1),("C",1)))data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at :27//对数据集按照key进行默认排序scala> data.sortByKey().collectres23: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))//升序排序scala> data.sortByKey(true).collectres24: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))//降序排序scala> data.sortByKey(false).collectres25: Array[(String, Int)] = Array((C,1), (B,2), (B,1), (A,2), (A,1))

join(otherDataset, [numTasks])

join方法为(K,V)和(K,W)的数据集调用,返回相同的K,所组成的数据集。相当于sql中的按照key做连接。

有点类似于 select a.value,b.value from a inner join b on a.key = b.key;

举个例子

//创建第一个数据集scala> var data1 = sc.parallelize(List(("A",1),("B",2),("C",3)))//创建第二个数据集scala> var data2 = sc.parallelize(List(("A",4)))//创建第三个数据集scala> var data3 = sc.parallelize(List(("A",4),("A",5)))data1.join(data2).collect//输出为 Array[(String, (Int, Int))] = Array((A,(1,4)))data1.join(data3).collect//输出为 Array[(String, (Int, Int))] = Array((A,(1,4)), (A,(1,5)))

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, (Seq[V], Seq[W]))元组的数据集。

//创建第一个数据集scala> var data1 = sc.parallelize(List(("A",1),("B",2),("C",3)))//创建第二个数据集scala> var data2 = sc.parallelize(List(("A",4)))//创建第三个数据集scala> var data3 = sc.parallelize(List(("A",4),("A",5)))scala> data1.cogroup(data2).collect//输出为 Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(4))), (C,(CompactBuffer(3),CompactBuffer())))scala> data1.cogroup(data3).collect//输出为 Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(4, 5))), (C,(CompactBuffer(3),CompactBuffer())))

cartesian(otherDataset)

这个方法用于计算两个(K,V)数据集之间的笛卡尔积

//创建第一个数据集scala> var a = sc.parallelize(List(1,2))//创建第二个数据集scala> var b = sc.parallelize(List("A","B"))//计算笛卡尔积scala> a.cartesian(b).collect//输出结果 res2: Array[(Int, String)] = Array((1,A), (1,B), (2,A), (2,B))

pipe(command, [envVars])

pipe方法用于针对每个分区的RDD执行一个shell脚本命令,可以使perl或者bash。分区的元素将会被当做输入,脚本的输出则被当做返回的RDD值。

//创建数据集scala> var data = sc.parallelize(1 to 9,3)data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at :27//测试脚本scala> data.pipe("head -n 1").collectres26: Array[String] = Array(1, 4, 7)scala> data.pipe("tail -n 1").collectres27: Array[String] = Array(3, 6, 9)scala> data.pipe("tail -n 2").collectres28: Array[String] = Array(2, 3, 5, 6, 8, 9)

coalesce(numPartitions)

这个方法用于对RDD进行重新分区,第一个参数是分区的数量,第二个参数是是否进行shuffle

//创建数据集scala> var data = sc.parallelize(1 to 9,3)data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :27//查看分区的大小scala> data.partitions.sizeres3: Int = 3//不使用shuffle重新分区scala> var result = data.coalesce(2,false)result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[19] at coalesce at :29scala> result.partitions.lengthres12: Int = 2scala> result.toDebugStringres13: String = (2) CoalescedRDD[19] at coalesce at :29 [] | ParallelCollectionRDD[9] at parallelize at :27 []//使用shuffle重新分区scala> var result = data.coalesce(2,true)result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at coalesce at :29scala> result.partitions.lengthres14: Int = 2scala> result.toDebugStringres15: String = (2) MapPartitionsRDD[23] at coalesce at :29 [] | CoalescedRDD[22] at coalesce at :29 [] | ShuffledRDD[21] at coalesce at :29 [] +-(3) MapPartitionsRDD[20] at coalesce at :29 [] | ParallelCollectionRDD[9] at parallelize at :27 []

repartition(numPartitions)

这个方法作用于coalesce一样,重新对RDD进行分区,相当于shuffle版的calesce

//创建数据集scala> var data = sc.parallelize(1 to 9,3)data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :27//查看分区的大小scala> data.partitions.sizeres3: Int = 3scala> var result = data.repartition(2)result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at repartition at :29scala> result.partitions.lengthres16: Int = 2scala> result.toDebugStringres17: String = (2) MapPartitionsRDD[27] at repartition at :29 [] | CoalescedRDD[26] at repartition at :29 [] | ShuffledRDD[25] at repartition at :29 [] +-(3) MapPartitionsRDD[24] at repartition at :29 [] | ParallelCollectionRDD[9] at parallelize at :27 []scala>

repartitionAndSortWithinPartitions(partitioner)

这个方法是在分区中按照key进行排序,这种方式比先分区再sort更高效,因为相当于在shuffle阶段就进行排序。

下面的例子中,由于看不到分区里面的数据。可以通过设置分区个数为1,看到排序的效果。

scala> var data = sc.parallelize(List((1,2),(1,1),(2,3),(2,1),(1,4),(3,5)),2)data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[60] at parallelize at :27scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(2)).collectres52: Array[(Int, Int)] = Array((2,3), (2,1), (1,2), (1,1), (1,4), (3,5))scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(1)).collectres53: Array[(Int, Int)] = Array((1,2), (1,1), (1,4), (2,3), (2,1), (3,5))scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(3)).collectres54: Array[(Int, Int)] = Array((3,5), (1,2), (1,1), (1,4), (2,3), (2,1))

参考

spark 官方文档

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
28天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
82 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
29天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
51 6
|
27天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
86 2
|
28天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
65 1
|
28天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
29天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
54 1
|
2月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
15天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
124 7
|
15天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
31 2
|
28天前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
74 1