Spark RDD编程(Python和Scala版本)

简介: Spark中的RDD就是一个不可变的分布式对象集合,是一种具有兼容性的基于内存的集群计算抽象方法,Spark则是这个方法的抽象。Spark的RDD操作分为转化操作(transformation)和行动操作(action),两者的区别在于:       a.

Spark中的RDD就是一个不可变的分布式对象集合,是一种具有兼容性的基于内存的集群计算抽象方法,Spark则是这个方法的抽象。

Spark的RDD操作分为转化操作(transformation)和行动操作(action),两者的区别在于:

       a.转化操作返回一个新的RDD对象

       b.行动操作则会对RDD产生一个计算结果,并把结果返回到驱动器程序中或者把结果存储到外部存储系统(如HDFS)

常见的转化操作有:map,filter,flatMap,sample,union,distinct,

                                    groupByKey,reduceByKey,sortByKey,join,cogroup,cartesian  ......

常见的行动操作有:reduce,collect,count,first,take,taksSample,

                                    saveAsTextFile,saveAsSequenceFile,countByKey,foreach ......

下面我们以实例说明Saprk的RDD编程

1:创建RDD

      有两种方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化

       python:

>>> nums =sc.parallelize([1,2,3,4])
>>> nums
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
>>> words = sc.textFile("file:///usr/local/hadoop/spark/README.md")
>>> words
file:///usr/local/hadoop/spark/README.md MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:-2
>>> 
       Scala(两种方法):
val lines = sc.parallelize(List(1,2,3,4))
lines: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
val rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:27

2:map()函数 和 take()函数

      RDD.map(func),map接受一个函数作为参数,作用于RDD中的每个对象,并将返回结果作为结果RDD中对应的元素的值

      RDD.take(num),用于取回num个value,在这里结合map使用,方便查看值

Python:

>>> nums = sc.parallelize([1,2,3,4])
>>> for num in nums.take(4):
...     print num
... 
1
2
3
4
>>> new_nums = nums.map(lambda x: x*2)
>>> for new_num in new_nums.take(4):
...     print new_num
... 
2
4
6
8

Scala:

scala> val nums = sc.parallelize(List(1,2,3,4))
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27

scala>nums.take(4).foreach(println)
1
2
3
4

3:flatMap()函数

      RDD.flatMap(func),和map类似,只不过map返回的是一个个元素,而flatMap返回的则是一个返回值序列的迭代器

Python:

>>> string = sc.parallelize(["i love you"])
>>> new_str = string.flatMap(lambda str:str.split(" "))</span>
>>> for str in new_str.take(3):
...     print str
... 
i
love
you

Scala:

scala> val string = sc.parallelize(List("i love you"))
string: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:27

scala> val new_str = string.flatMap(line=>line.split(" "))
new_str: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:29

scala> new_str.take(3).foreach(println)
i
love
you

4:filter()函数和 first()函数

      RDD.filter(func),接受一个函数作为参数,并将RDD中满足该函数的元素放入新的RDD中返回

      RDD.first(),返回的第一个

Python:

>>> string = sc.parallelize(["i love you"])
>>> new_str = string.filter(lambda line : "you" in line)
>>> new_str.first()
'i love you'

Scala:

scala> val string = sc.parallelize(List("i love you"))
string: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> string.first()
res3: String = i love you

scala> 
<pre name="code" class="java">scala> val string = sc.parallelize(List("I love you"))
scala> val new_str = string.filter(line =>line.contains("love"))
new_str: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:23

scala> new_str.foreach(println)
I love you

 

5:union()函数

      RDD1.union(RDD2),操作对象为两个RDD,返回一个新的RDD,转化操作可以操作任意数量的输入RDD

Python:

>>> num1 = sc.parallelize([1,2,3])
>>> num2 = sc.parallelize([4,5,6])
>>> num3 = num1.union(num2)
>>> for num in num3.take(6):
...     print num
... 
1
2
3
4
5
6

Scala:

scala> val num1 = sc.parallelize(List(1,2,3))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val num2 = sc.parallelize(List(4,5,6))
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27

scala> val num3 = num1.union(num2)
mum3: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:31

scala> num3.count()
res1: Long = 6                                                                  

scala> num3.foreach(println)
3
1
2
4
5
6

6:count()函数和collect()函数

      RDD.count(),是统计RDD中元素的个数,返回的是一个整数

      EDD.collect(),用来收集数据,保存在一个新的数据结构中,用来持久化,需要注意的是collect不能用在大规模数据集上

Python:

