一、词频统计实例
读文件
拍扁
二、键值对RDD
1.创建键值对RDD
①加载数据
scala> val lines =sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") scala>val pairRDD = lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
②并行数据
val list=List("Hadoop","Spark","Hive","Spark") val rdd=sc.parallelize(list) val pairRDD = rdd.map(word => (word,1))
2.键值对RDD转换操作
①reduceByKey(func)
使用func函数合并具有相同键的值
pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
②groupByKey()
对具有相同键的值进行分组
pairRDD.groupByKey()
两者的区别
groupByKey只做分组,
(key,value-list)不会进行汇总求和;
而reduceByKey更进一步,
value-list进行汇总求和。
实例
③map
将RDD中每一个元素依次取出遍历
val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t=>(t._1,t._2.sum))
④keys
把Pair RDD中的key返回形成一个新的RDD
pairRDD.keys pairRDD,keys.foreach(println)
⑤values
把Pair RDD中的value返回形成一个新的RDD
用法和上面的一样
⑥sortByKey
返回一个根据键排序的RDD
默认是升序
⑦mapValues(func)
对键值对RDD中的每个value都应用一个函数,key不会发生变化。
⑧join
连接RDD中key相同的元素
实例
求月均销量
rdd,mapValues(x=>(x,1)). reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)). mapValues(x=>(x._1/x._2)). collect()
三、RDD数据读写
惰性机制,即使输入错误的语句也不会马上报错
写文件
textFile.saveAsTextFile(“目录”)
读文件
val textFile=sc.textFile(“目录”)
json字符串
四、文件排序
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner object FileSort { def main(args: Array[String]{ val conf = new SparkConf().setAppName("FileSort") val sc = new SparkContext(conf) val dataFile = "file:///usr/local/spark/mycode/rdd/data" val lines = sc.textFile(dataFile,3) var index = 0 val result = lines.filter(_.trim(.length>0).map(n=>(n.trim.toInt,"")).partitionBy(newHashPartitioner(1)).sortByKeyo.map(t =>{ index += 1 (index,t._1) }) result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result") } }
val lines = sc.textFile(dataFile,3)
val result = lines.filter(_.trim(.length>0)
过滤没有内容的行
.map(n=>(n.trim.toInt,“”))
partitionBy(newHashPartitioner(1))
把所有分区相关数据组成新的分区
.map(t =>{
index += 1
(index,t._1)
})
result.saveAsTextFile(“file:///usr/local/spark/mycode/rdd/examples/result”)
完善
Spark 4: RDD实例
一、词频统计实例
以下是一个词频统计的Spark RDD实例,它演示了如何使用Spark来统计文本中单词的频率。
from pyspark import SparkContext # 创建SparkContext sc = SparkContext("local", "Word Count Example") # 从文本文件加载数据创建RDD text_rdd = sc.textFile("file:///path/to/your/textfile.txt") # 切分文本并计算词频 word_counts = text_rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) # 打印词频结果 print(word_counts.collect())
二、键值对RDD
1. 创建键值对RDD
你可以使用map
操作将普通RDD转换为键值对RDD,如下所示:
rdd = sc.parallelize([(1, "apple"), (2, "banana"), (3, "cherry")]) key_value_rdd = rdd.map(lambda x: (x[0], x[1]))
2. 键值对RDD转换操作
a. reduceByKey(func)
reduceByKey
操作用于按键对值进行归约操作。它将具有相同键的值进行合并,以减少数据量。
key_value_rdd = sc.parallelize([(1, 2), (2, 3), (1, 4), (2, 5)]) result = key_value_rdd.reduceByKey(lambda a, b: a + b)
b. groupByKey()
groupByKey
操作用于按键对值进行分组。它返回一个键和值的列表,其中每个键都对应一个值的迭代器。
key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "cherry")]) result = key_value_rdd.groupByKey()
两者的区别
reduceByKey
会在每个键的值上执行归约操作,并返回每个键的单个值。groupByKey
返回每个键的值的迭代器,可能包含多个值。
c. map
map
操作用于对键值对RDD的每个元素应用一个函数。
key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "cherry")]) result = key_value_rdd.map(lambda x: (x[0], x[1].upper()))
d. keys
keys
操作返回键的RDD。
key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "cherry")]) keys_rdd = key_value_rdd.keys()
e. values
values
操作返回值的RDD。
key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "cherry")]) values_rdd = key_value_rdd.values()
f. sortByKey
sortByKey
操作按键对RDD中的键进行排序。
key_value_rdd = sc.parallelize([(3, "apple"), (1, "banana"), (2, "cherry")]) sorted_rdd = key_value_rdd.sortByKey()
g. mapValues(func)
mapValues
操作对键值对RDD的值应用函数。
key_value_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (3, "cherry")]) result = key_value_rdd.mapValues(lambda x: x.upper())
h. join
join
操作用于合并两个键值对RDD,基于它们的键进行合并。
rdd1 = sc.parallelize([(1, "apple"), (2, "banana"), (3, "cherry")]) rdd2 = sc.parallelize([(2, "red"), (3, "yellow"), (4, "green")]) result = rdd1.join(rdd2)
三、RDD数据读写
Spark支持从各种数据源中读取数据创建RDD,包括本地文件系统、HDFS、数据库、Kafka等。你可以使用textFile
、sequenceFile
、jdbc
等方法来加载数据。
四、文件排序
Spark提供了sortBy
操作,可以对RDD中的数据进行排序。你可以指定排序的字段和排序顺序。
rdd = sc.parallelize([3, 1, 2, 4, 5]) sorted_rdd = rdd.sortBy(lambda x: x, ascending=False)
这些是关于RDD的一些实例和操作,Spark的RDD提供了强大的数据处理功能,可以用于各种分布式计算任务。