(2)sparkstreaming滚动窗口和滑动窗口演示

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 一、滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行均匀切片的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。

一、滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行均匀切片的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。
image.png
在sparkstreaming中,滚动窗口需要设置窗口大小和滑动间隔,窗口大小和滑动间隔都是StreamingContext的间隔时间的倍数,同时窗口大小和滑动间隔相等,如:
.window(Seconds(10),Seconds(10)) 10秒的窗口大小和10秒的滑动大小,不存在重叠部分

package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-12.
 */
public class SparkSql_Socket_Tumble {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        //初始化sparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        //获得JavaStreamingContext
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));

        /**
         * 设置日志的级别: 避免日志重复
         */
        ssc.sparkContext().setLogLevel("ERROR");

        //从socket源获取数据
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
                return waterSensor;
            }
        }).window(Durations.minutes(3), Durations.minutes(3));      //滚动窗口:需要设置窗口大小和滑动间隔,窗口大小和滑动间隔都是StreamingContext的间隔时间的倍数,同时窗口大小和滑动间隔相等。

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());

                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // 创建临时表
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + "=========");
                //输出前20条数据
                result.show();
            }
        });


        //开始作业
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}

代码中定义了一个3分钟的时间窗口和3分钟的滑动大小,运行结果可以看出数据没有出现重叠,实现了滚动窗口的效果:
image.png

二、滑动窗口(Sliding Windows)与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个滑动步长(window slide),代表窗口计算的频率。
image.png

在sparkstreaming中,滑动窗口需要设置窗口大小和滑动间隔,窗口大小和滑动间隔都是StreamingContext的间隔时间的倍数,同时窗口大小和滑动间隔不相等,如:
.window(Seconds(10),Seconds(5)) 10秒的窗口大小和5秒的活动大小,存在重叠部分

package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by lj on 2022-07-12.
 */
public class SparkSql_Socket {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        //初始化sparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        //获得JavaStreamingContext
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));

        /**
         * 设置日志的级别: 避免日志重复
         */
        ssc.sparkContext().setLogLevel("ERROR");

        //从socket源获取数据
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
                return waterSensor;
            }
        }).window(Durations.minutes(4), Durations.minutes(2));      //滑动窗口:指定窗口大小 和 滑动频率 必须是批处理时间的整数倍

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());

                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // 创建临时表
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + "=========");
                //输出前20条数据
                result.show();
            }
        });


        //开始作业
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}

image.png

数据演进过程解释:
image.png

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
安全 Java 编译器
阿里巴巴Dragonwell
阿里巴巴Dragonwell
|
存储 分布式计算 大数据
阿里云 EMR 强势助力,与阿里云大数据体系共创辉煌,把握时代热点,开启生态建设之旅
【8月更文挑战第26天】阿里云EMR(Elastic MapReduce)是一种大数据处理服务,与阿里云的多个服务紧密结合,共同构建了完善的大数据生态系统。EMR与对象存储服务(OSS)集成,利用OSS提供可靠、低成本且可扩展的数据存储;与MaxCompute集成,实现深度数据分析和挖掘;还支持数据湖构建服务,加速数据湖的搭建并简化数据管理与分析过程。EMR提供多种编程接口及工具,如Hive、Spark和Flink等,帮助用户高效完成大数据处理任务。
396 2
|
消息中间件 Prometheus 监控
Producer的监控与日志记录最佳实践
【8月更文第29天】在分布式系统中,消息队列作为关键组件之一,其稳定性和性能至关重要。生产者(Producer)负责生成并发送消息到消息队列中,因此确保生产者的健康运行是非常重要的。本文将探讨如何为生产者设置监控和日志记录,以跟踪其健康状况和性能指标。
207 1
|
SQL 分布式计算 关系型数据库
Spark编程实验三:Spark SQL编程
Spark编程实验三:Spark SQL编程
437 1
|
消息中间件 存储 Prometheus
【Kafka】Kafka 提供了哪些系统工具
【4月更文挑战第11天】【Kafka】Kafka 提供了哪些系统工具
|
消息中间件 搜索推荐 Java
消息中间件JMS介绍、入门demo与spring整合
消息中间件JMS介绍、入门demo与spring整合
557 98
消息中间件JMS介绍、入门demo与spring整合
|
消息中间件 数据可视化 Kafka
kafka可视化工具
kafka可视化工具
602 0
|
缓存 自然语言处理 Java
还在为字典值、枚举值校验烦恼吗,不妨试试这个
本文介绍了如何在Java中实现常量值校验的封装,主要包括两个方面:字典值类型的校验和枚举类型的校验。首先,作者提到在进行数据验证时,实体类字段需要添加`@Valid`注解。然后,对于字典值类型的校验,可以通过`@DictVaild`注解检查当前字段值是否在数据库中的字典值类别内,或者与预定义的枚举类中的值相匹配。在进行校验时,可以设置`dictType`参数为`DictType.CODE`或`DictType.LABEL`来分别验证代码值或标签值。
392 0
|
机器学习/深度学习 分布式计算 数据库连接
[Spark精进]必须掌握的4个RDD算子之filter算子
[Spark精进]必须掌握的4个RDD算子之filter算子
315 2
|
安全 前端开发 Java
Scala与Java:综合比较
Scala与Java:综合比较
307 0