点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
Spark Streaming DStream 转换函数
DStream 无状态转换
DStream 无状态转换 案例
转换方式
有两个类型:
无状态转换(已经完成)
有状态转换
接下来开始有状态转换。
有状态转换
有状态转换主要有两种:
窗口操作
状态跟踪操作
窗口操作
Window Operations 可以设置窗口大小和滑动窗口间隔来动态获取当前Streaming的状态
基于窗口的操作会在一个比 StreamingContext 的 batchDuration(批次间隔)更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
基于窗口的操作需要两个参数:
窗口长度(Window Duration):控制每次计算最近的多少个批次的数据
滑动间隔(Slide Duration):用来控制对新的 DStream 进行计算的间隔
两者都必须是StreamingContext中批次间隔(batchDuration)的整数倍
准备编码
我们先编写一个每秒发送一个数字:
package icu.wzk import java.io.PrintWriter import java.net.{ServerSocket, Socket} object SocketWithWindow { def main(args: Array[String]): Unit = { val port = 9999 val ss = new ServerSocket(port) val socket: Socket = ss.accept() var i = 0 while (true) { i += 1 val out = new PrintWriter(socket.getOutputStream) out.println(i) out.flush() Thread.sleep(1000) } } }
[窗口操作] 案例2观察窗口数据
- 观察窗口的数据
- 观察 batchDuration、windowDuration、slideDuration 三者之间的关系
- 使用窗口相关的操作
编写代码
package icu.wzk import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object WindowDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("WindowDemo") .setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) ssc.sparkContext.setLogLevel("WARN") val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) lines.foreachRDD { (rdd, time) => { println(s"rdd = ${rdd.id}; time = $time") } rdd.foreach(value => println(value)) } // 20秒窗口长度(DS包含窗口长度范围内的数据) // 10秒滑动间隔(多次时间处理一次数据) val res1: DStream[String] = lines .reduceByWindow(_ + " " + _, Seconds(20), Seconds(10)) res1.print() val res2: DStream[String] = lines .reduceByWindow(_ + _, Seconds(20), Seconds(10)) res2.print() // 求窗口元素的和 val res3: DStream[Int] = lines .map(_.toInt) .reduceByWindow(_ + _, Seconds(20), Seconds(10)) res3.print() // 请窗口元素和 val res4 = res2.map(_.toInt).reduce(_ + _) res4.print() // 程序启动 ssc.start() ssc.awaitTermination() } }
运行结果
------------------------------------------- Time: 1721628860000 ms ------------------------------------------- rdd = 39; time = 1721628865000 ms rdd = 40; time = 1721628870000 ms ------------------------------------------- Time: 1721628870000 ms ------------------------------------------- ------------------------------------------- Time: 1721628870000 ms ------------------------------------------- ------------------------------------------- Time: 1721628870000 ms -------------------------------------------
运行之后控制截图如下: