DStreams的操作

简介: DStreams的操作

一、实验目的

掌握 DStream各种常用转换操作。

  掌握Spark Streaming join操作。

  掌握DStream计算结果的保存。

二、实验内容

编写Spark Streaming流计算程序,完成以下要求:

  1、对DStream进行各种转换操作。

  2、对两个DStream进行join操作。

  3、保存DStream计算结果。

三、实验原理

Spark Streaming提供了多种转换函数,用来对接收到的实时数据进行转换操作。常用的DStreams转换函数如下表所示:

函数名 作用
map(func): 对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream;
flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项;
filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项;
repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度;
union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素;
count(): 统计源DStream中每个RDD的元素数量;
reduce(func): 利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream;
countByValue(): 应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数;
reduceByKey(func, [numTasks]): 当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来;
join(otherStream, [numTasks]): 当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
cogroup(otherStream, [numTasks]): 当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
transform(func): 通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

5.1 启动Spark集群和HDFS集群

1、在终端窗口下,输入如下命令,分别启动Spark集群和HDFS集群:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

然后使用jps命令查看,确保已经正确启动了Spark和HDFS集群。

  2、启动spark-shell。在终端窗口中,键入以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)

1.  $ spark-shell --master spark://localhost:7077

5.2 启动netcat服务器

在本实验中,我们使用Socket数据源来测试流程序中的各种DStream转换操作。所以,需要启动一个netcat服务器,作为本实验中流程序的Socket数据源。

  另打开一个新的终端,在终端窗口中使用如下命令开启netcat服务器,服务端口为9999:

1.  $ nc -lp 9999

启动以后,就等待用户输入数据。输入的数据会被Spark流程序实时读取并处理。目前暂时不需要输入任何内容。

5.3 DStreams的转换操作

要执行Spark流程序,首先需要创建StreamingContext对象。请切换回pyspark shell窗口执行以下操作。

1、map(func)

使用map()对数据进行处理,这里以把分割后的数据转化为List为例。切换回spark-shell窗口,在paster模式下,输入以下代码:

1.  import org.apache.spark.streaming.Seconds
2.  import org.apache.spark.streaming.StreamingContext
3.       
4.  // 创建StreamingContext Secondes 设置每5s对数据进行一次采集
5.  val ssc = new StreamingContext(sc,Seconds(5))
6.       
7.  // 连接TCP服务 localhost 9999
8.  val lines = ssc.socketTextStream("localhost",9999)
9.  
10. // 对每行数据进行分割(按空格分割)
11. // 分割后的数据会存在Array中 直接打印会数据一个Array对象,所有转为List输出
12. val words = lines.map(_.split(" ").toList)
13.      
14. // 打印输出
15. words.print()
16. 
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  切换到启动TCP服务的终端窗口,输入如下内容:

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546508005000 ms
—————————————————————-
List(spark, java, python, java, python, scala)

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999
2、flatMap(func)

使用flatMap函数对数据进行分割后打印。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 打印输出
9.  words.print()
10.      
11. // 开始任务
12. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546573705000 ms
—————————————————————-
spark
java
python
java
python
scala

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

3、filter(func):

  通过filter函数过滤数据值为”java”的数据打印出来。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.  val f = (name:String) => name == "java"
8.       
9.  val word = words.filter(f)
10.      
11. // 打印输出
12. word.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546573830000 ms
—————————————————————-
java
java

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

4、 union(otherStream)

   union函数是对DStream进行合并,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.  val union_DStream = words.union(words)
8.  union_DStream.print()
9.       
10. // 开始任务
11. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546586695000 ms·
—————————————————————-
spark
java
python
java
python
scala
spark
java
python
java

上面案例是把输入的数据进行一次合并。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

5、count()

   count函数是对DStream元素进行统计,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对words中的元素数据进行统计
9.  val word_count = words.count()
10.      
11. // 打印统计结果
12. word_count.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546587435000 ms
—————————————————————-
0
—————————————————————-
Time: 1546587440000 ms
—————————————————————-
6

可以看出,当没有数据时,统计数量为0。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

6、reduce(func)

   reduce函数是对DStream数据进行聚合,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对DStream中的元素依次相加 
9.  val word = words.reduce((a, b) => a + b)
10.      
11. // 打印统计结果
12. word.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  java

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546589670000 ms
—————————————————————-
java

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

7、countByValue()

   countByValue函数是对DStream数据进行统计,返回一个(K,V)键值对类型的新DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对DStream中的元素进行计数并一(K,V)键值对返回
9.  val word = words.countByValue()
10. // 打印统计结果
11. word.print()
12.      
13. // 开始任务
14. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546590505000 ms
—————————————————————-
(scala,1)
(python,2)
(java,2)
(spark,1)

上面的是打印结果中 K 为元素值,V为元素出现的次数。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

8、reduceByKey(func)

   reduceByKey函数是对DStream中的数据根据相同的K对V进行处理,返回一个(K,V)键值对类型的新DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 使用map对元组进行组合 组合为(K,V)的形式
9.  val word = words.map((_, 1))
10.      
11. // 通过reduceByKey对相同K的元组进行V相加
12. val word_reduce = word.reduceByKey((a, b) => a + b)
13.      
14. // 打印统计结果
15. word_reduce.print()
16.      
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546593640000 ms
—————————————————————-
(scala,1)
(python,2)
(java,2)
(spark,1)

reduceByKey函数多用于处理(K,V)数据的聚合

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

9、join(otherStream):

  join函数可以将两个DStream连接到一起。例如,有一个(K,V),以及另一个(K,W),将这两者进行join连接,返回的结果为(K,(V,W))。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 使用map对元组进行组合 组合为(K,V)的形式
9.  val word = words.map((_, 1))
10.      
11. // 通过join对把两个DSstream加到一起
12. val word_join = word.join(word)
13.      
14. // 打印统计结果
15. word_join.print()
16.      
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1547189050000 ms
—————————————————————-
(scala,(1,1))
(python,(1,1))
(python,(1,1))
(python,(1,1))
(python,(1,1))
(java,(1,1))
(java,(1,1))
(java,(1,1))
(java,(1,1))
(spark,(1,1))

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

六、 实验知识测试

七、实验拓展


45c2008862974b75a7ea5cf76b5827db.png







相关文章
|
1月前
|
Java 程序员 API
Java循环操作哪个快?
本文探讨了Java中stream API与传统for循环在性能上的对比,通过多个示例分析了不同场景下两者的优劣。作者指出,尽管stream API使代码更简洁,但不当使用会降低可读性和性能,特别是在处理大数据量时。实验结果显示,在多数情况下,普通for循环的性能优于stream API,尤其是在单次操作耗时较短但需多次执行的场景中。文章建议开发者在设计初期就考虑全局流程,避免重复使用stream流,以提升代码质量和性能。
Java循环操作哪个快?
|
4月前
|
存储 缓存 Unix
victoriaMetrics中的一些Sao操作
victoriaMetrics中的一些Sao操作
36 1
|
C++
c++一些简单操作
c++一些简单操作
80 0
系统通信方式操作
系统通信方式操作
78 0
系统通信方式操作
|
索引 Windows
ListControl操作
ListControl操作
83 0
|
Linux
文件控制操作
文件控制操作
121 0
|
SQL Java 数据库连接
MyBatisCRUD操作
MyBatisCRUD操作
C#编程-90:Hashtale相关操作
C#编程-90:Hashtale相关操作
|
设计模式 JavaScript 前端开发
jQuery基础操作
jQuery基础操作
187 0
jQuery基础操作