Tablestore结合Blink公共云使用手册

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
日志服务 SLS,月写入数据量 50GB 1个月
简介: 前言 本文将介绍在Blink实时计算平台建立以Tablestore作为流计算的源表以及结果表作业的流程。 表格存储通道服务 表格存储通道服务是基于表格存储(Tablestore)数据接口之上的全增量一体化服务,它通过一组Tunnel Service API和SDK为用户提供了增量、全量和增量加全量三种类型的分布式数据实时消费通道。

前言

本文将介绍在Blink实时计算平台建立以Tablestore作为流计算的源表以及结果表作业的流程。

表格存储通道服务

表格存储通道服务是基于表格存储(Tablestore)数据接口之上的全增量一体化服务,它通过一组Tunnel Service API和SDK为用户提供了增量、全量和增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立Tunnel Service数据通道,用户可以通过流式计算的方式对表中历史存量和新增的数据进行消费处理。
流计算能将Tunnel Service数据通道作为流式数据的输入,每条数据类似一个JSON格式,如下所示:

{
   
  "OtsRecordType": "PUT",  // 数据操作类型,包括PUT、UPDATE、DELETE
  "OtsRecordTimestamp": 1506416585740836, //数据写入时间(微秒),全量数据时为0
  "PrimaryKey": [
    {
   
      "ColumnName": "pk_1", //第一主键列
      "Value": 1506416585881590900
    },
    {
   
      "ColumnName": "pk_2", //第二主键列
      "Value": "string_pk_value"
    }
  ],
  "Columns": [
    {
   
      "OtsColumnType": "Put", // 列操作类型,包括PUT、DELETE_ONE_VERSION、DELETE_ALL_VERSION
      "ColumnName": "attr_0",
      "Value": "hello_table_store",
    },
    {
   
      "OtsColumnType": "DELETE_ONE_VERSION", // DELETE操作没有Value字段
      "ColumnName": "attr_1"
    }
  ]
}

其中,数据的各个主键和属性列值均可以在BLINK DDL以列名以及相应的类型映射读取,例如上述实例,我们需要定义的DDL如下所示:

create table tablestore_stream(
    pk_1 BIGINT,
    pk_2 VARCHAR,
    attr_0 VARCHAR,
    attr_1 DOUBLE,
    primary key(pk_1, pk_2)
) with (
    type ='ots',
    endPoint ='http://blink-demo.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    instanceName = "blink-demo",
    tableName ='demo_table',
    tunnelName = 'blink-demo-stream',
    accessId ='xxxxxxxxxxx',
    accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    ignoreDelete = 'false' //是否忽略delete操作的数据
);

如果字段名称有前缀,需要使用反撇,例:OTS字段名称为TEST.test,BLINK DDL定义为TEST.test。而OtsRecordType、OtsRecordTimestamp字段以及每个Column的OtsColumnType字段都会支持通过属性字段的方式读取:

字段名 说明
OtsRecordType 数据操作类型
OtsRecordTimestamp 数据操作时间(全量数据时为0)
列名_OtsColumnType 以具体列名和_"_OtsColumnType_"拼接,某列的操作类型

需要OtsRecordType和某些Column的OtsColumnType字段时,Blink提供了 HEADER 关键字用于获取源表中的属性字段,具体DDL:

create table tablestore_stream(
    OtsRecordType VARCHAR HEADER,
    OtsRecordTimestamp BIGINT HEADER,
    pk_1 BIGINT,
    pk_2 VARCHAR,
    attr_0 VARCHAR, 
    attr_1 DOUBLE,
    attr_1_OtsColumnType VARCHAR HEADER,
    primary key(pk_1, pk_2)
) with (
  ...
);

WITH参数

参数 注释说明 备注
endPoint 表格存储的实例访问地址 endPoint
instanceName 表格存储的实例名称 instanceName
tableName 表格存储的数据表名 tableName
tunnelName 表格存储数据表的数据通道名 tunnelName
accessId 表格存储读取的accessKey accessId
accessKey 表格存储读取的秘钥
ignoreDelete 是否忽略DELETE操作类型的实时数据 可选,默认为false

SQL示例

数据同步,ots sink会以update的方式写入结果表:

create table otsSource (
    pkstr VARCHAR,
    pklong BIGINT,
    col0 VARCHAR,
    primary key(pkstr, pklong)
) WITH (
    type ='ots',
      endPoint ='http://blink-demo.cn-hangzhou.ots.aliyuncs.com',
      instanceName = "blink-demo",
      tableName ='demo_table',
      tunnelName = 'blink-demo-stream',
      accessId ='xxxxxxxxxxx',
      accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
      ignoreDelete = 'true'
);

CREATE TABLE otsSink (
    pkstr VARCHAR,
    pklong BIGINT,
    col0 VARCHAR,
    primary key(pkstr, pklong)
) WITH (
    type='ots',
    instanceName='blink-target',
    tableName='demo_table',
    accessId ='xxxxxxxxxxx',
      accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    endPoint='https://blink-target.cn-hangzhou.ots.aliyuncs.com',
    valueColumns='col0'
);

INSERT INTO otsSink
SELECT t.pkstr, t.pklong, t.col0
FROM otsSource AS t

流计算作业建立流程

在Blink实时计算平台数据开发模块建立新任务,并填写节点类型、Blink版本、节点名称以及目标文件夹等相关内容,如下图所示:
image

新建任务之后,进入该任务,点击切换为SQL模式按钮。按照之前介绍的DDL定义开发自己的任务。如下图所示:
image

作业完成之后,点击发布,选择运行环境及配置可用CU,此次建立的流式作业就正式启动了,可通过运维界面管理作业以及查看作业运行相关信息。如下图所示:
image
image

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
缓存 运维 NoSQL
使用 Blink 访问表格存储 Tablestore
本文介绍如何使用实时计算 Blink 服务访问表格存储服务(Tablestore),并进行开发。背景Blink 产品介绍阿里云实时计算Flink版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于Apache Flink构建的企业级、高性能实时大数据处理系统,由Apache Flink创始团队官方
587 0
使用 Blink 访问表格存储 Tablestore
|
流计算 NoSQL 存储
Tablestore + Blink实战:交易数据的实时统计
交易数据的实时统计是电商网站一个核心功能,可以帮助用户实时统计网站的整体销售情况,快速验证“新销售策略”的效果。我们今天介绍一个基于表格存储(Tablestore)实现交易数据的实时计算,给大家提供一个新使用方式。
4968 0
|
存储 NoSQL 关系型数据库
blink+tablestore实现无限扩展性,高实时汇总计算及排行榜
#问题背景 最近开始了一个全新的ugc项目,要求对用户的点赞,评论,转发等等的数据进行统计按权重进行积分,并进行排序。要求排行榜的实时性在5分钟内,最好能进行全实时的计算,要求高度的准确性。 实际工作中这样的场景是非常多的,主要是各种数据的实时汇总,比如用户购买总量,用户点赞总量, 商品销售总量,不是要历史的数据而是要现在最新的总量数据,这个数据可能是1天的汇总,也可能是数年的汇总。另一方面
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
125 3
|
SQL NoSQL 数据可视化
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1774 0
玩转Tablestore:使用Grafana快速展示时序数据
|
6月前
|
DataWorks NoSQL 关系型数据库
DataWorks产品使用合集之如何从Tablestore同步数据到MySQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
8月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
8月前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
82 1
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
967 0
|
存储 消息中间件 NoSQL
物联网数据通过规则引擎流转到OTS|学习笔记
快速学习物联网数据通过规则引擎流转到OTS
353 15
物联网数据通过规则引擎流转到OTS|学习笔记