Spark4:RDD实例

简介: Spark4:RDD实例

一、词频统计实例

读文件

拍扁

二、键值对RDD

1.创建键值对RDD

①加载数据

scala> val lines =sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala>val pairRDD = lines.flatMap(line=>line.split(" ")).map(word=>(word,1))

②并行数据

val list=List("Hadoop","Spark","Hive","Spark")
val rdd=sc.parallelize(list)
val pairRDD = rdd.map(word => (word,1))

2.键值对RDD转换操作

①reduceByKey(func)

使用func函数合并具有相同键的值

pairRDD.reduceByKey((a,b)=>a+b).foreach(println)

②groupByKey()

对具有相同键的值进行分组

pairRDD.groupByKey()

两者的区别

groupByKey只做分组,

(key,value-list)不会进行汇总求和;

而reduceByKey更进一步,

value-list进行汇总求和。

实例

③map

将RDD中每一个元素依次取出遍历

val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t=>(t._1,t._2.sum))

④keys

把Pair RDD中的key返回形成一个新的RDD

pairRDD.keys
pairRDD,keys.foreach(println)

⑤values

把Pair RDD中的value返回形成一个新的RDD

用法和上面的一样

⑥sortByKey

返回一个根据键排序的RDD

默认是升序

⑦mapValues(func)

对键值对RDD中的每个value都应用一个函数,key不会发生变化。

⑧join

连接RDD中key相同的元素

实例

求月均销量

rdd,mapValues(x=>(x,1)).
reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).
mapValues(x=>(x._1/x._2)).
collect()

三、RDD数据读写

惰性机制,即使输入错误的语句也不会马上报错

写文件

textFile.saveAsTextFile(“目录”)

读文件

val textFile=sc.textFile(“目录”)

json字符串

四、文件排序

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object FileSort {
  def main(args: Array[String]{
    val conf = new SparkConf().setAppName("FileSort")
    val sc = new SparkContext(conf)
    val dataFile = "file:///usr/local/spark/mycode/rdd/data"
    val lines = sc.textFile(dataFile,3)
    var index = 0
    val result = lines.filter(_.trim(.length>0).map(n=>(n.trim.toInt,"")).partitionBy(newHashPartitioner(1)).sortByKeyo.map(t =>{
         index += 1
       (index,t._1)
     })
     result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result")
   }
}

val lines = sc.textFile(dataFile,3)

val result = lines.filter(_.trim(.length>0)

过滤没有内容的行

.map(n=>(n.trim.toInt,“”))

partitionBy(newHashPartitioner(1))

把所有分区相关数据组成新的分区

.map(t =>{

index += 1

(index,t._1)

})

result.saveAsTextFile(“file:///usr/local/spark/mycode/rdd/examples/result”)

完善

Spark 4: RDD实例

一、词频统计实例

以下是一个词频统计的Spark RDD实例,它演示了如何使用Spark来统计文本中单词的频率。

from pyspark import SparkContext
# 创建SparkContext
sc = SparkContext("local", "Word Count Example")
# 从文本文件加载数据创建RDD
text_rdd = sc.textFile("file:///path/to/your/textfile.txt")
# 切分文本并计算词频
word_counts = text_rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# 打印词频结果
print(word_counts.collect())

二、键值对RDD

1. 创建键值对RDD

你可以使用map操作将普通RDD转换为键值对RDD,如下所示:

rdd = sc.parallelize([(1, "apple"), (2, "banana"), (3, "cherry")])
key_value_rdd = rdd.map(lambda x: (x[0], x[1]))

2. 键值对RDD转换操作

a. reduceByKey(func)

reduceByKey操作用于按键对值进行归约操作。它将具有相同键的值进行合并,以减少数据量。

key_value_rdd = sc.parallelize([(1, 2), (2, 3), (1, 4), (2, 5)])
result = key_value_rdd.reduceByKey(lambda a, b: a + b)
b. groupByKey()

groupByKey操作用于按键对值进行分组。它返回一个键和值的列表,其中每个键都对应一个值的迭代器。

key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "cherry")])
result = key_value_rdd.groupByKey()
两者的区别
  • reduceByKey会在每个键的值上执行归约操作,并返回每个键的单个值。
  • groupByKey返回每个键的值的迭代器,可能包含多个值。
c. map

map操作用于对键值对RDD的每个元素应用一个函数。

key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "cherry")])
result = key_value_rdd.map(lambda x: (x[0], x[1].upper()))
d. keys

keys操作返回键的RDD。

key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "cherry")])
keys_rdd = key_value_rdd.keys()
e. values

values操作返回值的RDD。

key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "cherry")])
values_rdd = key_value_rdd.values()
f. sortByKey

sortByKey操作按键对RDD中的键进行排序。

key_value_rdd = sc.parallelize([(3, "apple"), (1, "banana"), (2, "cherry")])
sorted_rdd = key_value_rdd.sortByKey()
g. mapValues(func)

mapValues操作对键值对RDD的值应用函数。

key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (3, "cherry")])
result = key_value_rdd.mapValues(lambda x: x.upper())
h. join

join操作用于合并两个键值对RDD,基于它们的键进行合并。

rdd1 = sc.parallelize([(1, "apple"), (2, "banana"), (3, "cherry")])
rdd2 = sc.parallelize([(2, "red"), (3, "yellow"), (4, "green")])
result = rdd1.join(rdd2)

三、RDD数据读写

Spark支持从各种数据源中读取数据创建RDD,包括本地文件系统、HDFS、数据库、Kafka等。你可以使用textFilesequenceFilejdbc等方法来加载数据。

四、文件排序

Spark提供了sortBy操作,可以对RDD中的数据进行排序。你可以指定排序的字段和排序顺序。

rdd = sc.parallelize([3, 1, 2, 4, 5])
sorted_rdd = rdd.sortBy(lambda x: x, ascending=False)

这些是关于RDD的一些实例和操作,Spark的RDD提供了强大的数据处理功能,可以用于各种分布式计算任务。

目录
相关文章
|
2月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
119 1
|
2月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
11天前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
16 0
|
11天前
|
分布式计算 Hadoop Scala
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
13 0
|
2月前
|
分布式计算 Shell 开发工具
Spark编程实验二:RDD编程初级实践
Spark编程实验二:RDD编程初级实践
38 1
|
2月前
|
存储 分布式计算 程序员
Spark中的RDD介绍
Spark中的RDD介绍
22 0
|
2月前
|
分布式计算 Spark
Spark【Spark学习大纲】简介+生态+RDD+安装+使用(xmind分享)
【2月更文挑战第14天】Spark【Spark学习大纲】简介+生态+RDD+安装+使用(xmind分享)
48 1
|
2月前
|
分布式计算 Hadoop Java
Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
【2月更文挑战第14天】Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
76 1
|
2月前
|
存储 缓存 分布式计算
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
|
2月前
|
机器学习/深度学习 分布式计算 监控
典型的Spark应用实例
典型的Spark应用实例
77 1