使用 Blink 访问表格存储 Tablestore

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍如何使用实时计算 Blink 服务访问表格存储服务(Tablestore),并进行开发。背景Blink 产品介绍阿里云实时计算Flink版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于Apache Flink构建的企业级、高性能实时大数据处理系统,由Apache Flink创始团队官方

本文介绍如何使用实时计算 Blink 服务访问表格存储服务(Tablestore),并进行开发。

背景

Blink 产品介绍

阿里云实时计算Flink版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于Apache Flink构建的企业级、高性能实时大数据处理系统,由Apache Flink创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。

阿里云实时计算 Flink 版独享/共享集群(原产品线)支持共享模式和独享模式两种产品模式。在本文中为了简化,称之为 Blink。

Blink 产品优势

  • 性能优越:作业可达百万级吞吐,计算可达秒级延迟,TPC-H 性能测试可达开源引擎3~5倍。
  • 功能强大:数十种作业指标监控,一站式开发界面,提供智能诊断系统,具有作业智能调优功能。
  • 价格低廉:极致弹性体验,可按量付费,总资源费用低于自建。
  • 稳定安全:服务SLA可达 99.9%,集群计算无单点,故障可自动恢复,资源租户隔离,杜绝相互干扰。
  • 品牌认证:Flink 官方创始团队出品,中国信通院认证,进入 Forrester 象限的实时流计算产品。
  • 兼容开源:提供最新Flink版本,与开源 Flink 接口100%兼容,实现业务平滑迁移上云。

表格存储通道服务

表格存储通道服务是基于表格存储数据接口的全增量一体化服务,通过 Tunnel Service API 和 SDK,为您提供了增量、全量和增量加全量三种类型的分布式数据实时消费通道。通过 Tunnel Service 数据通道,您可以使用流式计算的方式,消费表中存量或新增数据。实时计算 Flink 版可以将 Tunnel Service 数据通道作为流式数据的输入,每条数据类似一个 JSON 格式。Tunnel Service 数据通道的示例如下。

{
    "OtsRecordType": "PUT",  // 数据操作类型,包括 PUT、UPDATE 和 DELETE。
    "OtsRecordTimestamp": 1506416585740836, //数据写入时间(单位为微秒),全量数据时为0。
    "PrimaryKey": [
    	{
            "ColumnName": "pk_0", //第1主键列。
            "Value": 1506416585881590900
      	},
      	{
      	     "ColumnName": "pk_1", //第2主键列。
      	     "Value": "string_pk_value"
      	}
    ],
    "Columns": [
      	{
      	     "OtsColumnType": "PUT", // 列操作类型,包括PUT、DELETE_ONE_VERSION 和 DELETE_ALL_VERSION。
            "ColumnName": "attr_0",
            "Value": "hello_table_store",
       },
       {
            "OtsColumnType": "DELETE_ONE_VERSION", // DELETE 操作没有 Value 字段。
            "ColumnName": "attr_1"
       }
    ]
}

准备工作

接下来介绍如何在同一个云账号下创建表格存储服务和 Blink 独享/共享集群服务。

开通 Blink 独享/共享集群服务

使用 Blink独享/共享集群访问表格存储前,您需要完成以下准备工作:

  1. 开通  Blink独享/共享集群服务
  2. 创建集群,创建项目
  3. 创建 AccessKey
  4. 在 RAM 控制台授权  Blink 集群访问表格存储的权限。具体请参见 独享集群授权共享集群授权
  5. 在项目列表中点击刚刚创建的项目,进入工作台

开通表格存储服务

  1. 在表格存储控制台 创建实例
  2. 在数据表中 创建通道
  3. 在本示例中,创建的表格存储实例和数据表信息如下:
  • 实例名称:blink-ots
  • 数据表名称:blink_table
  • 通道名称:blink_tunnel
  • 设置实例网络类型为允许任意网络访问

工作台使用和作业开发

我们已经创建了 Blink 集群的项目 ots-demo,接下来将会介绍项目工作台的基本使用方式。

工作台介绍

如图所示,工作台中展示了当前项目的基本情况:

  1. 区域展示了当前项目
  2. 区域展示了当前集群
  3. 区域中会罗列目前已经上线的作业运行情况。

工作台右上角的区域,提供了开发运维的功能选择:

作业开发

在工作台中,点击右上角的开发按钮,进入作业开发页面。

  • 作业区管理着所有的 SQL 作业,具体的作业编写方式,请参考  Blink 官方文档
  • 资源引用中可以新建自定义资源,若想更改默认的 connector,可以在此新建。
  • 文末展示了一个简单的作业样例

