时间属性

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

本文为您介绍Flink SQL支持的Event Time和Processing Time数据类型,以及watermark和计算列。

Flink SQL支持两种时间类型。
  • Event Time:您提供的事件时间(通常是数据的最原始的创建时间),event time一定是您提供在Schema里的数据。
  • Processing Time:系统对事件进行处理的本地系统时间。

Event Time

EventTime也称为rowtime。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成 EventTime。目前只支持将 Timestamp 类型(将来会支持 Long 类型)声明成 rowtime 字段。如果不是 Timestamp 类型,需要借助计算列,基于现有列构造出一个 Timestamp 列。

但由于数据本身会有乱序,加之网络抖动或其它原因,rowtime 到达的顺序和被处理的顺序可能是不一致的(乱序)。因此定义一个rowtime字段,需要显示地定义一个 Watermark计算方法。

Watremark

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,Watermark的定义是source表DDL定义的一部分。Flink提供了如下语法定义Watermark。

WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
  • watermarkName 标识Watermark的名字,可选。
  • <rowtime_field> 必须是表中已定义的一列(当前仅支持为Timestamp类型),含义是基于该列生成Watermark,并且标识该列为Event Time列,可以在后续query中用来定义窗口。
  • withOffset 是目前提供的Watermark的生成策略,是根据<rowtime_field> - offset生成Watermark的值。withOffset的第一个参数必须是<rowtime_field>
  • offset 单位为毫秒,含义为Watermark值与event time值的偏移量。

通常一条记录中的某个字段就代表了该记录的发生时间。例如,表中有个rowtime字段,类型为Timestamp,其中某个字段为1501750584000(2017-08-03 08:56:24.000)。定义一个基于该rowtime列,策略为偏移4秒的Watermark,示例如下。

WATERMARK FOR rowtime AS withOffset(rowtime, 4000)

在这种情况下,这条数据的Watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据的Watermark时间含义即:timestamp小于1501750580000(2017-08-03 08:56:20.000)的数据,都已经到达了。

说明
  • 在使用Event Time Watermark时的rowtime必须是TIMESTAMP类型。当前支持毫秒级别的、在Unix时间戳里是13位的TIMESTAMP。如果是其他类型或是在Unix时间戳不是13位,建议使用计算列来做转换。
  • Event Time和Processing Time的声明只能在源表上声明。
总结:
  1. WaterMark的含义是所有时间t'< t 的事件都已经发生。假如Watermark t已经生效,那么后续eventTime小于t的记录都会被丢弃掉(目前Flink的处理是丢弃这些来的更晚的数据,后续支持用户配置让更晚的数据也能继续update)。
  2. 针对乱序的的流,WaterMark至关重要。即使一些事件延迟到达,也不至于过于影响window窗口的计算的正确性。
  3. 并行数据流中,当Operator有多个输入流时,Operator的event time以最小流event time为准。
以下为一个使用event time聚合的示例。
CREATE TABLE tt_stream(
  a varchar,
  b varchar,
  c timeStamp,
  WATERMARK wk1 FOR c as withOffset(c, 1000)
) with (
  type = 'SLS',
  topic = 'blink_tt2tt_test',
  accessId = '0622174XXXXXXTS',
  accessKey = 'a62cfe86-bXXXXXXXb9fad2618e7b'
);

CREATE TABLE rds_output(
  id varchar,
  c TIMESTAMP, 
  f TIMESTAMP,
  cnt BIGINT
) with (
  type = 'rds',
  url = 'jdbc:mysql://XXXXXXXX3306/test',
  tableName = 'datahub2rds',
  userName = 'XXXXXt',
  password = '1XXXXX'
);

INSERT INTO rds_output
SELECT a AS id, 
     SESSION_START(c, INTERVAL '1' SECOND) AS c, 
     CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f, 
     COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(c, INTERVAL '1' SECOND), a

计算列

由于目前Watermark的rowtime列,只支持Timestamp类型(未来会支持Long类型),如果不是Timestamp类型,就需要借助计算列 ,基于现有列构造出一个Timestamp列。计算列的表达式非常灵活,可以使用任意表达式、内置函数、或是自定义函数,灵活度与 SELECT中的表达式一样。计算列在Flink SQ中可以像普通字段一样被使用。

<computed_column_definition> ::= column_name AS computed_column_expression

例如
CREATE TABLE sls_stream(    
 a INT, 
 b BIGINT,
 c VARCHAR, 
 ts AS to_timestamp(c),
 WATERMARK FOR ts AS withOffset(ts, 1000)
) with ( 
 type = 'sls',
 ...
);

如上示例中所示,源表数据中的字段c包含时间信息,但是是字符串类型。使用TO_TIMESTAMP内置函数将字符串转成了Timestamp类型,并用该计算列作为Watermark 的rowtime字段。

Processing Time

processing time是系统产生的,不在您的原始数据中,需要显式的定义一个processing time列。

filedName as PROCTIME()

这个定义需要在source的DDL中显式指明,示例如下:

CREATE TABLE tt_stream (
 a varchar, 
 b varchar, 
 c BIGINT,
 d AS PROCTIME()
) with ( 
  type = 'tt',
  ... 
);        

CREATE TABLE rds_output ( 
 id varchar, 
 c TIMESTAMP, 
 f TIMESTAMP, 
 cnt BIGINT 
) with ( 
  type = 'rds',
  ... 
);

INSERT INTO rds_output 
SELECT a AS id,
       SESSION_START(d, INTERVAL '1' SECOND) AS c, 
       SESSION_END(d, INTERVAL '1' SECOND) AS f, 
       COUNT(a) AS cnt 
FROM tt_stream 
GROUP BY SESSION(d, INTERVAL '1' SECOND), a

本文转自实时计算——时间属性

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
Shell Linux
10-20|修改当前时间
10-20|修改当前时间
|
C语言 C++
C++ 如果设置日期 & 时间基础篇
C++ 如果设置日期 & 时间基础篇
|
C语言 C++
C++ 如果设置日期 & 时间基础篇
C++ 标准库没有提供所谓的日期类型。C++ 继承了 C 语言用于日期和时间操作的结构和函数。为了使用日期和时间相关的函数和结构,需要在 C++ 程序中引用 <ctime> 头文件。
|
存储 Linux 编译器
C++ 日期和时间的相关函数
C++ 日期和时间的相关函数
299 0
定义一个时钟类(TimeDemo),属性有:时、分、秒(默认值10时30分),方法有展示当前时间、过1秒,过1分钟,过1小时后的时分秒;在测试类中实现过10秒,过10分,过10小时后的时间展示
定义一个时钟类(TimeDemo),属性有:时、分、秒(默认值10时30分),方法有展示当前时间、过1秒,过1分钟,过1小时后的时分秒;在测试类中实现过10秒,过10分,过10小时后的时间展示
407 0
|
JavaScript 开发者
日期对象|学习笔记
快速学习 日期对象
121 0
Java反射得到属性和属性的值和设置属性的值
Java反射得到属性和属性的值和设置属性的值
|
关系型数据库 Oracle
获取两个日期之间日期列表
企鹅交流群 127591054 ORACLE select date '2017-11-11' + level - 1 ddate, level from dual connect by ...
1221 0