一、实验目的
掌握 DStream各种常用转换操作。
掌握Spark Streaming join操作。
掌握DStream计算结果的保存。
二、实验内容
编写Spark Streaming流计算程序,完成以下要求:
1、对DStream进行各种转换操作。
2、对两个DStream进行join操作。
3、保存DStream计算结果。
三、实验原理
Spark Streaming提供了多种转换函数,用来对接收到的实时数据进行转换操作。常用的DStreams转换函数如下表所示:
函数名 | 作用 |
map(func): | 对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream; |
flatMap(func): | 与map相似,但是每个输入项可用被映射为0个或者多个输出项; |
filter(func): | 返回一个新的DStream,仅包含源DStream中满足函数func的项; |
repartition(numPartitions): | 通过创建更多或者更少的分区改变DStream的并行程度; |
union(otherStream): | 返回一个新的DStream,包含源DStream和其他DStream的元素; |
count(): | 统计源DStream中每个RDD的元素数量; |
reduce(func): | 利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream; |
countByValue(): | 应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数; |
reduceByKey(func, [numTasks]): | 当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来; |
join(otherStream, [numTasks]): | 当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream; |
cogroup(otherStream, [numTasks]): | 当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组; |
transform(func): | 通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。 |
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3
五、实验步骤
5.1 启动Spark集群和HDFS集群
1、在终端窗口下,输入如下命令,分别启动Spark集群和HDFS集群:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh
然后使用jps
命令查看,确保已经正确启动了Spark和HDFS集群。
2、启动spark-shell。在终端窗口中,键入以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)
1. $ spark-shell --master spark://localhost:7077
5.2 启动netcat服务器
在本实验中,我们使用Socket数据源来测试流程序中的各种DStream转换操作。所以,需要启动一个netcat服务器,作为本实验中流程序的Socket数据源。
另打开一个新的终端,在终端窗口中使用如下命令开启netcat服务器,服务端口为9999:
1. $ nc -lp 9999
启动以后,就等待用户输入数据。输入的数据会被Spark流程序实时读取并处理。目前暂时不需要输入任何内容。
5.3 DStreams的转换操作
要执行Spark流程序,首先需要创建StreamingContext对象。请切换回pyspark shell窗口执行以下操作。
1、map(func)
使用map()对数据进行处理,这里以把分割后的数据转化为List为例。切换回spark-shell窗口,在paster模式下,输入以下代码:
1. import org.apache.spark.streaming.Seconds 2. import org.apache.spark.streaming.StreamingContext 3. 4. // 创建StreamingContext Secondes 设置每5s对数据进行一次采集 5. val ssc = new StreamingContext(sc,Seconds(5)) 6. 7. // 连接TCP服务 localhost 9999 8. val lines = ssc.socketTextStream("localhost",9999) 9. 10. // 对每行数据进行分割(按空格分割) 11. // 分割后的数据会存在Array中 直接打印会数据一个Array对象,所有转为List输出 12. val words = lines.map(_.split(" ").toList) 13. 14. // 打印输出 15. words.print() 16. 17. // 开始任务 18. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
切换到启动TCP服务的终端窗口,输入如下内容:
1. spark java python java python scala
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1546508005000 ms —————————————————————- List(spark, java, python, java, python, scala)
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
然后发现netcat服务关闭,需重启netcat服务
1. nc -lp 9999 2、flatMap(func)
使用flatMap函数对数据进行分割后打印。在spark-shell窗口下,继续键入以下代码:
1. val ssc = new StreamingContext(sc,Seconds(5)) 2. val lines = ssc.socketTextStream("localhost",9999) 3. 4. // 对每行数据进行分割(按空格分割) 5. // 分割后的数据会存在map中 6. val words = lines.flatMap(_.split(" ")) 7. 8. // 打印输出 9. words.print() 10. 11. // 开始任务 12. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
紧接着,切换到启动TCP服务的终端窗口,输入如下内容::
1. spark java python java python scala
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1546573705000 ms —————————————————————- spark java python java python scala
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
然后发现netcat服务关闭,需重启netcat服务
1. nc -lp 9999
3、filter(func):
通过filter函数过滤数据值为”java”的数据打印出来。在spark-shell窗口下,继续键入以下代码:
1. val ssc = new StreamingContext(sc,Seconds(5)) 2. val lines = ssc.socketTextStream("localhost",9999) 3. 4. // 对每行数据进行分割(按空格分割) 5. // 分割后的数据会存在map中 6. val words = lines.flatMap(_.split(" ")) 7. val f = (name:String) => name == "java" 8. 9. val word = words.filter(f) 10. 11. // 打印输出 12. word.print() 13. 14. // 开始任务 15. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
紧接着,切换到启动TCP服务的终端窗口,输入如下内容::
1. spark java python java python scala
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1546573830000 ms —————————————————————- java java
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
然后发现netcat服务关闭,需重启netcat服务
1. nc -lp 9999
4、 union(otherStream)
union函数是对DStream进行合并,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:
1. val ssc = new StreamingContext(sc,Seconds(5)) 2. val lines = ssc.socketTextStream("localhost",9999) 3. 4. // 对每行数据进行分割(按空格分割) 5. // 分割后的数据会存在map中 6. val words = lines.flatMap(_.split(" ")) 7. val union_DStream = words.union(words) 8. union_DStream.print() 9. 10. // 开始任务 11. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
紧接着,切换到启动TCP服务的终端窗口,输入如下内容::
1. spark java python java python scala
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1546586695000 ms· —————————————————————- spark java python java python scala spark java python java
上面案例是把输入的数据进行一次合并。
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
然后发现netcat服务关闭,需重启netcat服务
1. nc -lp 9999
5、count()
count函数是对DStream元素进行统计,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:
1. val ssc = new StreamingContext(sc,Seconds(5)) 2. val lines = ssc.socketTextStream("localhost",9999) 3. 4. // 对每行数据进行分割(按空格分割) 5. // 分割后的数据会存在map中 6. val words = lines.flatMap(_.split(" ")) 7. 8. // 对words中的元素数据进行统计 9. val word_count = words.count() 10. 11. // 打印统计结果 12. word_count.print() 13. 14. // 开始任务 15. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
紧接着,切换到启动TCP服务的终端窗口,输入如下内容::
1. spark java python java python scala
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1546587435000 ms —————————————————————- 0 —————————————————————- Time: 1546587440000 ms —————————————————————- 6
可以看出,当没有数据时,统计数量为0。
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
然后发现netcat服务关闭,需重启netcat服务
1. nc -lp 9999
6、reduce(func)
reduce函数是对DStream数据进行聚合,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:
1. val ssc = new StreamingContext(sc,Seconds(5)) 2. val lines = ssc.socketTextStream("localhost",9999) 3. 4. // 对每行数据进行分割(按空格分割) 5. // 分割后的数据会存在map中 6. val words = lines.flatMap(_.split(" ")) 7. 8. // 对DStream中的元素依次相加 9. val word = words.reduce((a, b) => a + b) 10. 11. // 打印统计结果 12. word.print() 13. 14. // 开始任务 15. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
紧接着,切换到启动TCP服务的终端窗口,输入如下内容::
1. java
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1546589670000 ms —————————————————————- java
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
然后发现netcat服务关闭,需重启netcat服务
1. nc -lp 9999
7、countByValue()
countByValue函数是对DStream数据进行统计,返回一个(K,V)键值对类型的新DStream。在spark-shell窗口下,继续键入以下代码:
1. val ssc = new StreamingContext(sc,Seconds(5)) 2. val lines = ssc.socketTextStream("localhost",9999) 3. 4. // 对每行数据进行分割(按空格分割) 5. // 分割后的数据会存在map中 6. val words = lines.flatMap(_.split(" ")) 7. 8. // 对DStream中的元素进行计数并一(K,V)键值对返回 9. val word = words.countByValue() 10. // 打印统计结果 11. word.print() 12. 13. // 开始任务 14. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
紧接着,切换到启动TCP服务的终端窗口,输入如下内容::
1. spark java python java python scala
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1546590505000 ms —————————————————————- (scala,1) (python,2) (java,2) (spark,1)
上面的是打印结果中 K 为元素值,V为元素出现的次数。
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
然后发现netcat服务关闭,需重启netcat服务
1. nc -lp 9999
8、reduceByKey(func)
reduceByKey函数是对DStream中的数据根据相同的K对V进行处理,返回一个(K,V)键值对类型的新DStream。在spark-shell窗口下,继续键入以下代码:
1. val ssc = new StreamingContext(sc,Seconds(5)) 2. val lines = ssc.socketTextStream("localhost",9999) 3. 4. // 对每行数据进行分割(按空格分割) 5. // 分割后的数据会存在map中 6. val words = lines.flatMap(_.split(" ")) 7. 8. // 使用map对元组进行组合 组合为(K,V)的形式 9. val word = words.map((_, 1)) 10. 11. // 通过reduceByKey对相同K的元组进行V相加 12. val word_reduce = word.reduceByKey((a, b) => a + b) 13. 14. // 打印统计结果 15. word_reduce.print() 16. 17. // 开始任务 18. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
紧接着,切换到启动TCP服务的终端窗口,输入如下内容::
1. spark java python java python scala
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1546593640000 ms —————————————————————- (scala,1) (python,2) (java,2) (spark,1)
reduceByKey函数多用于处理(K,V)数据的聚合
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
然后发现netcat服务关闭,需重启netcat服务
1. nc -lp 9999
9、join(otherStream):
join函数可以将两个DStream连接到一起。例如,有一个(K,V),以及另一个(K,W),将这两者进行join连接,返回的结果为(K,(V,W))。在spark-shell窗口下,继续键入以下代码:
1. val ssc = new StreamingContext(sc,Seconds(5)) 2. val lines = ssc.socketTextStream("localhost",9999) 3. 4. // 对每行数据进行分割(按空格分割) 5. // 分割后的数据会存在map中 6. val words = lines.flatMap(_.split(" ")) 7. 8. // 使用map对元组进行组合 组合为(K,V)的形式 9. val word = words.map((_, 1)) 10. 11. // 通过join对把两个DSstream加到一起 12. val word_join = word.join(word) 13. 14. // 打印统计结果 15. word_join.print() 16. 17. // 开始任务 18. ssc.start()
然后同时按下【Ctrl + D】,执行以上代码。
紧接着,切换到启动TCP服务的终端窗口,输入如下内容::
1. spark java python java python scala
切换回spark-shell窗口,查看程序输出窗口,打印结果如下:
—————————————————————- Time: 1547189050000 ms —————————————————————- (scala,(1,1)) (python,(1,1)) (python,(1,1)) (python,(1,1)) (python,(1,1)) (java,(1,1)) (java,(1,1)) (java,(1,1)) (java,(1,1)) (spark,(1,1))
停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:
1. ssc.stop(false)
六、 实验知识测试
无
七、实验拓展
无