Structured Streaming之Event-Time的Window操作

简介: 笔记

基于Event-Time的Window操作

详细工作流程:10.png11.png12.png

方式一:

package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple2;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 9:44 下午
 */
public class EventTimeWindow {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<Row> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .option("includeTimestamp",true)
                .load();
        /**
         * 输入数据:spark storm hive
         * 接受到数据模型:
         * 第一步:words -> spark storm hive,  timestamp -> 2019-09-09 12:12:12
         *        eg: tuple -> (spark storm hive , 2019-09-09 12:12:12)
         *
         * 第二步:split()操作
         *        对tuple中的key进行split操作
         *
         * 第三步:flatMap()操作
         * 返回类型 -> list(tuple(spark,2019-09-09 12:12:12),
         *               tuple(storm,2019-09-09 12:12:12),
         *               tuple(hive,2019-09-09 12:12:12))
         */
        Dataset<Row> words =  dflines.as(Encoders.tuple(Encoders.STRING(),Encoders.TIMESTAMP()))
                .flatMap(new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
                    @Override
                    public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> stringTimestampTuple2) throws Exception {
                        List<Tuple2<String, Timestamp>> result = new ArrayList<>();
                        for (String word : stringTimestampTuple2._1.split(" ")){
                            result.add(new Tuple2<>(word,stringTimestampTuple2._2));
                        }
                        return result.iterator();
                    }
                },Encoders.tuple(Encoders.STRING(),Encoders.TIMESTAMP())).toDF("word","wordtime");
        // result table
        Dataset<Row> windowedCounts = words.groupBy(
                functions.window(words.col("wordtime"),
                        "10 minutes",
                        "5 minutes"),
                words.col("word")
        ).count();
        StreamingQuery query = windowedCounts.writeStream()
                .outputMode("update")
                .format("console")
                .option("truncate","false")
                .start();
        query.awaitTermination();
    }
}

方式二:

package com.kfk.spark.structuredstreaming;
import java.sql.Timestamp;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 9:25 下午
 */
public class EventData {
    //timestamp: Timestamp,
    //word: String
    private Timestamp wordtime;
    private String word;
    public Timestamp getWordtime() {
        return wordtime;
    }
    public void setWordtime(Timestamp wordtime) {
        this.wordtime = wordtime;
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
    public EventData() {
    }
    public EventData(Timestamp wordtime, String word) {
        this.wordtime = wordtime;
        this.word = word;
    }
}
package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.sql.Timestamp;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 9:44 下午
 */
public class EventTimeWindow2 {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());
        /**
         *  数据源:2019-03-29 16:40:00,hadoop
         *        2019-03-29 16:40:10,storm
         *
         *  // 根据javaBean转换为dataset
         */
        Dataset<EventData> dfeventdata = dflines.map(new MapFunction<String, EventData>() {
            @Override
            public EventData call(String value) throws Exception {
                String[] lines  = value.split(",");
                return new EventData(Timestamp.valueOf(lines[0]),lines[1]);
            }
        }, ExpressionEncoder.javaBean(EventData.class));
        // result table
        Dataset<Row> windowedCounts = dfeventdata.groupBy(
                functions.window(dfeventdata.col("wordtime"),
                        "10 minutes",
                        "5 minutes"),
                dfeventdata.col("word")
        ).count();
        StreamingQuery query = windowedCounts.writeStream()
                .outputMode("update")
                .format("console")
                .option("truncate","false")
                .start();
        query.awaitTermination();
    }
}


相关文章
|
9天前
|
Linux 网络架构
perf_event_open学习 —— design
perf_event_open学习 —— design
|
Python 流计算 API
PyFlink 教程(三):PyFlink DataStream API - state & timer
介绍如何在 Python DataStream API 中使用 state & timer 功能。
PyFlink 教程(三):PyFlink DataStream API - state & timer
|
4月前
|
数据安全/隐私保护 流计算
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
260 2
|
消息中间件 Oracle 关系型数据库
Flink CDC(Change Data Capture)
Flink CDC(Change Data Capture)是一个 Flink 应用程序,用于从关系型数据库(如 MySQL、PostgreSQL 等)中捕获数据更改,将其转换为流数据,并将其发送到 Flink
251 1
|
消息中间件 Oracle 关系型数据库
Flink CDC (Change Data Capture)
Flink CDC (Change Data Capture) 是一种基于 Flink 的流式数据处理技术,用于捕获数据源的变化,并将变化发送到下游系统。Flink CDC 可以将数据源的变化转换为流式数据,并实时地将数据流发送到下游系统,以便下游系统及时处理这些变化。
517 1
|
BI API 数据处理
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。
634 0
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
|
消息中间件 缓存 分布式计算
Structured_Sink_Trigger | 学习笔记
快速学习 Structured_Sink_Trigger
128 0
Structured_Sink_Trigger | 学习笔记
|
Java Apache 流计算
Flink 原理与实现:Session Window
在[上一篇文章:Window机制](http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/)中,我们介绍了窗口的概念和底层实现,以及 Flink 一些内建的窗口,包括滑动窗口、翻滚窗口。本文将深入讲解一种较为特殊的窗口:会话窗口(session window)。建议您在阅读完上一篇文章的基础上再阅读本文。 当我们
8987 1
|
存储 流计算
Flink - CountAndProcessingTimeTrigger 基于 Count 和 Time 触发窗口
​上一篇文章提到了CountTrigger && ProcessingTimeTriger,前者 CountTrigger 指定 count 数,当窗口内元素满足逻辑时进行一次触发,后者通过 TimeServer 注册窗口过期时间,到期后进行一次触发,本文自定义 Trigger 实现二者的合并即 Count 和 ProcessingTime 满足任意条件窗口都进行一次触发。...
386 0
Flink - CountAndProcessingTimeTrigger 基于 Count 和 Time 触发窗口