Flink 1.10.0 SQL DDL中如何定义watermark和计算列

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.

随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.


SQL DDL 中的 watermark 和计算列语法


Flink 1.10 在 SQL DDL 中增加了针对流处理定义时间属性及产生 watermark 的语法扩展(FLIP-66 [22])。这使得用户可以在用 DDL 语句创建的表上进行基于时间的操作(例如窗口)以及定义 watermark 策略[23]。


CREATE TABLE table_name (
WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
...
)


Flink提供了几种常用的watermark策略。


1,严格意义上递增的时间戳,发出到目前为止已观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。


WATERMARK FOR rowtime_column AS rowtime_column


2,递增的时间戳,发出到目前为止已观察到的最大时间戳为负1的水印。时间戳等于或小于最大时间戳的行不会迟到。


WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND.


3,有界时间戳(乱序) 发出水印,它是观察到的最大时间戳减去指定的延迟,例如,WATERMARK FOR rowtime_column AS rowtime_column-INTERVAL'5'SECOND是5秒的延迟水印策略。


WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit.


使用 DDL 创建 Kafka 表


val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
              |    proctime as PROCTIME(),
              |    WATERMARK FOR t AS t - INTERVAL '5' SECOND
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = '',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
              |    'connector.properties.zookeeper.connect' = '',  -- zk连接信息
              |    'connector.properties.bootstrap.servers' = '',  -- broker连接信息
              |    'connector.properties.group.id' = '',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin


需要注意:


1,Flink目前不支持直接把Long类型的转成Timestamp类型的,如果你的数据源中ts是Long类型的时间戳,建表语句不能直接写成ts TIMESTAMP(3),如果想用timestamp类型的,数据源中的时间需要时UTC格式的.或者可以用我上面那种写法,利用Flink的日期函数TO_TIMESTAMP把bigint类型的转成timestamp类型的.


2,PROCTIME是内置函数产生的一个虚拟列,可以参与后续的计算.


3,WATERMARK是在ts的列上声明了一个可以容忍5秒的乱序.


4,新版本的一些connector属性和之前略微有差异,比如

connector.properties.zookeeper.connect连接zk的属性,之前是


'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息


'connector.properties.0.value' = 'xxx',


现在是合在一起了.


5,format.type目前支持的数据结构主要有三种,'csv', 'json' and 'avro'.


6,connector.version,在kafka0.11以上的版本可以写universal表示通用.


下面看完成的代码:


package flink.ddl
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala._
/**
  * @program: Flink1.9.0
  * @description: Flink1.9.1使用DDL创建kafka源表和mysql sink表
  * @author: JasonLee
  * @create: 2020-01-14 19:49
  */
object FlinkKafkaDDLDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(6)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = StreamTableEnvironment.create(env, settings)
        // 创建kafka源表
        val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    t as TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
              |    proctime as PROCTIME(),
              |    WATERMARK FOR t AS t - INTERVAL '5' SECOND
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = '',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
              |    'connector.properties.zookeeper.connect' = '',  -- zk连接信息
              |    'connector.properties.bootstrap.servers' = '',  -- broker连接信息
              |    'connector.properties.group.id' = '',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin
        tEnv.sqlUpdate(createTable)
        val query =
            """
              | SELECT name,COUNT(age) as pv,count(distinct age) as uv,
              | TUMBLE_START(t, INTERVAL '5' second) as t_start,
              | TUMBLE_END(t, INTERVAL '5' second) as t_end
              | FROM PERSON GROUP BY name,TUMBLE(t, INTERVAL '5' second)
            """.stripMargin
        val result = tEnv.sqlQuery(query)
        result.toRetractStream[Row].print()
        tEnv.execute("Flink SQL DDL")
    }
}
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1088 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
216 0
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
138 0
|
SQL 关系型数据库 MySQL
INSERT INTO t_a.tableName SELECT * FROM t_b.tableName 如何通过定义一个list对象,包含多个tableName,循环执行前面的sql,用MySQL的语法写
【8月更文挑战第7天】INSERT INTO t_a.tableName SELECT * FROM t_b.tableName 如何通过定义一个list对象,包含多个tableName,循环执行前面的sql,用MySQL的语法写
160 5
|
SQL 流计算
实时计算 Flink版操作报错合集之怎么向一个未定义列的表中写入数据
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
关系型数据库 MySQL 数据库
实时计算 Flink版产品使用问题之如何排除某个列进行同步MySQL数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 安全 Java
访问者模式问题之在上面的 SQL 结构定义中, sealed 接口的作用如何理解
访问者模式问题之在上面的 SQL 结构定义中, sealed 接口的作用如何理解
|
监控 Apache 流计算
时间的守卫者:揭秘Flink中Watermark如何掌控数据流的时空秩序?
【8月更文挑战第26天】Apache Flink是一款功能强大的流处理框架,其Watermark机制为核心,确保了系统即使面对数据乱序或延迟也能准确处理时间相关的特性。Watermark作为一种特殊事件,标记了所有在此之前发生事件的最晚时间点,这对于时间窗口操作至关重要。
219 0
|
存储 Java 关系型数据库
实时计算 Flink版产品使用问题之以jar包方式同步数据是否需要定义存储oss的位置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
分布式计算 BI MaxCompute
SQL 能力问题之输出聚合的维度列的名称,如何解决
SQL 能力问题之输出聚合的维度列的名称,如何解决