1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
Spark Streaming开发基础
3. 实验学时:
4. 实验原理:
Spark Core它的核心就是RDD,对于Spark Streaming来说,它的核心是DStream,DStream类似于RDD,它实质上一系列的RDD的集合,DStream可以按照秒数将数据流进行批量的划分。首先从接收到流数据之后,将其划分为多个batch,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS或者数据库以及前端页面展示等等。
图2 Spark Streaming原理
Spark Streaming有以下特点:
1、高可扩展性,可以运行在上百台机器上。
2、低延迟,可以在秒级别上对数据进行处理。
3、高可容错性
5. 实验目的:
掌握套接字数据源连接下的Streaming应用编程模式。
掌握文件数据源连接下的Streaming应用编程模式。
6. 实验内容:
使用Spark Streaming处理实时数据流,掌握SparkStreaming的两种实时数据处理:
1、使用Spark Streaming直接从一个TCP/IP socket接收数据。
- 通过nc服务监控一个端口,可以不断发出数据。
- Spark通Spark Streaming实时获取数据,并对数据进行处理。
- 使用socketTextStream监控nc服务端口将,将获取的数据转换为 DStream。
2、使用Spark Streaming获取实时文件数据。
- 通过不断上传数据文件到HDFS,模拟数据文件的产生。
- 通过Spark Streaming实时监控HDFS目录,当有新数据文件产生时对数据文件进行处理。
- 使用textFileStream将新的数据文件转换为 DStream。
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3
8. 实验步骤:
8.1 启动Spark集群和HDFS集群
1、启动Spark集群和HDFS集群。在终端窗口下,输入如下命令:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh
启动之后,请使用jps命令查看,确保Spark集群和HDFS集群均已经正确启动。
2、在HDFS中,创建Spark流程序要监听的流文件目录。在终端窗口下,输入如下命令:
1. $ hdfs dfs -mkdir -p /data/dataset/streaming
3、启动spark-shell,用来执行spark代码,把代码提交到集群上。在终端窗口中,键入以下命令(注意:在执行以下命令时,请把localhost替换为实际的机器名):
1. $ spark-shell --master spark://localhost:7077
8.2 使用Spark Streaming直接从一个TCP/IP socket接收数据
1、 从Spark streaming创建StreamingContext,使用socketTextStream监听TCP。请在spark-shell中,使用paster模式,编写如下代码:
1. import org.apache.spark.streaming.Seconds 2. import org.apache.spark.streaming.StreamingContext 3. 4. // 创建StreamingContext Secondes 设置每5s对数据进行一次采集 5. val ssc = new StreamingContext(sc,Seconds(5)) 6. 7. // 连接TCP服务 localhost 9999 8. val lines = ssc.socketTextStream("localhost",9999)
然后同时按下【Ctrl + D】键,执行以上代码。
2、处理从Socket服务器收到的实时数据。进入paste模式,然后编辑如下数据处理代码:
1. // 对每行数据进行分割(按空格分割) 2. val words = lines.flatMap(_.split(" ")) 3. 4. // 把单词和1组合返回数组 5. val word = words.map((_,1)) 6. 7. // 对单词进行聚合 8. val wordCounts = word.reduceByKey(_+_) 9. 10. // 打印输出 11. wordCounts.print() 12. 13. // 开始任务 14. ssc.start() 15. 16. // 等待任务停止命令 17. ssc.awaitTermination()
注:这里先不要执行。需要Netcat服务启动以后,再来执行这部分流处理代码。
3、另打开一个终端,在该终端使窗口中,键入以下命令,用nc命令开启监听TCP端口,指定服务端口为9999:
1. $ nc -lp 9999
参数说明:
• -l 使用监听模式,管控传入的资料。
• -p <通信端口> 设置本地主机使用的通信端口。
4、切换到spark-shell窗口,同时按下”Ctrl + D”键,执行步骤(2)中键入的代码。
5、切换到netcat的终端窗口下,随便输入一些语句,语句中的单词之间以空格分隔,如下所示:
1. spark java python java python scala
6、切换到spark-shell窗口下,查看Spark流处理程序执行的输出窗口,可以看到类似如下的输出结果:
—————————————————————- Time: 1545902810000 ms —————————————————————- (java,2) (scala,1) (python,2) (spark,1)
7、从spark-shell停止运算流。在spark-shell窗口下,输入以下命令,停止流运算:
1. ssc.stop(false)
其中参数false告诉streaming context不要停止该Spark context(不能重新启动已经停止的streaming context,但是可以重用已经存在的Spark context来创建一个新的streaming context。)或者,按下【Ctrl + C】键终止。
8、退出spark-shell。在spark-shell中,执行以下命令:
1. :quit
9、关闭natcat服务器。在netcat的终端窗口下,同时按下 【Ctrl + C】,即可停止natcat服务器。
8.3 使用Spark Streaming获取实时文件数据
1、在目录下/data/dataset/下有一个数据文件language.txt。打开一个终端窗口,执行如下命令,查看该文件中的文本内容:
1. cat /data/dataset/streaming/language.txt
可以看到,该文件中的内容如下所示:
python java scala hadoop hive java scala hadoop python hive python scala java hadoop mysql python
2、另打开一个终端,启动spark-shell。在终端窗口中,键入以下命令(注意:在执行以下命令时,请把localhost替换为实际的机器名):
1. spark-shell --master spark://localhost:7077
3、在spark-shell窗口下,使用paste模式,输入以下Spark流处理代码:
1. import org.apache.spark.streaming.Seconds 2. import org.apache.spark.streaming.StreamingContext 3. 4. // 创建StreamingContext Secondes 设置每5s对数据进行一次采集 5. val ssc = new StreamingContext(sc,Seconds(5)) 6. 7. // 使用textFileStream监听hdfs上的 /data/dataset/streaming目录 8. val lines = ssc.textFileStream("hdfs://localhost:9000/data/dataset/streaming") 9. 10. // 对每行数据进行分割(按空格分割) 11. val words = lines.flatMap(_.split(" ")) 12. 13. // 把单词和1组合返回数组 14. val word = words.map((_,1)) 15. 16. // 对单词进行聚合 17. val wordCounts = word.reduceByKey(_+_) 18. 19. // 打印输出 20. wordCounts.print() 21. 22. // 开始任务 23. ssc.start() 24. 25. // 等待任务停止命令 26. ssc.awaitTermination()
在以上流处理代码中,我们监听的是”/data/dataset/streaming”目录。
然后,同时按下【Ctrl + D】键,执行以上代码。
4、切换到第一个终端窗口,将”/data/dataset/streaming/language.txt”文件上传到HDFS中。在该终端窗口中,执行如下命令:
1. hdfs dfs -put /data/dataset/streaming/language.txt /data/dataset/streaming/
5、切换到spark-shell窗口下,可以看到如下的计算结果:
—————————————————————- Time: 1546073050000 ms —————————————————————- (hive,2) (mysql,1) (java,3) (hadoop,3) (scala,3) (python,4)
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过学习Spark Streaming开发基础,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
Spark Streaming特点:
1、高可扩展性,可以运行在上百台机器上。
2、低延迟,可以在秒级别上对数据进行处理。
3、高可容错性