窗口函数概述

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介:

窗口函数

Flink SQL支持基于无限大窗口的聚合(无需显式定义在SQL Query中添加任何的窗口)以及对一个特定的窗口的聚合。例如,需要统计在过去的1分钟内有多少用户点击了某个的网页,可以通过定义一个窗口来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
Flink SQL支持的窗口聚合主要是两种:window aggregate和over aggregate。两者最核心的区别是,over aggregate从语义上保障了对每个输入都有一个输出。因此over agregate常被用于ranking及moving average等场景。
本节接下来主要介绍window aggregate。Window aggregate支持两种时间类型做窗口:Event Time和Processing Time。每种类型下,又分别支持三种窗口类型:滚动窗口(TUMBLE)、滑动窗口(HOP)和会话窗口(SESSION)。

时间类型

Flink SQL支持两种时间类型。

  1. Processing Time:系统对事件进行处理的本地系统时间。
  2. Event Time:您提供的事件时间(通常是数据的最原始的创建时间),event time一定是您提供在Schema里的数据。

下图中是不同时间属性在实时计算流程中的位置。

时间属性

从图的定义可以看出,ingestion time和 processing time是系统为流记录增加的时间属性,您并不能控制。EventTime则是流记录本身携带的时间属性。由于数据本身存在乱序以及网络抖动等其它原因,event time为t1(对应partition1)时刻的纪录,有可能会晚于t2(对应partition2)时刻的被Flink处理,即t2 > t1。

基于Processing Time的Aggregate:

processing time是系统产生的,不在您的原始数据中,您需要显式定义一个processing time列。


  
  
  1. filedName as PROCTIME()

这个定义需要在source的DDL中显式指明,示例如下。


  
  
  1. CREATE TABLE tt_stream (
  2. a varchar,
  3. b varchar,
  4. c BIGINT,
  5. d AS PROCTIME()
  6. ) with (
  7. type = 'SLS',
  8. topic = 'blink_tt2tt_test',
  9. accessId = '06221748XXXX',
  10. accessKey = 'a62cfe86-ba5a-4eeXXXXXXX7b'
  11. );
  12. CREATE TABLE rds_output (
  13. id varchar,
  14. c TIMESTAMP,
  15. f TIMESTAMP,
  16. cnt BIGINT
  17. ) with (
  18. type = 'rds',
  19. url = 'jdbc:mysql://XXXXXXXXX:3306/test',
  20. tableName = 'datahub2rds',
  21. userName = 'tXXXX',
  22. password = '1XXXXX6'
  23. );
  24. INSERT INTO rds_output
  25. SELECT a AS id,
  26. SESSION_START(d, INTERVAL '1' SECOND) AS c,
  27. SESSION_END(d, INTERVAL '1' SECOND) AS f,
  28. COUNT(a) AS cnt
  29. FROM tt_stream
  30. GROUP BY SESSION(d, INTERVAL '1' SECOND), a

基于Event Time的Aggregate:

Event Time是您的原始数据,您不需要显式重新定义一个event time列, 但必须指定watermark的计算方法。这是因为您的数据往往是乱序的,如果不配置一个watermark来合理的delay您的数据,数据聚合的结果可能存在很大的偏差。

Watermark

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,watermark的定义是source表DDL定义的一部分。Flink提供了如下语法定义watermark。


  
  
  1. WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
  • watermarkName 标识watermark的名字,可选。
  • <rowtime_field> 必须是表中已定义的一列(当前仅支持为Timestamp类型),含义是基于该列生成 watermark,并且标识该列为Event Time列,可以在后续query中用来定义窗口。
  • withOffset 是目前提供的watermark的生成策略,是根据<rowtime_field> - offset生成watermark的值。withOffset的第一个参数必须是<rowtime_field>
  • offset 单位为毫秒,含义为watermark值与event time值的偏移量。

通常一条记录中的某个字段就代表了该记录的发生时间。例如,表中有个rowtime字段,类型为Timestamp,其中某个字段为1501750584000(2017-08-03 08:56:24.000)。如果您需要定义一个基于该rowtime列的watermark,watermark策略为偏移4秒,需要如下定义。


  
  
  1. WATERMARK FOR rowtime AS withOffset(rowtime, 4000)

