Flink 1.10.0 SQL DDL中如何定义watermark和计算列

简介: 随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.

随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.


SQL DDL 中的 watermark 和计算列语法


Flink 1.10 在 SQL DDL 中增加了针对流处理定义时间属性及产生 watermark 的语法扩展(FLIP-66 [22])。这使得用户可以在用 DDL 语句创建的表上进行基于时间的操作(例如窗口)以及定义 watermark 策略[23]。


CREATE TABLE table_name (
WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
...
)


Flink提供了几种常用的watermark策略。


1,严格意义上递增的时间戳,发出到目前为止已观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。


WATERMARK FOR rowtime_column AS rowtime_column


2,递增的时间戳,发出到目前为止已观察到的最大时间戳为负1的水印。时间戳等于或小于最大时间戳的行不会迟到。


WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND.


3,有界时间戳(乱序) 发出水印,它是观察到的最大时间戳减去指定的延迟,例如,WATERMARK FOR rowtime_column AS rowtime_column-INTERVAL'5'SECOND是5秒的延迟水印策略。


WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit.


使用 DDL 创建 Kafka 表


val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
              |    proctime as PROCTIME(),
              |    WATERMARK FOR t AS t - INTERVAL '5' SECOND
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = '',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
              |    'connector.properties.zookeeper.connect' = '',  -- zk连接信息
              |    'connector.properties.bootstrap.servers' = '',  -- broker连接信息
              |    'connector.properties.group.id' = '',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin


需要注意:


1,Flink目前不支持直接把Long类型的转成Timestamp类型的,如果你的数据源中ts是Long类型的时间戳,建表语句不能直接写成ts TIMESTAMP(3),如果想用timestamp类型的,数据源中的时间需要时UTC格式的.或者可以用我上面那种写法,利用Flink的日期函数TO_TIMESTAMP把bigint类型的转成timestamp类型的.


2,PROCTIME是内置函数产生的一个虚拟列,可以参与后续的计算.


3,WATERMARK是在ts的列上声明了一个可以容忍5秒的乱序.


4,新版本的一些connector属性和之前略微有差异,比如

connector.properties.zookeeper.connect连接zk的属性,之前是


'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息


'connector.properties.0.value' = 'xxx',


现在是合在一起了.


5,format.type目前支持的数据结构主要有三种,'csv', 'json' and 'avro'.


6,connector.version,在kafka0.11以上的版本可以写universal表示通用.


下面看完成的代码:


package flink.ddl
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala._
/**
  * @program: Flink1.9.0
  * @description: Flink1.9.1使用DDL创建kafka源表和mysql sink表
  * @author: JasonLee
  * @create: 2020-01-14 19:49
  */
object FlinkKafkaDDLDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(6)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = StreamTableEnvironment.create(env, settings)
        // 创建kafka源表
        val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    t as TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
              |    proctime as PROCTIME(),
              |    WATERMARK FOR t AS t - INTERVAL '5' SECOND
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = '',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
              |    'connector.properties.zookeeper.connect' = '',  -- zk连接信息
              |    'connector.properties.bootstrap.servers' = '',  -- broker连接信息
              |    'connector.properties.group.id' = '',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin
        tEnv.sqlUpdate(createTable)
        val query =
            """
              | SELECT name,COUNT(age) as pv,count(distinct age) as uv,
              | TUMBLE_START(t, INTERVAL '5' second) as t_start,
              | TUMBLE_END(t, INTERVAL '5' second) as t_end
              | FROM PERSON GROUP BY name,TUMBLE(t, INTERVAL '5' second)
            """.stripMargin
        val result = tEnv.sqlQuery(query)
        result.toRetractStream[Row].print()
        tEnv.execute("Flink SQL DDL")
    }
}
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
922 43
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
380 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
6月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1037 1
|
8月前
|
SQL 存储 大数据
Dataphin V5.0:支持创建异步调用API,实现慢 SQL 复杂计算的直连消费
本文介绍了数据服务产品中异步调用的应用场景与优势,包括大数据引擎查询、复杂SQL及大规模数据下载等场景,解决了同步调用可能导致的资源浪费和性能问题。通过创建异步API、测试发布以及权限申请等功能,实现高效稳定的服务提供。以电商订单查询为例,展示了如何利用异步调用提升系统性能与用户体验。
354 9
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
264 0
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
2162 27
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
972 14
|
SQL 存储 缓存
SQL计算班级语文平均分:详细步骤与技巧
在数据库管理和分析中,经常需要计算某个班级在特定科目上的平均分
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
399 1
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
305 0