一、实验目的
掌握 DStream数据累加函数updateStateByKey。
掌握 DStream数据累加函数mapWithState。
二、实验内容
1、每5秒钟计算一次每个单词出现的累加数量。(使用Socket数据源)
三、实验原理
在DStream中支持跨批次数据执行计算时保持任意状态。在Spark第一代流处理(Spark Streaming)中,这需要手工实现。
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3
五、实验步骤
5.1 启动Spark集群
1、在终端窗口下,输入如下命令,启动Spark集群:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh
2、启动spark-shell。在终端窗口下,输入如下命令(注意:以下命令中的localhost,请换成你自己虚拟机实际的机器名):
1. $ spark-shell --master spark://localhost:7077
3、另打开一个终端窗口,键入以下命令,启动Socket服务器:
1. $ nc -lp 9999
5.2 使用updateStateByKey函数,每5秒钟计算一次每个单词出现的累加数量
1、updateStateByKey 解释:
以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加。在有新的数据信息进入或更新时。能够让用户保持想要的不论什么状。使用这个函数需要两步:
1) 定义状态:可以是随意数据类型;
2) 定义状态更新函数:用一个函数指定怎样使用先前的状态。从输入流中的新值更新状态。
下面的实验代码使用updateStateByKey(newUpdateFunc)函数计算每5秒钟单词出现的累加数量。
请在spark-shell的paste模式下,输入以下代码:
1. import org.apache.spark.SparkContext 2. import org.apache.spark.sql.catalyst.expressions.Second 3. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} 4. import org.apache.spark.streaming.dstream.DStream 5. import org.apache.spark.streaming.{Seconds, StreamingContext} 6. 7. val ssc = new StreamingContext(sc, Seconds(5)) 8. 9. // 设置检查点缓存目录 10. ssc.checkpoint("hdfs://localhost:9000/checkpoints") 11. 12. // 监听nc服务端口 13. val lines = ssc.socketTextStream("localhost", 9999) 14. 15. // 把每行数据安“空格”分隔 16. val words = lines.flatMap(_.split(" ")) 17. 18. // 把单和1组合 19. val word_count = words.map((_, 1)) 20. 21. // 定义一个数据处理函数 22. val updateFunc = (values: Seq[Int], state: Option[Int]) => { 23. // values 是所有相同key的value集 24. // state 是旧数据状态值 当第一没有值是返回0 25. val count = values.sum + state.getOrElse(0) 26. Some(count) 27. } 28. 29. // updateStateByKey对数据进行 累加 30. val words_count = word_count.updateStateByKey(updateFunc) 31. 32. // 打印数据 33. words_count.print() 34. 35. ssc.start() 36. ssc.awaitTermination()
2、切换到另一个终端窗口(Socket服务器所运行的窗口),随意输入一些语句,语句间的单词以空格分隔。如下所示:
1. java python 2. ... 3. java python scala
请确保输入的语句跨5秒钟。
3、切换到spark-shell所在终端窗口,查看打印出的结果,结果如下所示:
—————————————————————- Time: 1548654660000 ms —————————————————————- —————————————————————- Time: 1548654665000 ms —————————————————————- (python,1) (java,1) —————————————————————- Time: 1548654670000 ms —————————————————————- (scala,1) (python,2) (java,2)
由上面的内容可以看出,第一次和第二次的数据进行了累加统计。
4、停止Spark流计算。在spark-shell窗口中,执行以下命令,停止流计算的执行:
1. ssc.stop(false)
或者,同时按下【Ctrl + C】键来终止。
5.3 使用mapWithState函数,每5秒钟计算一次每个单词出现的累加数量
1、mapWithStat 和 updateStateByKey一样都是状态管理,返回一个MapWithStateDStream。它使用一个函数不断作用于这个DStream中key- value的元素,基于key进行状态维护和更新。mapWithState这个方法参数是StateSpec,这里边封装了对数据操作的函数。
下面使用mapWithState函数,同样每5秒钟计算一次每个单词出现的累加数量。
在spark-shell窗口中,在paste式下,输入以下代码:
1. import org.apache.spark.streaming.State 2. import org.apache.spark.streaming.StateSpec 3. 4. // 定义一个函数 5. val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => { 6. // count 获取数据的value值, 与state的状态值相加 7. val sum = count.getOrElse(0) + state.getOption.getOrElse(0) 8. 9. // 输出累加后的统计结果 10. val output = (word, sum) 11. 12. // 更新状态值 13. state.update(sum) 14. 15. output 16. } 17. 18. // 定义一个 StateSpec 传入 mappingFunc函数 19. val spc = StateSpec.function(mappingFunc) 20. 21. // 代用mapWithState 传入 spc 22. val value = word_count.mapWithState(spc) 23. 24. // 打印输出 25. value.print() 26. 27. ssc.start() 28. ssc.awaitTermination()
2、切换到另一个终端窗口(Socket服务器所运行的窗口),随意输入一些语句,语句间的单词以空格分隔。如下所示:
1. java python 2. ... 3. python scala hadoop
请确保输入的语句跨5秒钟。
3、切换到spark-shell所在终端窗口,查看打印出的结果,结果如下所示:
—————————————————————- Time: 1548752815000 ms —————————————————————- (java,1) (python,1) —————————————————————- Time: 1548752820000 ms —————————————————————- —————————————————————- Time: 1548752825000 ms —————————————————————- (python,2) (scala,1) (hadoop,1)
由上面的内容可以看出,第一次和第二次的数据进行了累加统计。
4、停止Spark流计算。在spark-shell窗口中,执行以下命令,停止流计算的执行:
1. ssc.stop(false)
或者,同时按下【Ctrl + C】键来终止。