Spark Streaming之window滑动窗口详解

简介: 笔记

window滑动窗口

Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行 计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作 为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3 秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口 计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数 值都必须是batch间隔的整数倍。(Spark Streaming对滑动窗口的支持,是比Storm更加完善和强 大的)8.png

window滑动窗口操作函数

9.png

案例:热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出 排名最靠前的3个搜索词以及出现次数


执行reduceByKeyAndWindow,滑动窗口操作


第二个参数,是窗口长度,这里是60秒

第三个参数,是滑动间隔,这里是10秒


也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对 一个RDD进行后续计算


Java语言实现:

package com.kfk.spark.window_hotwords_project;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/18
 * @time : 2:03 下午
 */
public class WindowHotWordJava {
    static JavaStreamingContext jssc = null;
    public static void main(String[] args) throws InterruptedException {
        jssc = CommStreamingContext.getJssc();
        /**
         * 数据模型:java
         *         hive
         *         spark
         *         java
         */
        JavaReceiverInputDStream<String> inputDstream = jssc.socketTextStream("bigdata-pro-m04",9999);
        /**
         * <java,1>
         * <hive,1>
         * ...
         */
        JavaPairDStream<String, Integer> pair = inputDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                return new Tuple2<>(line,1);
            }
        });
        /**
         * <java,5>
         * <hive,3>
         * <spark,6>
         * <flink,10>
         */
        JavaPairDStream<String, Integer> windowWordCount = pair.reduceByKeyAndWindow(new Function2<Integer,Integer,Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        }, Durations.seconds(60),Durations.seconds(10));
        JavaDStream<Tuple2<String, Integer>> finalStream = windowWordCount.transform(
                new Function<JavaPairRDD<String, Integer>, JavaRDD<Tuple2<String, Integer>>>() {
            @Override
            public JavaRDD<Tuple2<String, Integer>> call(JavaPairRDD<String, Integer> line) throws Exception {
                JavaPairRDD<Integer,String> beginPair = line.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1);
                    }
                });
                JavaPairRDD<Integer,String> sortRdd = beginPair.sortByKey(false);
                JavaPairRDD<String,Integer> sortPair = sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    @Override
                    public Tuple2<String,Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return new Tuple2<>(integerStringTuple2._2,integerStringTuple2._1);
                    }
                });
                List<Tuple2<String,Integer>> wordList = sortPair.take(3);
                for (Tuple2<String, Integer> stringIntegerTuple2 : wordList) {
                    System.out.println(stringIntegerTuple2._1 + " : " + stringIntegerTuple2._2);
                }
                return jssc.sparkContext().parallelize(wordList);
            }
        });
        finalStream.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

Scala语言实现

package com.kfk.spark.window_hotwords_project
import com.kfk.spark.common.CommStreamingContextScala
import org.apache.spark.streaming.Seconds
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/18
 * @time : 4:47 下午
 */
object WindowHotWordScala {
    def main(args: Array[String]): Unit = {
        val jssc = CommStreamingContextScala.getJssc
        val inputDstream = jssc.socketTextStream("bigdata-pro-m04", 9999)
        /**
         * 数据模型:java
         * hive
         * spark
         * java
         */
        val pairDStream = inputDstream.map(x => (x,1))
        /**
         * <java,1>
         * <hive,1>
         * ...
         */
        val windowWordCount = pairDStream.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(60),Seconds(10))
        /**
         * 热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,
         * 并打印出 排名最靠前的3个搜索词以及出现次数
         */
        val finalDStream = windowWordCount.transform(x => {
            val sortRDD = x.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
            val list = sortRDD.take(3)
            jssc.sparkContext.parallelize(list)
        })
        finalDStream.print()
        jssc.start()
        jssc.awaitTermination()
    }
}

测试数据:

java 
java 
hive
hive
java
hava
java
hive

运行结果:

-------------------------------------------
Time: 1608705518000 ms
-------------------------------------------
(hive,3)
(java ,2)
(java,2)
相关文章
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
88 0
|
12天前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
56 0
|
1月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
29 0
|
1月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
45 0
|
1月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
32 0
|
1月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
29 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
31 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
40 0
下一篇
无影云桌面