>>> nums = sc.parallelize([1,2,3,4])
>>> nums.count()
[Stage 0:>                                                          (0 +[Stage 0:>                                                          (0 +[Stage 0:==============>  
 (1 +
 4       
>>> 
>>> new_nums = nums.collect()
>>> new_nums
[1, 2, 3, 4]
>>> 

Scala:

scala> val num1 = sc.parallelize(List(1,2,3))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27

scala> num1.count()
res3: Long = 3

scala> val num2=num1.collect()
num2: Array[Int] = Array(1, 2, 3)

scala> num2
res4: Array[Int] = Array(1, 2, 3)

scala> 

7:伪集合操作

(1):RDD.distinct,去重,但其操作的开销大,因为它需要所有数据通过网络进行混洗

Python:
>>> nums1 = sc.parallelize([1,2,3,3])
>>> nums1.count()
4
>>> nums2=nums1.distinct()
>>> nums2.count()
3
>>>
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,3))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27

scala> val num2 = num1.distinct()
num2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at distinct at <console>:29

scala> num2.foreach(println)
2
3
1

(2):RDD1.intersection(RDD2),返回两个RDD中都有的元素,类似于集合中的交集

Python:
>>> nums_1=sc.parallelize([1,2,3,4,5])
>>> nums_2=sc.parallelize([3,4,5,6,7])
>>> nums_3=nums_1.intersection(nums_2)
>>> nums_3.count()
[Stage 7:>                                                          (0 +                                                                        3       
>>> for num in nums_3.take(3):
...     print num
...
3
4
5
>>>
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:27

scala> val num2 = sc.parallelize(List(3,4,5,6))
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:27

scala> val num3 = num1.intersection(num2)
num3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at intersection at <console>:31

scala> num3.foreach(println)
4
3


(3):RDD1.subtract(RDD2),接受一个RDD作为参数,返回一个由只存在第一个RDD1而不存在与第二个RDD2中的所有元素组成的RDD

Python:
>>> nums_4 = nums_1.subtract(nums_2)
>>> nums_4.count()
2
>>> for num in nums_4.take(2):
...     print num
...
1
2
>>>
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27

scala> val num2 = sc.parallelize(List(3,4,5,6))
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:27

scala> val num3 = num1.subtract(num2)
num3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at <console>:31

scala> num3.foreach(println)
2
1


(4):RDD1.cartesian(RDD2),求笛卡尔积,求出所有可能的(a,b)对

Python:
>>> nums_5 = nums_1.cartesian(nums_2)
>>> nums_5
org.apache.spark.api.java.JavaPairRDD@5617ade8
>>> nums_5.first()
(1, 3)
>>>

Scala:

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:27

scala> val num2 = sc.parallelize(List(3,4,5,6))
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:27

scala> val num3 = num1.cartesian(num2)
num3: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[25] at cartesian at <console>:31

scala> num3.foreach(println)
(1,3)
(1,5)
(1,6)
(1,4)
(2,3)
(2,4)
(3,3)
(2,5)
(2,6)
(3,4)
(3,6)
(4,3)
(3,5)
(4,5)
(4,4)
(4,6)

8:reduce()函数

  RDD.reduce(func),接受一个函数作为参数,操作两个RDD的元素类型的数据并返回一个同样类型的新元素

Python:

>>> nums=sc.parallelize([1,2,3,4,5,6])
>>> nums.reduce(lambda x,y:x+y)
21
>>> 
Scala:

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:27

scala> val num2 = num1.reduce((x,y)=>x+y)
num2: Int = 10

9:aggregate()函数

aggregate()函数需要我们提供期待返回的类型的初始值,然后通过一个函数把RDD中的元素合并起来放入累加器,考虑到每个节点是在本地累加的,最终,还需要通过第二个函数把累加器兩兩合并

Python:

>>> nums = sc.parallelize([1,2,3,4])
>>> sumCount = nums.aggregate( (0,0),
... (lambda acc,value:(acc[0]+value,acc[1]+1)),
... (lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])))
>>> sumCount[0]/float(sumCount[1])
2.5
>>>
Scala:

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at parallelize at <console>:27

scala> val result = num1.aggregate((0,0))(
     | (acc,value) => (acc._1 + value,acc._2+1),
     | (acc1,acc2) =>(acc1._1+acc2._1,acc1._2+acc2._2)
     | )
result: (Int, Int) = (10,4)

scala> val avg = result._1/result._2.toDouble
avg: Double = 2.5

scala> 

10:top()函数和 foreach()函数

        RDD.top(num),从RDD中返回前边的num个元素

Python:

>>> nums = sc.parallelize([1,2,3,4])
>>> new_nums = nums.top(3)
>>> new_nums
[4, 3, 2]
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:27

scala> num1.top(2)
res10: Array[Int] = Array(4, 3)

scala> 
        RDD.foreach(func),对RDD中的每个元素使用给定的函数

Python:

>>> nums = sc.parallelize([1,2,3])
>>> def add(x):
...     print "\n","x+2:",x+2
... 
>>> nums.foreach(add)

x+2: 5

x+2: 3

x+2: 4

Scala:

scala> def add(x:Int)={
     |  println (x+2)
     | }
add: (x: Int)Unit

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:27

scala> num1.foreach(add)
6
5
3
4

11:sample()函数 和 takeSample()函数

            sample(withReplacement,traction,[send]):对RDD采样以及是否转换

Python:

>>> nums = sc.parallelize([1,2,3,4,5,6,7])
>>> new_nums = nums.sample(False,0.5)
>>> new_nums
PythonRDD[106] at RDD at PythonRDD.scala:43
>>> new_nums.count()
5
>>> for n in new_nums.take(5):
...     print n
... 
1
3
5
6
7
Scala:

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:27

scala> val num2 = num1.sample(false,0.5)
num2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[38] at sample at <console>:29

scala> num2.foreach(println)
2
3
          RDD.takeSample( withReplacement,num,[send]),从RDD中返回任意一些元素

Python:

>>> nums = sc.parallelize([1,2,3,4,5,6,7])
>>> new_nums= nums.takeSample(False,5)
>>> new_nums
[5, 3, 4, 6, 7]

Scala:

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[39] at parallelize at <console>:27

scala> val num2 = num1.takeSample(false,2)
num2: Array[Int] = Array(3, 4)

12:persist  和 unpersist

        RDD.persist(),不带参数默认把数据以序列化的形式缓存在JVM的堆空间中

        RDD.unpersist(),手动把持久化的RDD从内存中删除

>>> nums = sc.parallelize([1,2,3,4,5,6,7])
>>> new_nums = nums.persist()
>>> new_nums
ParallelCollectionRDD[124] at parallelize at PythonRDD.scala:423
>>> new_nums.unpersist()
ParallelCollectionRDD[124] at parallelize at PythonRDD.scala:423
>>> 
Scala:

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> val num2 = num1.persist()
num2: num1.type = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> num2.foreach(println)
3
1
2
4

scala> num2.unpersist()
res17: num2.type = ParallelCollectionRDD[41] at parallelize at <console>:27


相关文章
|
16天前
|
人工智能 数据可视化 数据挖掘
探索Python编程:从基础到高级
在这篇文章中,我们将一起深入探索Python编程的世界。无论你是初学者还是有经验的程序员,都可以从中获得新的知识和技能。我们将从Python的基础语法开始,然后逐步过渡到更复杂的主题,如面向对象编程、异常处理和模块使用。最后,我们将通过一些实际的代码示例,来展示如何应用这些知识解决实际问题。让我们一起开启Python编程的旅程吧!
|
15天前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
3天前
|
Unix Linux 程序员
[oeasy]python053_学编程为什么从hello_world_开始
视频介绍了“Hello World”程序的由来及其在编程中的重要性。从贝尔实验室诞生的Unix系统和C语言说起,讲述了“Hello World”作为经典示例的起源和流传过程。文章还探讨了C语言对其他编程语言的影响,以及它在系统编程中的地位。最后总结了“Hello World”、print、小括号和双引号等编程概念的来源。
98 80
|
2天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
15 2
|
15天前
|
小程序 开发者 Python
探索Python编程:从基础到实战
本文将引导你走进Python编程的世界,从基础语法开始,逐步深入到实战项目。我们将一起探讨如何在编程中发挥创意,解决问题,并分享一些实用的技巧和心得。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你提供有价值的参考。让我们一起开启Python编程的探索之旅吧!
41 10
|
16天前
|
IDE 程序员 开发工具
Python编程入门:打造你的第一个程序
迈出编程的第一步,就像在未知的海洋中航行。本文是你启航的指南针,带你了解Python这门语言的魅力所在,并手把手教你构建第一个属于自己的程序。从安装环境到编写代码,我们将一步步走过这段旅程。准备好了吗?让我们开始吧!
|
15天前
|
人工智能 数据挖掘 开发者
探索Python编程之美:从基础到进阶
本文是一篇深入浅出的Python编程指南,旨在帮助初学者理解Python编程的核心概念,并引导他们逐步掌握更高级的技术。文章不仅涵盖了Python的基础语法,还深入探讨了面向对象编程、函数式编程等高级主题。通过丰富的代码示例和实践项目,读者将能够巩固所学知识,提升编程技能。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你提供有价值的参考和启示。让我们一起踏上Python编程的美妙旅程吧!
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
66 5
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
54 3
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
45 0
下一篇
DataWorks