Flink 1.10.0 SQL DDL中如何定义watermark和计算列

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.

随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.


SQL DDL 中的 watermark 和计算列语法


Flink 1.10 在 SQL DDL 中增加了针对流处理定义时间属性及产生 watermark 的语法扩展(FLIP-66 [22])。这使得用户可以在用 DDL 语句创建的表上进行基于时间的操作(例如窗口)以及定义 watermark 策略[23]。


CREATE TABLE table_name (
WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
...
)


Flink提供了几种常用的watermark策略。


1,严格意义上递增的时间戳,发出到目前为止已观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。


WATERMARK FOR rowtime_column AS rowtime_column


2,递增的时间戳,发出到目前为止已观察到的最大时间戳为负1的水印。时间戳等于或小于最大时间戳的行不会迟到。


WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND.


3,有界时间戳(乱序) 发出水印,它是观察到的最大时间戳减去指定的延迟,例如,WATERMARK FOR rowtime_column AS rowtime_column-INTERVAL'5'SECOND是5秒的延迟水印策略。


WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit.


使用 DDL 创建 Kafka 表


val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
              |    proctime as PROCTIME(),
              |    WATERMARK FOR t AS t - INTERVAL '5' SECOND
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = '',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
              |    'connector.properties.zookeeper.connect' = '',  -- zk连接信息
              |    'connector.properties.bootstrap.servers' = '',  -- broker连接信息
              |    'connector.properties.group.id' = '',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin


需要注意:


1,Flink目前不支持直接把Long类型的转成Timestamp类型的,如果你的数据源中ts是Long类型的时间戳,建表语句不能直接写成ts TIMESTAMP(3),如果想用timestamp类型的,数据源中的时间需要时UTC格式的.或者可以用我上面那种写法,利用Flink的日期函数TO_TIMESTAMP把bigint类型的转成timestamp类型的.


2,PROCTIME是内置函数产生的一个虚拟列,可以参与后续的计算.


3,WATERMARK是在ts的列上声明了一个可以容忍5秒的乱序.


4,新版本的一些connector属性和之前略微有差异,比如

connector.properties.zookeeper.connect连接zk的属性,之前是


'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息


'connector.properties.0.value' = 'xxx',


现在是合在一起了.


5,format.type目前支持的数据结构主要有三种,'csv', 'json' and 'avro'.


6,connector.version,在kafka0.11以上的版本可以写universal表示通用.


下面看完成的代码:


package flink.ddl
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala._
/**
  * @program: Flink1.9.0
  * @description: Flink1.9.1使用DDL创建kafka源表和mysql sink表
  * @author: JasonLee
  * @create: 2020-01-14 19:49
  */
object FlinkKafkaDDLDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(6)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = StreamTableEnvironment.create(env, settings)
        // 创建kafka源表
        val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    t as TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
              |    proctime as PROCTIME(),
              |    WATERMARK FOR t AS t - INTERVAL '5' SECOND
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = '',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
              |    'connector.properties.zookeeper.connect' = '',  -- zk连接信息
              |    'connector.properties.bootstrap.servers' = '',  -- broker连接信息
              |    'connector.properties.group.id' = '',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin
        tEnv.sqlUpdate(createTable)
        val query =
            """
              | SELECT name,COUNT(age) as pv,count(distinct age) as uv,
              | TUMBLE_START(t, INTERVAL '5' second) as t_start,
              | TUMBLE_END(t, INTERVAL '5' second) as t_end
              | FROM PERSON GROUP BY name,TUMBLE(t, INTERVAL '5' second)
            """.stripMargin
        val result = tEnv.sqlQuery(query)
        result.toRetractStream[Row].print()
        tEnv.execute("Flink SQL DDL")
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
191 15
|
16小时前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
29 14
|
2月前
|
SQL 存储 缓存
SQL计算班级语文平均分:详细步骤与技巧
在数据库管理和分析中,经常需要计算某个班级在特定科目上的平均分
|
3月前
|
SQL 存储 并行计算
Lindorm Ganos 一条 SQL 计算轨迹
Lindorm Ganos 针对轨迹距离计算场景提供了内置函数 ST_Length_Rows,结合原生时空二级索引和时空聚合计算下推技术,能够高效过滤数据并并行执行运算任务。该方案通过主键索引和时空索引快速过滤数据,并利用多Region并行计算轨迹点距离,适用于车联网等场景。具体步骤包括根据车辆识别代码和时间戳过滤数据、范围过滤轨迹点以及并行计算距离。使用限制包括只支持点类型列聚合运算及表中轨迹点需按顺序排列等。测试结果显示,Lindorm Ganos 在不同数据量下均能实现秒级响应。
32 3
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
55 0
|
3月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
88 2
|
3月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
51 1
|
3月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
5月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
129 13