实时计算的源表是指流式数据存储。流式数据存储驱动实时计算的运行,因此每个实时计算子作业必须提供至少一个流式数据存储。
语法
CREATE TABLE tableName
(columnName dataType [, columnName dataType ]*)
[ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
示例
create table datahub_stream(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='datahub',
endPoint='http://dh-et2.aliyun-inc.com',
project='blink_xxx',
topic='test_xxx',
accessId='0i70Rxxxxx',
accessKey='yF60EwUxxxx',
startTime='2017-07-21 00:00:00'
);
Watermark定义
Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。Watermark的定义是数据原表DDL定义的一部分。Flink提供如下语法定义:
WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
-
watermarkName
标识了这个 watermark 的名字,可选。 -
<rowtime_field>
必须是表中已定义的一列(当前仅支持为Timestamp
类型),含义是基于该列生成 watermark,并且标识该列为 Event Time 列,可以在后续 query 中用来定义窗口。 -
withOffset
是目前提供的watermark的生成策略,是根据<rowtime_field> - offset
生成watermark的值。withOffset的第一个参数必须是<rowtime_field>
。 -
offset
单位为毫秒,含义为watermark值与event time值的偏移量。
通常一条记录中的某个字段就代表了该记录的发生时间。例如,表中有个rowtime字段,类型为Timestamp,其中某个值为1501750584000(2017-08-03 08:56:24.000)
,如果您需要定义一个基于该rowtime列的watermark,且watermark策略为偏移4秒,需要如下定义。
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
这条数据的watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)
。这条数据中timestamp小于1501750580000(2017-08-03 08:56:20.000)
的数据,都已经到达了。
计算列
概念
计算列是虚拟列,并非实际存储在表中。计算列的表达式可以使用其他列中的数据来计算其所属列的值,可以使用表达式、内置函数、或是自定义函数。灵活度与SELECT中的表达式一样。计算列在Flink中可以像普通字段一样被使用。
用途
目前watermark的rowtime列只支持Timestamp类型(未来会支持Long类型),watermark只能定义在源表DDL中,如果您的源表中没有 Timestamp类型的列,需要从其他类型的字段转换而来,可以使用计算列来转换。
语法
<computed_column_definition> ::= column_name AS computed_column_expression
示例
#如果datahub的TIME字段是微秒级别的(16位Unix时间戳),可以用计算列来转换。
CREATE TABLE sls_stream(
a INT,
b BIGINT,
TIME BIGINT,
ts AS TO_TIMESTAMP(TIME/1000),
WATERMARK FOR ts AS withOffset(ts, 1000)
) with (
type = 'DATAHUB',
...
);
如上示例中所示,源表数据中的字段TIME
包含时间信息,为BIGINT类型。用计算列的功能将字段TIME
转换成了Timestamp类型的ts
字段,并将ts
字段作为watermark的rowtime字段。
本文转自实时计算——数据源表概述