DStreams的操作

简介: DStreams的操作

一、实验目的

掌握 DStream各种常用转换操作。

  掌握Spark Streaming join操作。

  掌握DStream计算结果的保存。

二、实验内容

编写Spark Streaming流计算程序,完成以下要求:

  1、对DStream进行各种转换操作。

  2、对两个DStream进行join操作。

  3、保存DStream计算结果。

三、实验原理

Spark Streaming提供了多种转换函数,用来对接收到的实时数据进行转换操作。常用的DStreams转换函数如下表所示:

函数名 作用
map(func): 对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream;
flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项;
filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项;
repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度;
union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素;
count(): 统计源DStream中每个RDD的元素数量;
reduce(func): 利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream;
countByValue(): 应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数;
reduceByKey(func, [numTasks]): 当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来;
join(otherStream, [numTasks]): 当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
cogroup(otherStream, [numTasks]): 当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
transform(func): 通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

5.1 启动Spark集群和HDFS集群

1、在终端窗口下,输入如下命令,分别启动Spark集群和HDFS集群:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

然后使用jps命令查看,确保已经正确启动了Spark和HDFS集群。

  2、启动spark-shell。在终端窗口中,键入以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)

1.  $ spark-shell --master spark://localhost:7077

5.2 启动netcat服务器

在本实验中,我们使用Socket数据源来测试流程序中的各种DStream转换操作。所以,需要启动一个netcat服务器,作为本实验中流程序的Socket数据源。

  另打开一个新的终端,在终端窗口中使用如下命令开启netcat服务器,服务端口为9999:

1.  $ nc -lp 9999

启动以后,就等待用户输入数据。输入的数据会被Spark流程序实时读取并处理。目前暂时不需要输入任何内容。

5.3 DStreams的转换操作

要执行Spark流程序,首先需要创建StreamingContext对象。请切换回pyspark shell窗口执行以下操作。

1、map(func)

使用map()对数据进行处理,这里以把分割后的数据转化为List为例。切换回spark-shell窗口,在paster模式下,输入以下代码:

1.  import org.apache.spark.streaming.Seconds
2.  import org.apache.spark.streaming.StreamingContext
3.       
4.  // 创建StreamingContext Secondes 设置每5s对数据进行一次采集
5.  val ssc = new StreamingContext(sc,Seconds(5))
6.       
7.  // 连接TCP服务 localhost 9999
8.  val lines = ssc.socketTextStream("localhost",9999)
9.  
10. // 对每行数据进行分割(按空格分割)
11. // 分割后的数据会存在Array中 直接打印会数据一个Array对象,所有转为List输出
12. val words = lines.map(_.split(" ").toList)
13.      
14. // 打印输出
15. words.print()
16. 
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  切换到启动TCP服务的终端窗口,输入如下内容:

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546508005000 ms
—————————————————————-
List(spark, java, python, java, python, scala)

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999
2、flatMap(func)

使用flatMap函数对数据进行分割后打印。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 打印输出
9.  words.print()
10.      
11. // 开始任务
12. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546573705000 ms
—————————————————————-
spark
java
python
java
python
scala

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

3、filter(func):

  通过filter函数过滤数据值为”java”的数据打印出来。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.  val f = (name:String) => name == "java"
8.       
9.  val word = words.filter(f)
10.      
11. // 打印输出
12. word.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546573830000 ms
—————————————————————-
java
java

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

4、 union(otherStream)

   union函数是对DStream进行合并,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.  val union_DStream = words.union(words)
8.  union_DStream.print()
9.       
10. // 开始任务
11. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546586695000 ms·
—————————————————————-
spark
java
python
java
python
scala
spark
java
python
java

上面案例是把输入的数据进行一次合并。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

5、count()

   count函数是对DStream元素进行统计,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对words中的元素数据进行统计
9.  val word_count = words.count()
10.      
11. // 打印统计结果
12. word_count.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546587435000 ms
—————————————————————-
0
—————————————————————-
Time: 1546587440000 ms
—————————————————————-
6

可以看出,当没有数据时,统计数量为0。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

6、reduce(func)

   reduce函数是对DStream数据进行聚合,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对DStream中的元素依次相加 
9.  val word = words.reduce((a, b) => a + b)
10.      
11. // 打印统计结果
12. word.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  java

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546589670000 ms
—————————————————————-
java

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

7、countByValue()

   countByValue函数是对DStream数据进行统计,返回一个(K,V)键值对类型的新DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对DStream中的元素进行计数并一(K,V)键值对返回
9.  val word = words.countByValue()
10. // 打印统计结果
11. word.print()
12.      
13. // 开始任务
14. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546590505000 ms
—————————————————————-
(scala,1)
(python,2)
(java,2)
(spark,1)

上面的是打印结果中 K 为元素值,V为元素出现的次数。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

8、reduceByKey(func)

   reduceByKey函数是对DStream中的数据根据相同的K对V进行处理,返回一个(K,V)键值对类型的新DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 使用map对元组进行组合 组合为(K,V)的形式
