Spark Streaming开发基础

简介: Spark Streaming开发基础

1. 实验室名称:

大数据实验教学系统

2. 实验项目名称:

Spark Streaming开发基础

3. 实验学时:

4. 实验原理:

Spark Core它的核心就是RDD,对于Spark Streaming来说,它的核心是DStream,DStream类似于RDD,它实质上一系列的RDD的集合,DStream可以按照秒数将数据流进行批量的划分。首先从接收到流数据之后,将其划分为多个batch,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS或者数据库以及前端页面展示等等。

821acfd62b4d4b858291460117ac341e.png

图2 Spark Streaming原理

  Spark Streaming有以下特点

  1、高可扩展性,可以运行在上百台机器上。

  2、低延迟,可以在秒级别上对数据进行处理。

  3、高可容错性

5. 实验目的:

掌握套接字数据源连接下的Streaming应用编程模式。

  掌握文件数据源连接下的Streaming应用编程模式。

6. 实验内容:

使用Spark Streaming处理实时数据流,掌握SparkStreaming的两种实时数据处理:

  1、使用Spark Streaming直接从一个TCP/IP socket接收数据。

   - 通过nc服务监控一个端口,可以不断发出数据。

   - Spark通Spark Streaming实时获取数据,并对数据进行处理。

   - 使用socketTextStream监控nc服务端口将,将获取的数据转换为 DStream。

  2、使用Spark Streaming获取实时文件数据。

   - 通过不断上传数据文件到HDFS,模拟数据文件的产生。

   - 通过Spark Streaming实时监控HDFS目录,当有新数据文件产生时对数据文件进行处理。

   - 使用textFileStream将新的数据文件转换为 DStream。

7. 实验器材(设备、虚拟机名称):

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3

8. 实验步骤:

8.1 启动Spark集群和HDFS集群

1、启动Spark集群和HDFS集群。在终端窗口下,输入如下命令:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

启动之后,请使用jps命令查看,确保Spark集群和HDFS集群均已经正确启动。

  2、在HDFS中,创建Spark流程序要监听的流文件目录。在终端窗口下,输入如下命令:

1.  $ hdfs dfs -mkdir -p /data/dataset/streaming

3、启动spark-shell,用来执行spark代码,把代码提交到集群上。在终端窗口中,键入以下命令(注意:在执行以下命令时,请把localhost替换为实际的机器名):

1.  $ spark-shell --master spark://localhost:7077

8.2 使用Spark Streaming直接从一个TCP/IP socket接收数据

1、 从Spark streaming创建StreamingContext,使用socketTextStream监听TCP。请在spark-shell中,使用paster模式,编写如下代码:

1.  import org.apache.spark.streaming.Seconds
2.  import org.apache.spark.streaming.StreamingContext
3.       
4.  // 创建StreamingContext Secondes 设置每5s对数据进行一次采集
5.  val ssc = new StreamingContext(sc,Seconds(5))
6.       
7.  // 连接TCP服务 localhost 9999
8.  val lines = ssc.socketTextStream("localhost",9999)

然后同时按下【Ctrl + D】键,执行以上代码。

  2、处理从Socket服务器收到的实时数据。进入paste模式,然后编辑如下数据处理代码:

1.  // 对每行数据进行分割(按空格分割)
2.  val words = lines.flatMap(_.split(" "))
3.       
4.  // 把单词和1组合返回数组
5.  val word = words.map((_,1))
6.       
7.  // 对单词进行聚合
8.  val wordCounts = word.reduceByKey(_+_)
9.       
10. // 打印输出
11. wordCounts.print()
12.      
13. // 开始任务
14. ssc.start()
15.      
16. // 等待任务停止命令
17. ssc.awaitTermination()

注:这里先不要执行。需要Netcat服务启动以后,再来执行这部分流处理代码

  3、另打开一个终端,在该终端使窗口中,键入以下命令,用nc命令开启监听TCP端口,指定服务端口为9999:

1.  $ nc -lp 9999

参数说明:

• -l 使用监听模式,管控传入的资料。

• -p <通信端口> 设置本地主机使用的通信端口。

 4、切换到spark-shell窗口,同时按下”Ctrl + D”键,执行步骤(2)中键入的代码。

 5、切换到netcat的终端窗口下,随便输入一些语句,语句中的单词之间以空格分隔,如下所示:

1.  spark java python java python scala

6、切换到spark-shell窗口下,查看Spark流处理程序执行的输出窗口,可以看到类似如下的输出结果:

