Flink 1.10.0 SQL DDL中如何定义watermark和计算列

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.

随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.


SQL DDL 中的 watermark 和计算列语法


Flink 1.10 在 SQL DDL 中增加了针对流处理定义时间属性及产生 watermark 的语法扩展(FLIP-66 [22])。这使得用户可以在用 DDL 语句创建的表上进行基于时间的操作(例如窗口)以及定义 watermark 策略[23]。


CREATE TABLE table_name (
WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
...
)


Flink提供了几种常用的watermark策略。


1,严格意义上递增的时间戳,发出到目前为止已观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。


WATERMARK FOR rowtime_column AS rowtime_column


2,递增的时间戳,发出到目前为止已观察到的最大时间戳为负1的水印。时间戳等于或小于最大时间戳的行不会迟到。


WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND.


3,有界时间戳(乱序) 发出水印,它是观察到的最大时间戳减去指定的延迟,例如,WATERMARK FOR rowtime_column AS rowtime_column-INTERVAL'5'SECOND是5秒的延迟水印策略。


WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit.


使用 DDL 创建 Kafka 表


val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
              |    proctime as PROCTIME(),
              |    WATERMARK FOR t AS t - INTERVAL '5' SECOND
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = '',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
              |    'connector.properties.zookeeper.connect' = '',  -- zk连接信息
              |    'connector.properties.bootstrap.servers' = '',  -- broker连接信息
              |    'connector.properties.group.id' = '',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin


需要注意:


1,Flink目前不支持直接把Long类型的转成Timestamp类型的,如果你的数据源中ts是Long类型的时间戳,建表语句不能直接写成ts TIMESTAMP(3),如果想用timestamp类型的,数据源中的时间需要时UTC格式的.或者可以用我上面那种写法,利用Flink的日期函数TO_TIMESTAMP把bigint类型的转成timestamp类型的.


2,PROCTIME是内置函数产生的一个虚拟列,可以参与后续的计算.


3,WATERMARK是在ts的列上声明了一个可以容忍5秒的乱序.


4,新版本的一些connector属性和之前略微有差异,比如

connector.properties.zookeeper.connect连接zk的属性,之前是


'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息


'connector.properties.0.value' = 'xxx',


现在是合在一起了.


5,format.type目前支持的数据结构主要有三种,'csv', 'json' and 'avro'.


6,connector.version,在kafka0.11以上的版本可以写universal表示通用.


下面看完成的代码:


package flink.ddl
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala._
/**
  * @program: Flink1.9.0
  * @description: Flink1.9.1使用DDL创建kafka源表和mysql sink表
  * @author: JasonLee
  * @create: 2020-01-14 19:49
  */
object FlinkKafkaDDLDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(6)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = StreamTableEnvironment.create(env, settings)
        // 创建kafka源表
        val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    t as TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
              |    proctime as PROCTIME(),
              |    WATERMARK FOR t AS t - INTERVAL '5' SECOND
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = '',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
              |    'connector.properties.zookeeper.connect' = '',  -- zk连接信息
              |    'connector.properties.bootstrap.servers' = '',  -- broker连接信息
              |    'connector.properties.group.id' = '',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin
        tEnv.sqlUpdate(createTable)
        val query =
            """
              | SELECT name,COUNT(age) as pv,count(distinct age) as uv,
              | TUMBLE_START(t, INTERVAL '5' second) as t_start,
              | TUMBLE_END(t, INTERVAL '5' second) as t_end
              | FROM PERSON GROUP BY name,TUMBLE(t, INTERVAL '5' second)
            """.stripMargin
        val result = tEnv.sqlQuery(query)
        result.toRetractStream[Row].print()
        tEnv.execute("Flink SQL DDL")
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
129 15
|
12天前
|
SQL 存储 缓存
SQL计算班级语文平均分:详细步骤与技巧
在数据库管理和分析中,经常需要计算某个班级在特定科目上的平均分
|
1月前
|
SQL 存储 并行计算
Lindorm Ganos 一条 SQL 计算轨迹
Lindorm Ganos 针对轨迹距离计算场景提供了内置函数 ST_Length_Rows,结合原生时空二级索引和时空聚合计算下推技术,能够高效过滤数据并并行执行运算任务。该方案通过主键索引和时空索引快速过滤数据,并利用多Region并行计算轨迹点距离,适用于车联网等场景。具体步骤包括根据车辆识别代码和时间戳过滤数据、范围过滤轨迹点以及并行计算距离。使用限制包括只支持点类型列聚合运算及表中轨迹点需按顺序排列等。测试结果显示,Lindorm Ganos 在不同数据量下均能实现秒级响应。
23 3
|
13天前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
37 0
|
1月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
66 2
|
1月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
40 1
|
2月前
|
SQL 安全 Java
访问者模式问题之在上面的 SQL 结构定义中, sealed 接口的作用如何理解
访问者模式问题之在上面的 SQL 结构定义中, sealed 接口的作用如何理解
|
1月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
3月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
779 7
阿里云实时计算Flink在多行业的应用和实践