DStreams的操作

简介: DStreams的操作

一、实验目的

掌握 DStream各种常用转换操作。

  掌握Spark Streaming join操作。

  掌握DStream计算结果的保存。

二、实验内容

编写Spark Streaming流计算程序,完成以下要求:

  1、对DStream进行各种转换操作。

  2、对两个DStream进行join操作。

  3、保存DStream计算结果。

三、实验原理

Spark Streaming提供了多种转换函数,用来对接收到的实时数据进行转换操作。常用的DStreams转换函数如下表所示:

函数名 作用
map(func): 对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream;
flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项;
filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项;
repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度;
union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素;
count(): 统计源DStream中每个RDD的元素数量;
reduce(func): 利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream;
countByValue(): 应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数;
reduceByKey(func, [numTasks]): 当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来;
join(otherStream, [numTasks]): 当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
cogroup(otherStream, [numTasks]): 当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
transform(func): 通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

5.1 启动Spark集群和HDFS集群

1、在终端窗口下,输入如下命令,分别启动Spark集群和HDFS集群:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

然后使用jps命令查看,确保已经正确启动了Spark和HDFS集群。

  2、启动spark-shell。在终端窗口中,键入以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)

1.  $ spark-shell --master spark://localhost:7077

5.2 启动netcat服务器

在本实验中,我们使用Socket数据源来测试流程序中的各种DStream转换操作。所以,需要启动一个netcat服务器,作为本实验中流程序的Socket数据源。

  另打开一个新的终端,在终端窗口中使用如下命令开启netcat服务器,服务端口为9999:

1.  $ nc -lp 9999

启动以后,就等待用户输入数据。输入的数据会被Spark流程序实时读取并处理。目前暂时不需要输入任何内容。

5.3 DStreams的转换操作

要执行Spark流程序,首先需要创建StreamingContext对象。请切换回pyspark shell窗口执行以下操作。

1、map(func)

使用map()对数据进行处理,这里以把分割后的数据转化为List为例。切换回spark-shell窗口,在paster模式下,输入以下代码:

1.  import org.apache.spark.streaming.Seconds
2.  import org.apache.spark.streaming.StreamingContext
3.       
4.  // 创建StreamingContext Secondes 设置每5s对数据进行一次采集
5.  val ssc = new StreamingContext(sc,Seconds(5))
6.       
7.  // 连接TCP服务 localhost 9999
8.  val lines = ssc.socketTextStream("localhost",9999)
9.  
10. // 对每行数据进行分割(按空格分割)
11. // 分割后的数据会存在Array中 直接打印会数据一个Array对象,所有转为List输出
12. val words = lines.map(_.split(" ").toList)
13.      
14. // 打印输出
15. words.print()
16. 
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  切换到启动TCP服务的终端窗口,输入如下内容:

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546508005000 ms
—————————————————————-
List(spark, java, python, java, python, scala)

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999
2、flatMap(func)

使用flatMap函数对数据进行分割后打印。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 打印输出
9.  words.print()
10.      
11. // 开始任务
12. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546573705000 ms
—————————————————————-
spark
java
python
java
python
scala

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

3、filter(func):

  通过filter函数过滤数据值为”java”的数据打印出来。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.  val f = (name:String) => name == "java"
8.       
9.  val word = words.filter(f)
10.      
11. // 打印输出
12. word.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546573830000 ms
—————————————————————-
java
java

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

4、 union(otherStream)

   union函数是对DStream进行合并,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.  val union_DStream = words.union(words)
8.  union_DStream.print()
9.       
10. // 开始任务
11. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546586695000 ms·
—————————————————————-
spark
java
python
java
python
scala
spark
java
python
java

上面案例是把输入的数据进行一次合并。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

5、count()

   count函数是对DStream元素进行统计,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对words中的元素数据进行统计
