window滑动窗口
Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行 计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作 为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3 秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口 计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数 值都必须是batch间隔的整数倍。(Spark Streaming对滑动窗口的支持,是比Storm更加完善和强 大的)
window滑动窗口操作函数
案例:热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出 排名最靠前的3个搜索词以及出现次数
执行reduceByKeyAndWindow,滑动窗口操作
第二个参数,是窗口长度,这里是60秒
第三个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对 一个RDD进行后续计算
Java语言实现:
package com.kfk.spark.window_hotwords_project; import com.kfk.spark.common.CommStreamingContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/18 * @time : 2:03 下午 */ public class WindowHotWordJava { static JavaStreamingContext jssc = null; public static void main(String[] args) throws InterruptedException { jssc = CommStreamingContext.getJssc(); /** * 数据模型:java * hive * spark * java */ JavaReceiverInputDStream<String> inputDstream = jssc.socketTextStream("bigdata-pro-m04",9999); /** * <java,1> * <hive,1> * ... */ JavaPairDStream<String, Integer> pair = inputDstream.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String line) throws Exception { return new Tuple2<>(line,1); } }); /** * <java,5> * <hive,3> * <spark,6> * <flink,10> */ JavaPairDStream<String, Integer> windowWordCount = pair.reduceByKeyAndWindow(new Function2<Integer,Integer,Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }, Durations.seconds(60),Durations.seconds(10)); JavaDStream<Tuple2<String, Integer>> finalStream = windowWordCount.transform( new Function<JavaPairRDD<String, Integer>, JavaRDD<Tuple2<String, Integer>>>() { @Override public JavaRDD<Tuple2<String, Integer>> call(JavaPairRDD<String, Integer> line) throws Exception { JavaPairRDD<Integer,String> beginPair = line.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1); } }); JavaPairRDD<Integer,String> sortRdd = beginPair.sortByKey(false); JavaPairRDD<String,Integer> sortPair = sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String,Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception { return new Tuple2<>(integerStringTuple2._2,integerStringTuple2._1); } }); List<Tuple2<String,Integer>> wordList = sortPair.take(3); for (Tuple2<String, Integer> stringIntegerTuple2 : wordList) { System.out.println(stringIntegerTuple2._1 + " : " + stringIntegerTuple2._2); } return jssc.sparkContext().parallelize(wordList); } }); finalStream.print(); jssc.start(); jssc.awaitTermination(); } }
Scala语言实现
package com.kfk.spark.window_hotwords_project import com.kfk.spark.common.CommStreamingContextScala import org.apache.spark.streaming.Seconds /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/18 * @time : 4:47 下午 */ object WindowHotWordScala { def main(args: Array[String]): Unit = { val jssc = CommStreamingContextScala.getJssc val inputDstream = jssc.socketTextStream("bigdata-pro-m04", 9999) /** * 数据模型:java * hive * spark * java */ val pairDStream = inputDstream.map(x => (x,1)) /** * <java,1> * <hive,1> * ... */ val windowWordCount = pairDStream.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(60),Seconds(10)) /** * 热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次, * 并打印出 排名最靠前的3个搜索词以及出现次数 */ val finalDStream = windowWordCount.transform(x => { val sortRDD = x.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)) val list = sortRDD.take(3) jssc.sparkContext.parallelize(list) }) finalDStream.print() jssc.start() jssc.awaitTermination() } }
测试数据:
java java hive hive java hava java hive
运行结果:
------------------------------------------- Time: 1608705518000 ms ------------------------------------------- (hive,3) (java ,2) (java,2)