运维

  • 在作业上方点击 上线按钮

  • 按照需求配置资源,最终上线作业

  • 运维页面即可查看刚刚上线的作业

  • 点击 启动按钮,启动作业
  • 通过 TaskManager -> container_xxx -> taskmanager.out 查看打印输出。(只有 type="print"才会输出在这里)

使用表格存储作为源表

DDL 定义

实时计算 Flink 版支持使用 Tablestore 作为源表,示例代码如下。

CREATE TABLE demo (
       pk_0                BIGINT,
       pk_1                BIGINT,
       col_0               BIGINT,
       col_1               VARCHAR,
       OtsRecordType       VARCHAR header,
       OtsRecordTimestamp  BIGINT header,
       col_1_OtsColumnType VARCHAR header
) WITH (
       -- 使用表格存储作为源表
       type ='ots',
       endPoint ='https://blink-ots.cn-hangzhou.ots-internal.aliyuncs.com',
       instanceName = 'blink-ots',
       tableName ='blink_table',
       tunnelName = 'blink_tunnel',
       accessId ='',
       accessKey ='',
       ignoreDelete = 'true'
);      

属性字段

表格存储源表属性字段的获取和使用方法,请参见获取数据源表属性字段

字段名

说明

OtsRecordType

数据操作类型

OtsRecordTimestamp

数据插入系统的时间

(全量数据时,OtsRecordTimestamp

为0)

<列名>_OtsColumnType

某列的操作类型

OtsRecordEpoch

数据时序信息的epoch

OtsRecordRowIndex

数据的行位置标识

说明

  • (epoch, timestamp, rowIndex)   能唯一标记出一条数据

WITH 参数

参数

说明

备注

type

connector类型

固定值为ots

endPoint

表格存储的实例访问地址

VPC 网络环境需要选择实例的 VPC Endpoint。

instanceName

表格存储的实例名称

tableName

表格存储的数据表名

实时计算读取 Tablestore 源表数据时,已读取的数据不会再被读取,如果您有再次读取全量数据的需求,则需要重新创建新的数据通道。

tunnelName

表格存储数据表的数据通道名

accessId

表格存储读取的AccessKey ID

accessKey

表格存储读取的密钥AccessKey Secret

ignoreDelete

是否忽略DELETE操作的数据。

可选,默认值为false

使用表格存储作为维表

示例

实时计算 Flink 版支持表格存储 Tablestore 作为维表,示例如下。

CREATE TABLE ots_dim_table (
       pk_0        BIGINT,
       pk_1        BIGINT,
       col_0       BIGINT,
       col_1       VARCHAR,
       PRIMARY KEY (pk_0,pk_1),
       PERIOD FOR SYSTEM_TIME
       --定义了维表的变化周期。
) WITH (
       type='ots',
       --省略其余参数
);

说明

  • 在声明维表时,必须要指名主键。
  • 在维表 JOIN 时,ON 条件必须包含所有主键的等值条件。
  • Tablestore 的主键即表的 Rowkey。

WITH 参数

参数

说明

备注

type

维表类型

固定值为ots

endPoint

表格存储的实例访问地址

VPC 网络环境需要选择实例的 VPC Endpoint。

instanceName

表格存储的实例名称

tableName

表格存储的数据表名

accessId

表格存储读取的AccessKey ID

accessKey

表格存储读取的密钥AccessKey Secret

CACHE 参数

参数

说明

备注

cache

缓存策略

表格存储维表支持以下两种缓存策略:

  • None(默认值):无缓存。
  • LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

cacheSize

缓存大小

当选择 LRU 缓存策略后,可以设置缓存大小,默认为 10000 行。

cacheTTLMs

缓存超时时间,单位为毫秒。

当选择 LRU 缓存策略后,可以设置缓存失效的超时时间。

代码示例

CREATE TABLE ots_source (
       pk_0        BIGINT,
       pk_1        BIGINT,
       col_0       BIGINT,
       col_1       VARCHAR,
) WITH (
       type='ots',
       --省略其余参数
);

CREATE TABLE ots_dim_table (
       pk_0        BIGINT,
       col_2       VARCHAR,
       PRIMARY KEY (pk_0),
       PERIOD FOR SYSTEM_TIME
       --定义了维表的变化周期。
) WITH (
       type='ots',
       --省略其余参数
);

CREATE TABLE print_sink (
       pk_0        BIGINT,
       col_1       VARCHAR,
       col_2       VARCHAR
) WITH (
       type='print'
);

INSERT INTO print_sink
SELECT
       s.pk_0,
       s.col_1,
       d.col_2
FROM ots_source as s
JOIN ots_dim_table FOR SYSTEM_TIME AS OF PROCTIME() as d --维表JOIN时必须指定此声明。
ON s.pk_0 = d.pk_0;