9.  val word_count = words.count()
10.      
11. // 打印统计结果
12. word_count.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546587435000 ms
—————————————————————-
0
—————————————————————-
Time: 1546587440000 ms
—————————————————————-
6

可以看出,当没有数据时,统计数量为0。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

6、reduce(func)

   reduce函数是对DStream数据进行聚合,返回一个新的DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对DStream中的元素依次相加 
9.  val word = words.reduce((a, b) => a + b)
10.      
11. // 打印统计结果
12. word.print()
13.      
14. // 开始任务
15. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  java

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546589670000 ms
—————————————————————-
java

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

7、countByValue()

   countByValue函数是对DStream数据进行统计,返回一个(K,V)键值对类型的新DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 对DStream中的元素进行计数并一(K,V)键值对返回
9.  val word = words.countByValue()
10. // 打印统计结果
11. word.print()
12.      
13. // 开始任务
14. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546590505000 ms
—————————————————————-
(scala,1)
(python,2)
(java,2)
(spark,1)

上面的是打印结果中 K 为元素值,V为元素出现的次数。

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

8、reduceByKey(func)

   reduceByKey函数是对DStream中的数据根据相同的K对V进行处理,返回一个(K,V)键值对类型的新DStream。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 使用map对元组进行组合 组合为(K,V)的形式
9.  val word = words.map((_, 1))
10.      
11. // 通过reduceByKey对相同K的元组进行V相加
12. val word_reduce = word.reduceByKey((a, b) => a + b)
13.      
14. // 打印统计结果
15. word_reduce.print()
16.      
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1546593640000 ms
—————————————————————-
(scala,1)
(python,2)
(java,2)
(spark,1)

reduceByKey函数多用于处理(K,V)数据的聚合

  停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

然后发现netcat服务关闭,需重启netcat服务

1.  nc -lp 9999

9、join(otherStream):

  join函数可以将两个DStream连接到一起。例如,有一个(K,V),以及另一个(K,W),将这两者进行join连接,返回的结果为(K,(V,W))。在spark-shell窗口下,继续键入以下代码:

1.  val ssc = new StreamingContext(sc,Seconds(5))
2.  val lines = ssc.socketTextStream("localhost",9999)
3.       
4.  // 对每行数据进行分割(按空格分割)
5.  // 分割后的数据会存在map中
6.  val words = lines.flatMap(_.split(" "))
7.       
8.  // 使用map对元组进行组合 组合为(K,V)的形式
9.  val word = words.map((_, 1))
10.      
11. // 通过join对把两个DSstream加到一起
12. val word_join = word.join(word)
13.      
14. // 打印统计结果
15. word_join.print()
16.      
17. // 开始任务
18. ssc.start()

然后同时按下【Ctrl + D】,执行以上代码。

  紧接着,切换到启动TCP服务的终端窗口,输入如下内容::

1.  spark java python java python scala

切换回spark-shell窗口,查看程序输出窗口,打印结果如下:

—————————————————————-
Time: 1547189050000 ms
—————————————————————-
(scala,(1,1))
(python,(1,1))
(python,(1,1))
(python,(1,1))
(python,(1,1))
(java,(1,1))
(java,(1,1))
(java,(1,1))
(java,(1,1))
(spark,(1,1))

停止流计算,但不终止ssc。在spark-shell窗口下,键入以下命令:

1.  ssc.stop(false)

六、 实验知识测试

七、实验拓展


45c2008862974b75a7ea5cf76b5827db.png