在这种情况下,这条数据的watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据的watermark时间含义即:timestamp小于1501750580000(2017-08-03 08:56:20.000)的数据,都已经到达了。

注意:

  • 在使用Event Time watermark时的rowtime必须是TIMESTAMP类型。当前支持毫秒级别的、在Unix时间戳里是13位的TIMESTAMP。如果是其他类型或是在Unix时间戳不是13位,建议使用计算列来做转换。
  • Event Time和Processing Time的声明只能在源表上声明。

总结:

  1. WaterMark的含义是所有时间t’< t 的事件都已经发生。假如watermark t已经生效,那么后续eventTime小于t的记录都会被丢弃掉(目前Flink的处理是丢弃这些来的更晚的数据,后续支持用户配置让更晚的数据也能继续update)。
  2. 针对乱序的的流,WaterMark至关重要。即使一些事件延迟到达,也不至于过于影响window窗口的计算的正确性。
  3. 并行数据流中,当Operator有多个输入流时,Operator的event time以最小流event time为准。

image

示例

以下为一个使用event time聚合的示例。


   
   
  1. CREATE TABLE tt_stream(
  2. a varchar,
  3. b varchar,
  4. c timeStamp,
  5. WATERMARK wk1 FOR c as withOffset(c, 1000)
  6. ) with (
  7. type = 'SLS',
  8. topic = 'blink_tt2tt_test',
  9. accessId = '0622174XXXXXXTS',
  10. accessKey = 'a62cfe86-bXXXXXXXb9fad2618e7b'
  11. );
  12. CREATE TABLE rds_output(
  13. id varchar,
  14. c TIMESTAMP,
  15. f TIMESTAMP,
  16. cnt BIGINT
  17. ) with (
  18. type = 'rds',
  19. url = 'jdbc:mysql://XXXXXXXX3306/test',
  20. tableName = 'datahub2rds',
  21. userName = 'XXXXXt',
  22. password = '1XXXXX'
  23. );
  24. INSERT INTO rds_output
  25. SELECT a AS id,
  26. SESSION_START(c, INTERVAL '1' SECOND) AS c,
  27. CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f,
  28. COUNT(a) AS cnt
  29. FROM tt_stream
  30. GROUP BY SESSION(c, INTERVAL '1' SECOND), a
本文转自实时计算—— 窗口函数概述
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
5月前
|
缓存 分布式计算 大数据
MaxCompute产品使用合集之行转列的函数如何与group by和聚合函数一起使用
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)(二)
Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)(二)
|
6月前
|
Java 数据库连接 API
Flink报错问题之用Tumble窗口函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
SQL 关系型数据库 RDS
|
SQL JSON Java
Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)(一)
Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)(一)
|
SQL 存储 关系型数据库
Mysql数据库基础第八章:窗口函数和公用表表达式(CTE)
# 1.窗口函数 MySQL从8.0版本开始支持窗口函数。窗口函数的作用类似于在查询中对数据进行分组,不同的是,分组操作会把分组的结果聚合成一条记录,而窗口函数是将结果置于每一条数据记录中。
Mysql数据库基础第八章:窗口函数和公用表表达式(CTE)
|
SQL HIVE 开发者
Hive 高阶--分组窗口函数--聚合函数集成分组函数(SUM)|学习笔记
快速学习 Hive 高阶--分组窗口函数--聚合函数集成分组函数(SUM)
331 0
|
关系型数据库 MySQL 数据库
MySQL数据库:第六章:分组函数/聚合函数
MySQL数据库:第六章:分组函数/聚合函数
|
消息中间件 算法 数据处理
深入Flink系列——watermark使用与源码详解
# 1 Flink时间体系 本节我们主要关注Flink的时间体系,包括Flink的时间语义、watermark机制及watermark的生成与传播原理,主要进行一些flink watermark理论知识的梳理。 ## 1.1 Flink的时间语义 Flink支持三种时间概念:EventTime/ProcessingTime/IngestionTime,即事件时间、处理时间、摄入时间。 ![imag
2953 0
深入Flink系列——watermark使用与源码详解
开发指南—函数—窗口函数
传统的Group By函数会按照分组后的查询结果进行聚合计算,且每个分组只输出一条数据。但与传统的Group By函数不同,窗口函数(也称OLAP函数)可以为每个分组返回多个值,且不会影响记录的数量。本文介绍如何使用窗口函数
143 0