—————————————————————-
Time: 1545902810000 ms
—————————————————————-
(java,2)
(scala,1)
(python,2)
(spark,1)

7、从spark-shell停止运算流。在spark-shell窗口下,输入以下命令,停止流运算:

1.  ssc.stop(false)

其中参数false告诉streaming context不要停止该Spark context(不能重新启动已经停止的streaming context,但是可以重用已经存在的Spark context来创建一个新的streaming context。)或者,按下【Ctrl + C】键终止。

  8、退出spark-shell。在spark-shell中,执行以下命令:

1.  :quit

9、关闭natcat服务器。在netcat的终端窗口下,同时按下 【Ctrl + C】,即可停止natcat服务器。

8.3 使用Spark Streaming获取实时文件数据

1、在目录下/data/dataset/下有一个数据文件language.txt。打开一个终端窗口,执行如下命令,查看该文件中的文本内容:

1.   cat /data/dataset/streaming/language.txt

可以看到,该文件中的内容如下所示:

python java
scala hadoop
hive java
scala hadoop
python hive
python scala
java hadoop
mysql python

2、另打开一个终端,启动spark-shell。在终端窗口中,键入以下命令(注意:在执行以下命令时,请把localhost替换为实际的机器名):

1.  spark-shell --master spark://localhost:7077

3、在spark-shell窗口下,使用paste模式,输入以下Spark流处理代码:

1.  import org.apache.spark.streaming.Seconds
2.  import org.apache.spark.streaming.StreamingContext
3.       
4.  // 创建StreamingContext Secondes 设置每5s对数据进行一次采集
5.  val ssc = new StreamingContext(sc,Seconds(5))
6.       
7.  // 使用textFileStream监听hdfs上的 /data/dataset/streaming目录
8.  val lines = ssc.textFileStream("hdfs://localhost:9000/data/dataset/streaming")
9.       
10. // 对每行数据进行分割(按空格分割)
11. val words = lines.flatMap(_.split(" "))
12.      
13. // 把单词和1组合返回数组
14. val word = words.map((_,1))
15.      
16. // 对单词进行聚合
17. val wordCounts = word.reduceByKey(_+_)
18.      
19. // 打印输出
20. wordCounts.print()
21.      
22. // 开始任务
23. ssc.start()
24.      
25. // 等待任务停止命令
26. ssc.awaitTermination()

在以上流处理代码中,我们监听的是”/data/dataset/streaming”目录。

  然后,同时按下【Ctrl + D】键,执行以上代码。

4、切换到第一个终端窗口,将”/data/dataset/streaming/language.txt”文件上传到HDFS中。在该终端窗口中,执行如下命令:

1.   hdfs dfs -put /data/dataset/streaming/language.txt /data/dataset/streaming/

5、切换到spark-shell窗口下,可以看到如下的计算结果:

—————————————————————-
Time: 1546073050000 ms
—————————————————————-
(hive,2)
(mysql,1)
(java,3)
(hadoop,3)
(scala,3)
(python,4)

9. 实验结果及分析:

实验结果运行准确,无误

10. 实验结论:

经过本节实验的学习,通过学习Spark Streaming开发基础,进一步巩固了我们的Spark基础。

11. 总结及心得体会:

Spark Streaming特点:

  1、高可扩展性,可以运行在上百台机器上。

  2、低延迟,可以在秒级别上对数据进行处理。

  3、高可容错性

d0d86ac437264ad2bdecd48b77027b21.png



相关文章
|
7月前
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
48 0
|
1月前
|
分布式计算 大数据 Apache
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
81 0
|
11天前
|
分布式计算 Java Scala
如何处理 Spark Streaming 的异常情况?
【6月更文挑战第16天】如何处理 Spark Streaming 的异常情况?
22 4
|
1月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
57 2
|
1月前
|
存储 消息中间件 分布式计算
Spark Streaming
Spark Streaming
33 1
|
29天前
|
SQL 分布式计算 大数据
Spark开发实用技巧-从入门到爱不释手
Spark开发实用技巧-从入门到爱不释手
23 0
|
1月前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
1月前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
125 0
|
1月前
|
分布式计算 监控 数据处理
Spark Streaming的容错性与高可用性
Spark Streaming的容错性与高可用性
|
1月前
|
分布式计算 数据处理 Apache
Spark Streaming与数据源连接:Kinesis、Flume等
Spark Streaming与数据源连接:Kinesis、Flume等