相关文章
|
存储 缓存 网络协议
深入理解Linux网络——内核是如何接收到网络包的
一、相关实际问题 RingBuffer是什么,为什么会丢包 网络相关的硬中断、软中断是什么 Linux里的ksoftirqd内核线程是干什么
|
12月前
|
人工智能 算法 数据安全/隐私保护
[oeasy]python080如何生成验证码_随机数字密码_真随机
本文介绍了如何生成随机验证码的过程,从简单的随机数字生成到包含数字、大小写字母及符号的复杂验证码。通过 Python 的 `random` 和 `string` 模块,逐步扩展字符集并确保结果满足特定要求(如包含各类字符)。同时探讨了随机数生成的本质,指出计算机中的“随机”实际上是基于算法和种子值的伪随机,并非真正的物理随机。最后总结了验证码的生成原理及其在实际应用中的意义,为读者揭开日常生活中验证码背后的编程逻辑。
466 8
|
前端开发 JavaScript 小程序
印象最深的一个bug——使用uinapp做混合开发静态图片在安卓端不显示
这几天一直在做混合开发,使用的是uni-app开发的,一套代码,多端使用,适用于各个平台。听起来很完美,使用过程不可多说,不知道是因为我们的需求变态还是我们团队两端技术水平太差。总之,开发联调过程十分痛苦,加上uniapp的调试十分困难,一度让我们两端互怼。这其中我印象最深的一个bug就是在对接联调总出现的
1665541 33
印象最深的一个bug——使用uinapp做混合开发静态图片在安卓端不显示
|
消息中间件 NoSQL Java
RabbitMQ死信队列实战——解决订单超时未支付
RabbitMQ死信队列实战——解决订单超时未支付
1196 0
RabbitMQ死信队列实战——解决订单超时未支付
|
机器学习/深度学习 存储 人工智能
云栖大会推出阿里灵杰,大数据+AI一体化平台 6 大重磅发布
10月20日2021杭州云栖大会上,阿里巴巴集团副总裁、阿里云计算平台事业部负责人贾扬清发布大数据+AI一体化平台新品牌“阿里灵杰”,提供从“生产-采集-存储-分析-开发-治理-价值体现”整套云原生技术架构和产品体系,配套智能化运维平台和强大的数据资产安全管控能力。
2039 0
云栖大会推出阿里灵杰,大数据+AI一体化平台 6 大重磅发布
|
安全 Linux 网络安全
SIP不能注册或呼叫到服务器端怎样处理
SIP不能注册或呼叫到服务器端怎样处理
|
Go 数据安全/隐私保护 C++
【Windows系统】基于vscode搭建go语言开发环境
【Windows系统】基于vscode搭建go语言开发环境
345 0
【Windows系统】基于vscode搭建go语言开发环境
|
存储 达摩院 前端开发
ICASSP 2022 多通道多方会议转录挑战项目(M2MeT)成功举办
近日,ICASSP 2022 多通道多方会议转录挑战(M2MeT)完成了测试集评测及结果公布。本次挑战由阿里巴巴达摩院语音实验室和希尔贝壳联合举办,多位国际知名行业专家包括达摩院语音实验室负责人鄢志杰、研究员马斌,希尔贝壳CEO卜辉,希尔贝壳基金会谢磊教授,美国俄亥俄州立大学汪德亮教授,丹麦奥尔堡大学谭政华教授,上海交通大学钱彦旻教授,新加坡A*STAR资讯通信研究院高级科学家Kong Aik Lee等参与大会组织和评审。
664 0
ICASSP 2022 多通道多方会议转录挑战项目(M2MeT)成功举办
|
存储 Oracle 关系型数据库
分表分库(百亿级大数据存储)
100亿数据其实并不多,一个比较常见的数据分表分库模型: MySql数据库8主8从,每服务器8个库,每个库16张表,共1024张表(从库也有1024张表) ,每张表1000万到5000万数据,整好100亿到500亿数据!
1550 0
|
程序员 API 计算机视觉
皮克斯动画特效太好,但特效师容易「手抖」,还好Python来帮忙
当谈论特效电影时,Python常常被我们所忽略,特效工作者小哥Dhruv Govil要为Python「平反」,他写了篇博客讲述了在动画电影制作的每一个环节,Python都能为特效师们提供极大的便利!
549 0
皮克斯动画特效太好,但特效师容易「手抖」,还好Python来帮忙

热门文章

最新文章