数据源表概述

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

实时计算的源表是指流式数据存储。流式数据存储驱动实时计算的运行,因此每个实时计算子作业必须提供至少一个流式数据存储。

语法

 
 
  1. CREATE TABLE tableName
  2. (columnName dataType [, columnName dataType ]*)
  3. [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];

示例

 
 
  1. create table datahub_stream(
  2. name varchar,
  3. age BIGINT,
  4. birthday BIGINT
  5. ) with (
  6. type='datahub',
  7. endPoint='http://dh-et2.aliyun-inc.com',
  8. project='blink_xxx',
  9. topic='test_xxx',
  10. accessId='0i70Rxxxxx',
  11. accessKey='yF60EwUxxxx',
  12. startTime='2017-07-21 00:00:00'
  13. );

Watermark定义

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。Watermark的定义是数据原表DDL定义的一部分。Flink提供如下语法定义:

 
 
  1. WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
  • watermarkName 标识了这个 watermark 的名字,可选。
  • <rowtime_field> 必须是表中已定义的一列(当前仅支持为Timestamp类型),含义是基于该列生成 watermark,并且标识该列为 Event Time 列,可以在后续 query 中用来定义窗口。
  • withOffset 是目前提供的watermark的生成策略,是根据<rowtime_field> - offset生成watermark的值。withOffset的第一个参数必须是<rowtime_field>
  • offset 单位为毫秒,含义为watermark值与event time值的偏移量。

通常一条记录中的某个字段就代表了该记录的发生时间。例如,表中有个rowtime字段,类型为Timestamp,其中某个值为1501750584000(2017-08-03 08:56:24.000),如果您需要定义一个基于该rowtime列的watermark,且watermark策略为偏移4秒,需要如下定义。

 
 
  1. WATERMARK FOR rowtime AS withOffset(rowtime, 4000)

这条数据的watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据中timestamp小于1501750580000(2017-08-03 08:56:20.000)的数据,都已经到达了。

计算列

概念

计算列是虚拟列,并非实际存储在表中。计算列的表达式可以使用其他列中的数据来计算其所属列的值,可以使用表达式、内置函数、或是自定义函数。灵活度与SELECT中的表达式一样。计算列在Flink中可以像普通字段一样被使用。

用途

目前watermark的rowtime列只支持Timestamp类型(未来会支持Long类型),watermark只能定义在源表DDL中,如果您的源表中没有 Timestamp类型的列,需要从其他类型的字段转换而来,可以使用计算列来转换。

语法

 
 
  1. <computed_column_definition> ::= column_name AS computed_column_expression

示例

 
 
  1. #如果datahub的TIME字段是微秒级别的(16位Unix时间戳),可以用计算列来转换。
  2. CREATE TABLE sls_stream(
  3. a INT,
  4. b BIGINT,
  5. TIME BIGINT,
  6. ts AS TO_TIMESTAMP(TIME/1000),
  7. WATERMARK FOR ts AS withOffset(ts, 1000)
  8. ) with (
  9. type = 'DATAHUB',
  10. ...
  11. );

如上示例中所示,源表数据中的字段TIME包含时间信息,为BIGINT类型。用计算列的功能将字段TIME转换成了Timestamp类型的ts字段,并将ts字段作为watermark的rowtime字段。

本文转自实时计算——数据源表概述

相关实践学习
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
相关文章
|
1月前
|
DataWorks 关系型数据库 大数据
DataWorks常见问题之数据源使用连接串模式新增 postgres 数据源报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
32 1
|
3天前
|
存储 SQL 数据管理
平台设计-固定表结构与可自定义表结构
整个平台的表结构分为两种:固定的和可自定义的。
|
1月前
|
缓存 分布式计算 DataWorks
DataWorks常见问题之新增数据源的时候holo类型数据源点击无反应如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
43 4
|
2月前
|
SQL 分布式计算 关系型数据库
Dataphin实现MaxCompute外表数据快速批量同步至ADB MySQL
当前大数据时代背景下,企业对数据的处理、分析和实时应用的需求日益增强。阿里云MaxCompute广泛应用于海量数据的ETL、数据分析等场景,但在将处理后的数据进一步同步至在线数据库系统,如ADB MySQL 3.0(阿里云自研的新一代云原生关系型数据库MySQL版)以支持实时查询、业务决策等需求时,可能会遇到数据迁移速度缓慢的问题。 DataphinV3.14版本支持外表导入SQL的带参调度,实现通过MaxCompute外表的方式将数据批量同步至ADB MySQL 3.0中,显著提升数据迁移的速度和效率。
283 1
|
4月前
|
SQL 流计算
Hologres源表定义的时候,可以映射为flink的timestmap么,那这个怎么映射呢?
Hologres源表定义的时候,可以映射为flink的timestmap么,那这个怎么映射呢?
106 0
|
9月前
|
存储 数据库
数据库视频第四章 数据表管理
数据库视频第四章 数据表管理
|
9月前
|
关系型数据库 MySQL 数据库
如何创建数据表
如何创建数据表
|
SQL 数据可视化 数据库
怎么使用可视化数据库工具 DBeaver 在账单类型 type 表里添加数据?
怎么使用可视化数据库工具 DBeaver 在账单类型 type 表里添加数据?
385 0
怎么使用可视化数据库工具 DBeaver 在账单类型 type 表里添加数据?
|
关系型数据库 MySQL 数据库
数据库学习-表的创建作业示例【带源码】
MySQL数据库 “表的创建 ” 习题示例,包含源码,能建立起对于表的创建的基本概念
122 0
数据库学习-表的创建作业示例【带源码】
|
关系型数据库 MySQL 数据库
MySQL数据库(15):高级数据操作-新增数据
MySQL数据库(15):高级数据操作-新增数据