本文介绍如何使用实时计算 Blink 服务访问表格存储服务(Tablestore),并进行开发。
背景
Blink 产品介绍
阿里云实时计算Flink版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于Apache Flink构建的企业级、高性能实时大数据处理系统,由Apache Flink创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
阿里云实时计算 Flink 版独享/共享集群(原产品线)支持共享模式和独享模式两种产品模式。在本文中为了简化,称之为 Blink。
Blink 产品优势
性能优越:作业可达百万级吞吐,计算可达秒级延迟,TPC-H 性能测试可达开源引擎3~5倍。
功能强大:数十种作业指标监控,一站式开发界面,提供智能诊断系统,具有作业智能调优功能。
价格低廉:极致弹性体验,可按量付费,总资源费用低于自建。
稳定安全:服务SLA可达 99.9%,集群计算无单点,故障可自动恢复,资源租户隔离,杜绝相互干扰。
品牌认证:Flink 官方创始团队出品,中国信通院认证,进入 Forrester 象限的实时流计算产品。
兼容开源:提供最新Flink版本,与开源 Flink 接口100%兼容,实现业务平滑迁移上云。
表格存储通道服务
表格存储通道服务是基于表格存储数据接口的全增量一体化服务,通过 Tunnel Service API 和 SDK,为您提供了增量、全量和增量加全量三种类型的分布式数据实时消费通道。通过 Tunnel Service 数据通道,您可以使用流式计算的方式,消费表中存量或新增数据。实时计算 Flink 版可以将 Tunnel Service 数据通道作为流式数据的输入,每条数据类似一个 JSON 格式。Tunnel Service 数据通道的示例如下。
{
"OtsRecordType": "PUT", // 数据操作类型,包括 PUT、UPDATE 和 DELETE。
"OtsRecordTimestamp": 1506416585740836, //数据写入时间(单位为微秒),全量数据时为0。
"PrimaryKey": [
{
"ColumnName": "pk_0", //第1主键列。
"Value": 1506416585881590900
},
{
"ColumnName": "pk_1", //第2主键列。
"Value": "string_pk_value"
}
],
"Columns": [
{
"OtsColumnType": "PUT", // 列操作类型,包括PUT、DELETE_ONE_VERSION 和 DELETE_ALL_VERSION。
"ColumnName": "attr_0",
"Value": "hello_table_store",
},
{
"OtsColumnType": "DELETE_ONE_VERSION", // DELETE 操作没有 Value 字段。
"ColumnName": "attr_1"
}
]
}
准备工作
接下来介绍如何在同一个云账号下创建表格存储服务和 Blink 独享/共享集群服务。
开通 Blink 独享/共享集群服务
使用 Blink独享/共享集群访问表格存储前,您需要完成以下准备工作:
开通 Blink独享/共享集群服务
创建集群,创建项目
创建 AccessKey
在项目列表中点击刚刚创建的项目,进入工作台
开通表格存储服务
实例名称:blink-ots
数据表名称:blink_table
主键信息:pk_0(integer),pk_1(integer)
访问域名:https://blink-ots.cn-hangzhou.ots-internal.aliyuncs.com
通道名称:blink_tunnel
设置实例网络类型为允许任意网络访问
工作台使用和作业开发
我们已经创建了 Blink 集群的项目 ots-demo,接下来将会介绍项目工作台的基本使用方式。
工作台介绍
如图所示,工作台中展示了当前项目的基本情况:
区域展示了当前项目
区域展示了当前集群
区域中会罗列目前已经上线的作业运行情况。
工作台右上角的区域,提供了开发和运维的功能选择:
作业开发
在工作台中,点击右上角的开发按钮,进入作业开发页面。
作业区管理着所有的 SQL 作业,具体的作业编写方式,请参考 Blink 官方文档。
资源引用中可以新建自定义资源,若想更改默认的 connector,可以在此新建。
文末展示了一个简单的作业样例
运维
在作业上方点击上线按钮
按照需求配置资源,最终上线作业
在运维页面即可查看刚刚上线的作业
点击启动按钮,启动作业
通过 TaskManager -> container_xxx -> taskmanager.out 查看打印输出。(只有type="print"才会输出在这里)
使用表格存储作为源表
DDL 定义
实时计算 Flink 版支持使用 Tablestore 作为源表,示例代码如下。
CREATE TABLE demo (
pk_0 BIGINT,
pk_1 BIGINT,
col_0 BIGINT,
col_1 VARCHAR,
OtsRecordType VARCHAR header,
OtsRecordTimestamp BIGINT header,
col_1_OtsColumnType VARCHAR header
) WITH (
-- 使用表格存储作为源表
type ='ots',
endPoint ='https://blink-ots.cn-hangzhou.ots-internal.aliyuncs.com',
instanceName = 'blink-ots',
tableName ='blink_table',
tunnelName = 'blink_tunnel',
accessId ='',
accessKey ='',
ignoreDelete = 'true'
);
属性字段
表格存储源表属性字段的获取和使用方法,请参见获取数据源表属性字段。
字段名 |
说明 |
OtsRecordType |
数据操作类型 |
OtsRecordTimestamp |
数据插入系统的时间 (全量数据时,OtsRecordTimestamp 为0) |
<列名>_OtsColumnType |
某列的操作类型 |
OtsRecordEpoch |
数据时序信息的epoch |
OtsRecordRowIndex |
数据的行位置标识 |
说明
(epoch, timestamp, rowIndex) 能唯一标记出一条数据
WITH 参数
参数 |
说明 |
备注 |
type |
connector类型 |
固定值为ots 。 |
endPoint |
表格存储的实例访问地址 |
VPC 网络环境需要选择实例的 VPC Endpoint。 |
instanceName |
表格存储的实例名称 |
无 |
tableName |
表格存储的数据表名 |
实时计算读取 Tablestore 源表数据时,已读取的数据不会再被读取,如果您有再次读取全量数据的需求,则需要重新创建新的数据通道。 |
tunnelName |
表格存储数据表的数据通道名 |
无 |
accessId |
表格存储读取的AccessKey ID |
无 |
accessKey |
表格存储读取的密钥AccessKey Secret |
无 |
ignoreDelete |
是否忽略DELETE操作的数据。 |
可选,默认值为false 。 |
使用表格存储作为维表
示例
实时计算 Flink 版支持表格存储 Tablestore 作为维表,示例如下。
CREATE TABLE ots_dim_table (
pk_0 BIGINT,
pk_1 BIGINT,
col_0 BIGINT,
col_1 VARCHAR,
PRIMARY KEY (pk_0,pk_1),
PERIOD FOR SYSTEM_TIME
--定义了维表的变化周期。
) WITH (
type='ots',
--省略其余参数
);
说明
在声明维表时,必须要指名主键。
在维表 JOIN 时,ON 条件必须包含所有主键的等值条件。
Tablestore 的主键即表的 Rowkey。
WITH 参数
参数 |
说明 |
备注 |
type |
维表类型 |
固定值为ots 。 |
endPoint |
表格存储的实例访问地址 |
VPC 网络环境需要选择实例的 VPC Endpoint。 |
instanceName |
表格存储的实例名称 |
无 |
tableName |
表格存储的数据表名 |
无 |
accessId |
表格存储读取的AccessKey ID |
无 |
accessKey |
表格存储读取的密钥AccessKey Secret |
无 |
CACHE 参数
参数 |
说明 |
备注 |
cache |
缓存策略 |
表格存储维表支持以下两种缓存策略:
|
cacheSize |
缓存大小 |
当选择 LRU 缓存策略后,可以设置缓存大小,默认为 10000 行。 |
cacheTTLMs |
缓存超时时间,单位为毫秒。 |
当选择 LRU 缓存策略后,可以设置缓存失效的超时时间。 |
代码示例
CREATE TABLE ots_source (
pk_0 BIGINT,
pk_1 BIGINT,
col_0 BIGINT,
col_1 VARCHAR,
) WITH (
type='ots',
--省略其余参数
);
CREATE TABLE ots_dim_table (
pk_0 BIGINT,
col_2 VARCHAR,
PRIMARY KEY (pk_0),
PERIOD FOR SYSTEM_TIME
--定义了维表的变化周期。
) WITH (
type='ots',
--省略其余参数
);
CREATE TABLE print_sink (
pk_0 BIGINT,
col_1 VARCHAR,
col_2 VARCHAR
) WITH (
type='print'
);
INSERT INTO print_sink
SELECT
s.pk_0,
s.col_1,
d.col_2
FROM ots_source as s
JOIN ots_dim_table FOR SYSTEM_TIME AS OF PROCTIME() as d --维表JOIN时必须指定此声明。
ON s.pk_0 = d.pk_0;
维表的详细语法请参见维表JOIN语句。
使用表格存储作为结果表
DDL 定义
实时计算 Flink 版支持使用 Tablestore 作为结果输出,示例代码如下。
CREATE TABLE ots_sink (
pk_0 BIGINT,
pk_1 BIGINT,
col_0 BIGINT,
col_1 VARCHAR,
PRIMARY KEY (pk_0,pk_1)
) WITH (
type='ots',
instanceName='',
tableName='',
accessId='',
accessKey='',
endPoint='',
valueColumns='col_0,col_1'
);
说明
推荐使用数据存储注册功能,详情请参见注册表格存储Tablestore。
valueColumns 值不能是声明的主键,可以是主键之外的任意字段。
Tablestore 结果表声明中,除主键列外,至少包含一个属性列。
WITH 参数
参数 |
说明 |
备注 |
type |
结果表类型 |
固定值为ots。 |
instanceName |
实例名 |
无 |
tableName |
表名 |
无 |
endPoint |
实例访问地址 |
参见服务地址 。 |
accessId |
AccessKey ID |
无 |
accessKey |
AccessKey Secret |
无 |
valueColumns |
指定插入的字段列名 |
插入多个字段以英文逗号(,)分割。例如'ID,NAME' 。 |
bufferSize |
流入多少条数据后开始输出 |
可选,默认值为 5000,表示输入的数据达到 5000 条就开始输出。 说明 在实时计算 Flink 版系统,bufferSize 根据 Tablestore 主键对结果数据进行去重后,再在bufferSize 的基础上进行batchSize。 |
batchWriteTimeoutMs |
写入超时的时间 |
可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 |
batchSize |
一次批量写入的条数 |
可选,默认值为 100。 |
retryIntervalMs |
重试间隔时间 |
可选,单位毫秒,默认值为 1000。 |
maxRetryTimes |
最大重试次数 |
可选,默认值为 100。 |
ignoreDelete |
是否忽略DELETE操作 |
默认值为False。 |
类型映射
Tablestore字段类型 |
实时计算Flink版字段类型 |
INTEGER |
BIGINT |
STRING |
VARCHAR |
BOOLEAN |
BOOLEAN |
DOUBLE |
DOUBLE |
说明:Tablestore 结果表必须定义有Primary Key,以 Update 方式写入结果数据到 Tablestore 表。Update 方式说明请参见 Update类型。
总结
本篇文章介绍了如何使用 Blink 访问表格存储 Tablestore。后续,我们会推出从零构建 Flink on Tablestore 系列文章,并推出最佳实践文章。
希望本文对你有帮助,如果希望继续交流,可以加入我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。