Continued from the previous article: https://developer.aliyun.com/article/1622641?spm=a2c6h.13148508.setting.19.27ab4f0ehhuqRu
[Window Operations] Case 3: Real-Time Hot Search Word Statistics
Writing the Code
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HotWordStats {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("HotWordStats")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")
    // Set the checkpoint directory; this can also point to HDFS
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val pairs: DStream[(String, Int)] = words.map(x => (x, 1))

    // reduceByKeyAndWindow: every 10 seconds, count word occurrences over the last 20 seconds
    val wordCounts1: DStream[(String, Int)] = pairs
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b,
        Seconds(20),
        Seconds(10),
        2
      )
    wordCounts1.print()

    // The variant with an inverse function ("subtracts" the data sliding out of the window
    // instead of recomputing the whole window) requires checkpoint support
    val wordCounts2: DStream[(String, Int)] = pairs
      .reduceByKeyAndWindow(
        _ + _,
        _ - _,
        Seconds(20),
        Seconds(10),
        2
      )
    wordCounts2.print()

    // Start the streaming job
    ssc.start()
    ssc.awaitTermination()
  }

}
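To drive the example (this step is not spelled out in the article), start a plain socket server on port 9999 before launching the job, for example with netcat (nc -lk 9999), and type words into it. Every 10 seconds the job then prints the counts for the preceding 20-second window.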
Run Results
-------------------------------------------
Time: 1721629842000 ms
-------------------------------------------
(4,1)
(8,1)
(6,1)
(2,1)
(7,1)
(5,1)
(3,1)
(1,1)

-------------------------------------------
Time: 1721629842000 ms
--------------------
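Note that both wordCounts1 and wordCounts2 call print(), which is why each batch produces two result blocks with the same timestamp (the second block above is cut off).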
[State Tracking Operations] updateStateByKey
The main features of updateStateByKey:
It maintains a piece of state for every Key in the stream. The state can be of any type, including a custom object, and the update function can also be user-defined.
The state of a Key is updated continuously through the update function: for every new batch, Spark Streaming applies the state update to every Key that already exists, not just to the Keys present in that batch (see the sketch after this list).
CheckPoint must be enabled when using updateStateByKey.
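One detail the list above glosses over: the update function also controls a Key's lifetime, because returning None removes the Key from the state entirely. A minimal sketch of that behavior (the (count, idle) state layout and the 10-batch threshold are illustrative choices, not from this article):

// The state holds (runningCount, idleBatches); a key that receives no data
// for 10 consecutive batches is expired by returning None, which removes
// the key from the state
val updateWithExpiry = (currValues: Seq[Int], prev: Option[(Int, Int)]) => {
  val (count, idle) = prev.getOrElse((0, 0))
  if (currValues.nonEmpty) Some((count + currValues.sum, 0))
  else if (idle + 1 >= 10) None
  else Some((count, idle + 1))
}

Such a function would be plugged in via wordDStream.updateStateByKey[(Int, Int)](updateWithExpiry).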
Writing the Code (1)
After the streaming program starts, it computes the cumulative wordcount and saves each batch's result to a file.
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StateTracker1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StateTracker1")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))

    // State update function.
    // The Option[Int] it returns is the key's latest state:
    // the values produced for the key in the current batch are added
    // to the previous state to obtain the new state.
    val updateFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      // Spark has already grouped the values by key; currValues is this key's
      // value sequence for the current batch, so summing it gives the batch total
      val currentCount = currValues.sum
      // The previously accumulated value
      val previousCount = prevValueState.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val stateDStream: DStream[(String, Int)] = wordDStream.updateStateByKey[Int](updateFunc)
    stateDStream.print()

    // Save the DStream to text files; this creates many small files,
    // one directory per batch
    val outputDir = "output1"
    stateDStream
      .repartition(1)
      .saveAsTextFiles(outputDir)

    // Start the streaming job
    ssc.start()
    ssc.awaitTermination()
  }

}
Run Results (1)
-------------------------------------------
Time: 1721631080000 ms
-------------------------------------------
(1,1)
(2,1)
(3,1)

-------------------------------------------
Time: 1721631085000 ms
-------------------------------------------
(8,1)
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)
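Besides the console output, saveAsTextFiles writes each batch's result to its own directory, named from the prefix plus the batch time (output1-<time in ms>), which is why the job accumulates many small files over time.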
What the results show:
updateStateByKey tracks the global state of every Key, so even when no new data arrives, each batch still returns the previous state of every Key.
The drawback:
With a large amount of data, the CheckPoint data occupies considerable storage, and the job is inefficient.
Writing the Code (2)
mapWithState: also tracks the global state of Keys, but if a Key receives no input, its previous state is not returned; the behavior is incremental.
The benefit is that it only touches Keys that have actually changed; for Keys with no input, no unchanged data is returned. Even with a large amount of data, the checkpoint does not consume nearly as much storage as it does with updateStateByKey.
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StateTracker2 {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("StateTracker2")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))

    // Mapping function: merge this batch's value with the stored state,
    // write the new sum back to the state, and emit (key, cumulative count)
    def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
      val sum: Int = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (key, sum)
    }

    val spec = StateSpec.function(mappingFunction _)
    val resultDStream: DStream[(String, Int)] = wordDStream.mapWithState(spec)
    resultDStream.cache()

    // Save the DStream to text files; this creates many small files,
    // one directory per batch
    val outputDir = "output2"
    resultDStream.repartition(1).saveAsTextFiles(outputDir)

    ssc.start()
    ssc.awaitTermination()
  }

}
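mapWithState can also expire idle Keys automatically through a timeout on the StateSpec. A sketch built on the code above (the 30-second duration is an arbitrary choice for illustration, not from the article):

// Expire keys that receive no data for 30 seconds. When a key times out,
// mapWithState calls the function one last time with state.isTimingOut() == true;
// the state must not be updated in that final call.
def mappingWithTimeout(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
  if (state.isTimingOut()) {
    (key, state.getOption.getOrElse(0)) // final emission for the expiring key
  } else {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (key, sum)
  }
}

val specWithTimeout = StateSpec.function(mappingWithTimeout _).timeout(Seconds(30))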
Running the Code