- sliding-time-window (有重叠数据)
//1.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定义数据流来源 val text = env.socketTextStream("localhost", 9999) //3.转换数据格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { line => { val tokens = line.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.执行统计操作,每个sensorId一个sliding窗口,窗口时间10秒,滑动时间5秒 //也就是说,每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .timeWindow(Time.seconds(10), Time.seconds(5)) .sum("carCnt") //5.显示统计结果 ds2.print() //6.触发流计算 env.execute(this.getClass.getName)
CountWindow
CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。
注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。
- tumbling-count-window (无重叠数据)
//1.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定义数据流来源 val text = env.socketTextStream("localhost", 9999) //3.转换数据格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5 //按照key进行收集,对应的key出现的次数达到5次作为一个结果 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .countWindow(5) .sum("carCnt") //5.显示统计结果 ds2.print() //6.触发流计算 env.execute(this.getClass.getName)
- sliding-count-window (有重叠数据)
同样也是窗口长度和滑动窗口的操作:窗口长度是5,滑动长度是3
//1.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定义数据流来源 val text = env.socketTextStream("localhost", 9999) //3.转换数据格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.执行统计操作,每个sensorId一个sliding窗口,窗口大小3条数据,窗口滑动为3条数据 //也就是说,每个路口分别统计,收到关于它的3条消息时统计在最近5条消息中,各自路口通过的汽车数量 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .countWindow(5, 3) .sum("carCnt") //5.显示统计结果 ds2.print() //6.触发流计算 env.execute(this.getClass.getName)
- Window 总结
- flink支持两种划分窗口的方式(time和count)
- 如果根据时间划分窗口,那么它就是一个time-window
- 如果根据数据划分窗口,那么它就是一个count-window
- flink支持窗口的两个重要属性(size和interval)
- 如果size=interval,那么就会形成tumbling-window(无重叠数据)
- 如果size>interval,那么就会形成sliding-window(有重叠数据)
- 如果size<interval,那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。
- 通过组合可以得出四种基本窗口
- time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))
- time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))
- count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)
- count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)
Window Reduce
WindowedStream → DataStream:给window赋一个reduce功能的函数,并返回一个聚合的结果。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object StreamWindowReduce { def main(args: Array[String]): Unit = { // 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建SocketSource val stream = env.socketTextStream("node01", 9999) // 对stream进行处理并按key聚合 val streamKeyBy = stream.map(item => (item, 1)).keyBy(0) // 引入时间窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5)) // 执行聚合操作 val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) ) // 将聚合数据写入文件 streamReduce.print() // 执行程序 env.execute("TumblingWindow") } }
Window Apply
apply方法可以进行一些自定义处理,通过匿名内部类的方法来实现。当有一些复杂计算时使用。
用法
- 实现一个 WindowFunction 类
- 指定该类的泛型为 [输入数据类型, 输出数据类型, keyBy中使用分组字段的类型, 窗口类型]
示例:使用apply方法来实现单词统计
步骤:
- 获取流处理运行环境
- 构建socket流数据源,并指定IP地址和端口号
- 对接收到的数据转换成单词元组
- 使用 keyBy 进行分流(分组)
- 使用 timeWinodw 指定窗口的长度(每3秒计算一次)
- 实现一个WindowFunction匿名内部类
- apply方法中实现聚合计算
- 使用Collector.collect收集数据
核心代码如下:
//1. 获取流处理运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2. 构建socket流数据源,并指定IP地址和端口号 val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" ")) //3. 对接收到的数据转换成单词元组 val wordDataStream = textDataStream.map(_->1) //4. 使用 keyBy 进行分流(分组) val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1) //5. 使用 timeWinodw 指定窗口的长度(每3秒计算一次) val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3)) //6. 实现一个WindowFunction匿名内部类 val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] { //在apply方法中实现数据的聚合 override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = { println("hello world") val tuple = input.reduce((t1, t2) => { (t1._1, t1._2 + t2._2) }) //将要返回的数据收集起来,发送回去 out.collect(tuple) } }) reduceDatStream.print() env.execute()