【Kafka】(四)Kafka Streams 转换算子详解2

简介: 【Kafka】(四)Kafka Streams 转换算子详解2

3.窗口操作


micro batch(微批),时间维度数据范围的计算


3.1 Tumbling(翻滚)固定大小 无重叠


翻滚窗口将流元素按照固定的时间间隔,拆分成指定的窗口,窗口和窗口间元素之间没有重叠。在下图不同颜色的record表示不同的key。可以看是在时间窗口内,每个key对应一个窗口。前闭后开


aHR0cDovL2thZmthLmFwYWNoZS5vcmcvMjMvaW1hZ2VzL3N0cmVhbXMtdGltZS13aW5kb3dzLXR1bWJsaW5nLnBuZw.png


aHR0cHM6Ly9pbWFnZXMyMDE4LmNuYmxvZ3MuY29tL2Jsb2cvOTYzOTAzLzIwMTgwOC85NjM5MDMtMjAxODA4MjMwMTMyMjU1MTYtMTgzMDU1MjQ0OC5naWY.gif


//===========================翻滚窗口=================================
        kStream
                .flatMapValues(value -> Arrays.asList(value.split(" ")))
                .groupBy((k, v) -> v)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("c152").withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()));


3.2 Hopping (跳跃) 固定大小 有重叠


aHR0cDovL2thZmthLmFwYWNoZS5vcmcvMjMvaW1hZ2VzL3N0cmVhbXMtdGltZS13aW5kb3dzLWhvcHBpbmcucG5n.png


aHR0cHM6Ly9pbWFnZXMyMDE4LmNuYmxvZ3MuY29tL2Jsb2cvOTYzOTAzLzIwMTgwOC85NjM5MDMtMjAxODA4MjMwMTMxMjczNTctMTg2MDgzNDcxMS5naWY.gif


//=================================跳跃窗口==========================================
KTable<Windowed<String>, Long> kTable = kStream
  .flatMapValues(value -> Arrays.asList(value.split(" ")))
  .groupBy((k, v) -> v)
  // 将分组后的数据按照窗口进行划分
  // 翻滚窗口 时间间隔10s
  // 第一个窗口:now:0 - 10s  计算
  // 第二个窗口:5-15 计算  (5-10)归属于第一个和第二个窗口
  // 10-20
  // ...
  .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(5)))
  // 指定状态存储的k v的结构类型
.count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("c152").withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()));
//===========================================================================


3.3 session window


Session 窗口的大小动态 无重叠 数据驱动的窗口

回顾:Servelt Session 会话对象,一旦使用Session,会话会自动延长30min,Session超时策略(服务器自动删除30min未使用的Session)


Session Window该窗口用于对Key做Group后的聚合操作中。它需要对Key做分组,然后对组内的数据根据业务需求定义一个窗口的起始点和结束点。一个典型的案例是,希望通过Session Window计算某个用户访问网站的时间。对于一个特定的用户(用Key表示)而言,当发生登录操作时,该用户(Key)的窗口即开始,当发生退出操作或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等。


Session Windows用于将基于key的事件聚合到所谓的会话中,其过程称为session化。会话表示由定义的不活动间隔(或“空闲”)分隔的活动时段。处理的任何事件都处于任何现有会话的不活动间隙内,并合并到现有会话中。如果事件超出会话间隙,则将创建新会话。会话窗口的主要应用领域是用户行为分析。基于会话的分析可以包括简单的指标.


aHR0cDovL2thZmthLmFwYWNoZS5vcmcvMjMvaW1hZ2VzL3N0cmVhbXMtc2Vzc2lvbi13aW5kb3dzLTAxLnBuZw.png


如果我们接收到另外三条记录(包括两条迟到的记录),那么绿色记录key的两个现有会话将合并为一个会话,从时间0开始到结束时间6,包括共有三条记录。蓝色记录key的现有会话将延长到时间5结束,共包含两个记录。最后,将在11时开始和结束蓝键的新会话。


aHR0cDovL2thZmthLmFwYWNoZS5vcmcvMjMvaW1hZ2VzL3N0cmVhbXMtc2Vzc2lvbi13aW5kb3dzLTAyLnBuZw.png


//==================================会话窗口=========================================
KTable<Windowed<String>, Long> kTable = kStream
                .flatMapValues(value -> Arrays.asList(value.split(" ")))
    .groupBy((k, v) -> v)
    // 将分组后的数据按照窗口进行划分
    // 翻滚窗口 时间间隔10s
    // 第一个窗口:now:0 - 10s  计算
    // 第二个窗口:5-15 计算  (5-10)归属于第一个和第二个窗口
    // 10-20
    // ...
    .windowedBy(SessionWindows.with(Duration.ofSeconds(10)))
    // 指定状态存储的k v的结构类型
    .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("CC").withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()));
//===========================================================================
kTable.toStream().foreach((k, v) -> { // 窗口计算指的是对窗口内的数据进行计算
    long start = k.window().start();
    long end = k.window().end();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String d1 = sdf.format(new Date(start));
    String d2 = sdf.format(new Date(end));
    System.out.println(d1 + "\t" + d2 + "\t" + k.key() + "\t" + v);
});


目录
相关文章
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
3月前
|
消息中间件 Java Kafka
|
3月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
54 0
|
5月前
|
消息中间件 负载均衡 Kafka
一文读懂Kafka API:Producer、Consumer和Streams全解析
大家好,今天我们将深入探讨Kafka的三大核心API。通过这篇文章,你将了解如何使用Producer API发布记录流,利用Consumer API订阅和处理数据,以及通过Streams API实现复杂的流处理。一起开启Kafka的探索之旅吧!
190 2
|
4月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
4月前
|
消息中间件 Java Kafka
Java中的流处理框架:Kafka Streams与Flink
Java中的流处理框架:Kafka Streams与Flink
|
5月前
|
消息中间件 Java Kafka
教程:Spring Boot集成Kafka Streams流处理框架
教程:Spring Boot集成Kafka Streams流处理框架
|
6月前
|
消息中间件 存储 监控
Kafka Streams:深度探索实时流处理应用程序
Apache Kafka Streams 是一款强大的实时流处理库,为构建实时数据处理应用提供了灵活且高性能的解决方案。本文将深入探讨 Kafka Streams 的核心概念、详细原理,并提供更加丰富的示例代码,以帮助大家深入理解和应用这一流处理框架。
|
消息中间件 数据采集 SQL
【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结
【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结
752 0
【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结
|
消息中间件 jstorm 分布式计算
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2
466 0
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2