维表的详细语法请参见维表JOIN语句

使用表格存储作为结果表

DDL 定义

实时计算 Flink 版支持使用 Tablestore 作为结果输出,示例代码如下。

CREATE TABLE ots_sink (
       pk_0        BIGINT,
       pk_1        BIGINT,
       col_0       BIGINT,
       col_1       VARCHAR,
       PRIMARY KEY (pk_0,pk_1)
) WITH (
       type='ots',
       instanceName='',
       tableName='',
       accessId='',
       accessKey='',
       endPoint='',
       valueColumns='col_0,col_1'
);

说明

  • 推荐使用数据存储注册功能,详情请参见 注册表格存储Tablestore
  • valueColumns 值不能是声明的主键,可以是主键之外的任意字段。
  • Tablestore 结果表声明中,除主键列外,至少包含一个属性列。

WITH 参数

参数

说明

备注

type

结果表类型

固定值为ots。

instanceName

实例名

tableName

表名

endPoint

实例访问地址

参见服务地址

accessId

AccessKey ID

accessKey

AccessKey Secret

valueColumns

指定插入的字段列名

插入多个字段以英文逗号(,)分割。例如'ID,NAME'

bufferSize

流入多少条数据后开始输出

可选,默认值为 5000,表示输入的数据达到 5000 条就开始输出。

说明 在实时计算 Flink 版系统,bufferSize 根据 Tablestore 主键对结果数据进行去重后,再在bufferSize 的基础上进行batchSize。

batchWriteTimeoutMs

写入超时的时间

可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

batchSize

一次批量写入的条数

可选,默认值为 100。

retryIntervalMs

重试间隔时间

可选,单位毫秒,默认值为 1000。

maxRetryTimes

最大重试次数

可选,默认值为 100。

ignoreDelete

是否忽略DELETE操作

默认值为False。

类型映射

Tablestore字段类型

实时计算Flink版字段类型

INTEGER

BIGINT

STRING

VARCHAR

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

说明:Tablestore 结果表必须定义有Primary Key,以 Update 方式写入结果数据到 Tablestore 表。Update 方式说明请参见 Update类型

总结

本篇文章介绍了如何使用 Blink 访问表格存储 Tablestore。后续,我们会推出从零构建 Flink on Tablestore 系列文章,并推出最佳实践文章。

希望本文对你有帮助,如果希望继续交流,可以加入我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。

相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
10月前
|
存储 NoSQL JavaScript
OTS(Table Store)
OTS(Table Store)是阿里云提供的分布式NoSQL数据库服务,支持海量结构化数据的存储、查询和分析。OTS具有高可用、高性能、高扩展性和低成本等特点,适用于各种场景下的数据存储和处理,例如电商、物流、游戏等。
2571 2
|
11月前
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
606 0
|
存储 SQL NoSQL
表格存储 Tablestore 十年发展总结
这篇文章接下来会先整体介绍下表格存储 Tablestore,之后会分享下在技术层面产品这几年的功能演进、技术架构演进以及稳定性优化相关的工作,以及在业务层面我们定义的核心应用场景和一些典型案例。
66561 7
表格存储 Tablestore 十年发展总结
|
存储 NoSQL
|
存储 SQL NoSQL
表格存储 Tablestore SQL 商业版介绍
表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。 表格存储在 21 年 9 月正式公测了 SQL 功能,使得你在享受表格存储全托管,灵活的存储能力之外,可以让你的业务迁移更加平顺。经
1140 0
表格存储 Tablestore SQL 商业版介绍
|
存储 运维 NoSQL
表格存储 Tablestore 简介
近十年来互联网技术得到了飞速的发展,越来越多的行业逐渐加入到了互联网的阵营中来,同时也产生了更丰富、更复杂的业务场景和需求,这对于数据应用系统的性能无疑是巨大的挑战。传统关系型数据库有什么瓶颈,如何通过分布式数据库表格存储 Tablestore 进行优化?
839 0
|
存储 SQL 运维
使用 Flink SQL 访问 Tablestore 源表
    本文将介绍如何使用使用 Flink SQL 通过流处理的方式访问 Tablestore 源表。 在流计算场景下,用户可以基于[通道服务](https://help.aliyun.com/document_detail/102489.html),利用CDC(数据变更捕获)技术,通过 Flink 完成流式消费和计算。 Flink on Tablesto
1114 0
|
存储 SQL 缓存
|
存储 SQL 开发框架
阿里云物联网平台数据转发到表格存储(Table Store)示例参考
本文主要结合物模型的结构体类型属性数据,演示payLoad的设置及规则引擎的配置。
阿里云物联网平台数据转发到表格存储(Table Store)示例参考
|
7月前
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
64 3