9.  val word = words.map((_, 1))
10.      
11. // 通过reduceByKey对相同K的元组进行V相加
12. val word_reduce = word.reduceByKey((a, b) => a + b)
13.      
14. // 打印统计结果
15. word_reduce.print()
16.      
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546593640000 ms
—————————————————————-
(scala,1)
(python,2)
(java,2)
(spark,1)

reduceByKey函数多用于处理(K,V)数据的聚合

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

9、join(otherStream):

  join函数可以将两个DStream连接到一起。例如,有一个(K,V),以及另一个(K,W),将这两者进行join连接,返回的结果为(K,(V,W))。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 使用map对元组进行组合 组合为(K,V)的形式
9.  val word = words.map((_, 1))
10.      
11. // 通过join对把两个DSstream加到一起
12. val word_join = word.join(word)
13.      
14. // 打印统计结果
15. word_join.print()
16.      
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1547189050000 ms
—————————————————————-
(scala,(1,1))
(python,(1,1))
(python,(1,1))
(python,(1,1))
(python,(1,1))
(java,(1,1))
(java,(1,1))
(java,(1,1))
(java,(1,1))
(spark,(1,1))

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

六、 实验知识测试

七、实验拓展


45c2008862974b75a7ea5cf76b5827db.png







相关文章
|
存储 缓存 网络协议
深入理解Linux网络——内核是如何接收到网络包的
一、相关实际问题 RingBuffer是什么,为什么会丢包 网络相关的硬中断、软中断是什么 Linux里的ksoftirqd内核线程是干什么
|
11月前
|
人工智能 Java
产品经理-面试问题(高频率)
本文全面介绍初入产品岗位的基本面试问题,涵盖离职原因、技术沟通、薪资期望、到岗时间、个人优劣势及竞品调研分析等内容。针对每个问题提供详细回答示例,帮助求职者更好地准备面试,提升应答技巧和自信心。内容涉及职业成长、公司文化匹配、工作与生活平衡等多方面考量,助力求职者找到理想职位。
|
安全 数据建模 网络安全
2024阿里云双11,WoSign SSL证书优惠券使用攻略
2024阿里云“11.11金秋云创季”活动主会场,阿里云用户通过完成个人或企业实名认证,可以领取不同额度的满减优惠券,叠加折扣优惠。用户购买WoSign SSL证书,如何叠加才能更加优惠呢?
1250 3
|
7月前
|
Java Shell C#
子线程如何等待主线程执行完再执行
在业务不断迭代中,复杂逻辑难免出现。例如在大业务方法中需临时执行不影响主流程的特殊逻辑,可用子线程处理。若子线程依赖主线程结果(如数据库事务),需确保主线程先完成。文章提出两种解决方式:一是利用Spring事务传播机制拆分方法;二是通过判断父线程状态实现延迟执行,提供`ThreadEndExec`工具类,保守等待15秒后执行子线程任务,保证逻辑顺序与数据一致性。
121 1
|
9月前
|
人工智能 算法 数据安全/隐私保护
[oeasy]python080如何生成验证码_随机数字密码_真随机
本文介绍了如何生成随机验证码的过程,从简单的随机数字生成到包含数字、大小写字母及符号的复杂验证码。通过 Python 的 `random` 和 `string` 模块,逐步扩展字符集并确保结果满足特定要求(如包含各类字符)。同时探讨了随机数生成的本质,指出计算机中的“随机”实际上是基于算法和种子值的伪随机,并非真正的物理随机。最后总结了验证码的生成原理及其在实际应用中的意义,为读者揭开日常生活中验证码背后的编程逻辑。
353 8
|
机器学习/深度学习 算法 数据挖掘
2023 年第二届钉钉杯大学生大数据挑战赛初赛 初赛 A:智能手机用户监测数据分析 问题二分类与回归问题Python代码分析
本文介绍了2023年第二届钉钉杯大学生大数据挑战赛初赛A题的Python代码分析,涉及智能手机用户监测数据分析中的聚类分析和APP使用情况的分类与回归问题。
349 0
2023 年第二届钉钉杯大学生大数据挑战赛初赛 初赛 A:智能手机用户监测数据分析 问题二分类与回归问题Python代码分析
|
11月前
|
编解码 安全 网络安全
指纹浏览器中HTTP代理IP的重要性及使用原因
随着数字化发展,网络安全和隐私保护成为核心需求,指纹浏览器应运而生。它通过客户端信息唯一标识用户身份。搭配HTTP代理IP可增强安全性,具体表现为:1. 保护用户地址;2. 提高信息安全;3. 防止指纹检测;4. 增强网络安全。HTTP代理以其稳定性、安全性,在网络营销等领域发挥重要作用,二者结合为用户提供更强的隐私和安全保护。
366 34
|
数据可视化 关系型数据库 MySQL
|
12月前
|
供应链 数据可视化 搜索推荐
电商管理:从传统到数字化的转变
在数字化时代,电商管理涵盖供应链、客户关系、数据管理和营销推广等多个关键领域。本文探讨了这些领域的策略及挑战,并介绍了板栗看板这一创新工具如何助力电商企业实现高效运营和可持续发展。
309 19