Spark Streaming保存计算状态

简介: Spark Streaming保存计算状态

一、实验目的

掌握 DStream数据累加函数updateStateByKey。

  掌握 DStream数据累加函数mapWithState。

二、实验内容

1、每5秒钟计算一次每个单词出现的累加数量。(使用Socket数据源)

三、实验原理

在DStream中支持跨批次数据执行计算时保持任意状态。在Spark第一代流处理(Spark Streaming)中,这需要手工实现。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

5.1 启动Spark集群

1、在终端窗口下,输入如下命令,启动Spark集群:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

2、启动spark-shell。在终端窗口下,输入如下命令(注意:以下命令中的localhost,请换成你自己虚拟机实际的机器名):

1.  $ spark-shell --master spark://localhost:7077

3、另打开一个终端窗口,键入以下命令,启动Socket服务器:

1.  $ nc -lp 9999

5.2 使用updateStateByKey函数,每5秒钟计算一次每个单词出现的累加数量

1、updateStateByKey 解释:

  以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加。在有新的数据信息进入或更新时。能够让用户保持想要的不论什么状。使用这个函数需要两步:

  1) 定义状态:可以是随意数据类型;

  2) 定义状态更新函数:用一个函数指定怎样使用先前的状态。从输入流中的新值更新状态。

  下面的实验代码使用updateStateByKey(newUpdateFunc)函数计算每5秒钟单词出现的累加数量。

  请在spark-shell的paste模式下,输入以下代码:

1.  import org.apache.spark.SparkContext
2.  import org.apache.spark.sql.catalyst.expressions.Second
3.  import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
4.  import org.apache.spark.streaming.dstream.DStream
5.  import org.apache.spark.streaming.{Seconds, StreamingContext}
6.       
7.  val ssc = new StreamingContext(sc, Seconds(5))
8.       
9.  // 设置检查点缓存目录
10. ssc.checkpoint("hdfs://localhost:9000/checkpoints")
11.      
12. // 监听nc服务端口
13. val lines = ssc.socketTextStream("localhost", 9999)
14.      
15. // 把每行数据安“空格”分隔
16. val words = lines.flatMap(_.split(" "))
17.      
18. // 把单和1组合
19. val word_count = words.map((_, 1))
20.      
21. // 定义一个数据处理函数
22. val updateFunc = (values: Seq[Int], state: Option[Int]) => {
23.  // values 是所有相同key的value集 
24.  // state 是旧数据状态值 当第一没有值是返回0
25.   val count = values.sum + state.getOrElse(0)
26.   Some(count)
27. }
28.      
29. // updateStateByKey对数据进行 累加
30. val words_count = word_count.updateStateByKey(updateFunc)
31.      
32. // 打印数据
33. words_count.print()
34. 
35. ssc.start()
36. ssc.awaitTermination()

2、切换到另一个终端窗口(Socket服务器所运行的窗口),随意输入一些语句,语句间的单词以空格分隔。如下所示:

1.  java python
2.  ...
3.  java python scala

请确保输入的语句跨5秒钟。

  3、切换到spark-shell所在终端窗口,查看打印出的结果,结果如下所示:

—————————————————————-
Time: 1548654660000 ms
—————————————————————-
—————————————————————-
Time: 1548654665000 ms
—————————————————————-
(python,1)
(java,1)
—————————————————————-
Time: 1548654670000 ms
—————————————————————-
(scala,1)
(python,2)
(java,2)

由上面的内容可以看出,第一次和第二次的数据进行了累加统计。

  4、停止Spark流计算。在spark-shell窗口中,执行以下命令,停止流计算的执行:

1.  ssc.stop(false)

或者,同时按下【Ctrl + C】键来终止。

5.3 使用mapWithState函数,每5秒钟计算一次每个单词出现的累加数量

1、mapWithStat 和 updateStateByKey一样都是状态管理,返回一个MapWithStateDStream。它使用一个函数不断作用于这个DStream中key- value的元素,基于key进行状态维护和更新。mapWithState这个方法参数是StateSpec,这里边封装了对数据操作的函数。

  下面使用mapWithState函数,同样每5秒钟计算一次每个单词出现的累加数量。

  在spark-shell窗口中,在paste式下,输入以下代码:

1.  import org.apache.spark.streaming.State
2.  import org.apache.spark.streaming.StateSpec
3.       
4.  // 定义一个函数
5.  val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
6.      // count 获取数据的value值, 与state的状态值相加
7.      val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
8.           
9.      // 输出累加后的统计结果
10.     val output = (word, sum)
11.          
12.     // 更新状态值
13.     state.update(sum)
14.          
15.     output
16. }
17.      
18. // 定义一个 StateSpec 传入 mappingFunc函数
19. val spc = StateSpec.function(mappingFunc)
20.      
21. // 代用mapWithState 传入 spc
22. val value = word_count.mapWithState(spc)
23.      
24. // 打印输出
25. value.print()
26.      
27. ssc.start()
28. ssc.awaitTermination()

2、切换到另一个终端窗口(Socket服务器所运行的窗口),随意输入一些语句,语句间的单词以空格分隔。如下所示:

1.  java python
2.  ...
3.  python scala hadoop

请确保输入的语句跨5秒钟。

  3、切换到spark-shell所在终端窗口,查看打印出的结果,结果如下所示:

—————————————————————-
Time: 1548752815000 ms
—————————————————————-
(java,1)
(python,1)
—————————————————————-
Time: 1548752820000 ms
—————————————————————-
—————————————————————-
Time: 1548752825000 ms
—————————————————————-
(python,2)
(scala,1)
(hadoop,1)

由上面的内容可以看出,第一次和第二次的数据进行了累加统计。

  4、停止Spark流计算。在spark-shell窗口中,执行以下命令,停止流计算的执行:

1.  ssc.stop(false)

或者,同时按下【Ctrl + C】键来终止。

相关文章
|
6天前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
1月前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
40 0
|
3月前
|
分布式计算 监控 数据处理
Spark Streaming的容错性与高可用性
Spark Streaming的容错性与高可用性
|
3月前
|
分布式计算 数据处理 Apache
Spark Streaming与数据源连接:Kinesis、Flume等
Spark Streaming与数据源连接:Kinesis、Flume等
|
3月前
|
消息中间件 分布式计算 Kafka
使用Kafka与Spark Streaming进行流数据集成
使用Kafka与Spark Streaming进行流数据集成
|
3月前
|
分布式计算 监控 数据处理
Spark Streaming的DStream与窗口操作
Spark Streaming的DStream与窗口操作
|
3月前
|
分布式计算 监控 数据处理
实时数据处理概述与Spark Streaming简介
实时数据处理概述与Spark Streaming简介
|
3月前
|
分布式计算 数据处理 Apache
Spark RDD的行动操作与延迟计算
Spark RDD的行动操作与延迟计算
Spark RDD的行动操作与延迟计算
|
3月前
|
SQL 分布式计算 数据处理
Spark的生态系统概览:Spark SQL、Spark Streaming
Spark的生态系统概览:Spark SQL、Spark Streaming
|
3月前
|
消息中间件 分布式计算 Kafka
Spark中的Spark Streaming是什么?请解释其作用和用途。
Spark中的Spark Streaming是什么?请解释其作用和用途。
27 0