使用EMR SQL 流处理 Tablestore

简介: 通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于流计算,基于通道服务,利用CDC(数据变更捕获)技术完成Spark的mini batch流式消费和计算,同时提供了at-least-once一致性语义。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.aliyun.com/document_

通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于流计算,基于通道服务,利用CDC(数据变更捕获)技术完成Spark的mini batch流式消费和计算,同时提供了at-least-once一致性语义。

前提条件

  • 已创建E-MapReduce Hadoop集群。具体操作,请参见创建集群
    创建集群时,请确保打开挂载公网开关,将集群挂载到公网,用于Shell远程登录服务器。

说明:本文使用Shell命令演示,如果需要使用E-MapReduce的图形化页面进行数据开发。具体操作,请参见数据开发

image.png

快速开始

1. 在表格存储侧创建数据表和通道

1.创建Source表和Sink表。具体操作,请参见概述。
Source表名称为OrderSource,主键列分别为UserId(用户ID)和OrderId(订单ID),属性列分别为price(价格)和timestamp(订单时间),数据示例如下图所示。

image.png

2.Sink表名称为OrderStreamSink,主键列分别为begin(开始时间)、end(结束时间),属性列分别为count(订单数)和totalPrice(订单总金额)。其中开始时间和结束时间的格式为“yyyy-MM-dd HH:mm:ss”,例如2019-11-27 14:54:00。

3.在Source表上创建通道。具体操作,请参见快速入门
通道列表中会显示该通道的通道名、通道ID、通道类型等信息。其中通道ID用于后续的流式处理。
image.png

2. 在EMR集群侧创建Spark外表

1.登录EMR Header服务器
2.执行如下命令启动spark-sql命令行,用于Spark外表创建和后续的SQL实战操作。
其中Spark的标准启动参数为--num-executors 32 --executor-memory 2g --executor-cores 2,可以根据具体的集群配置进行自定义调整。表示上传jar包的版本信息,请根据实际填写,例如2.1.0-SNAPSHOT。

```shell
spark-sql --jars emr-datasources_shaded_2.11-<Version>.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2

```

3.创建Source外表order_source(对应表格存储的OrderSource表)。

  • 参数 参数 说明
    endpoint 表格存储实例访问地址,EMR集群中使用VPC地址。
    access.key.id 阿里云账号的AccessKey ID。
    access.key.secret 阿里云账号的AccessKey Secret。
    instance.name 表格存储实例访问地址,EMR集群中使用VPC地址。
    table.name 表格存储的数据表名称。
    catalog 数据表的Schema定义。
    • 示例
    DROP TABLE IF EXISTS order_source;
    CREATE TABLE order_source
    USING tablestore
    OPTIONS(
    endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
    access.key.id="",
    access.key.secret="",
    instance.name="vehicle-test",
    table.name="OrderSource",
    catalog='{"columns": {"UserId": {"type": "string"}, "OrderId": {"type": "string"},"price": {"type": "double"}, "timestamp": {"type": "long"}}}'
    );    

3. 实时流计算

实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回表格存储的数据表中。

1.创建流计算的Sink外表order_stream_sink(对应表格存储的OrderStreamSink表)。创建Sink外表与创建Source外表的参数设置中只有catalog字段有差别,其他参数设置均相同。

2.在Source外表order_source上创建视图。创建视图时需要设置Source表上通道的通道ID。

3.在视图上运行Stream SQL作业进行实时聚合,且将聚合结果实时写回表格存储的OrderStreamSink表。

```sql
//创建Sink外表order_stream_sink(对应表格存储的OrderStreamSink表)。
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"type": "string"},"end": {"type": "string"}, "count": {"type": "long"}, "totalPrice": {"type": "double"}}}'
);

//在order_source表上创建视图order_source_stream_view。
CREATE SCAN order_source_stream_view ON order_source USING STREAM
OPTIONS(
tunnel.id="4987845a-1321-4d36-9f4e-73d6db63bf0f", 
maxoffsetsperchannel="10000");

//在视图order_source_stream_view上运行Stream SQL作业,如下样例会按30s粒度进行订单数和订单金额的聚合。
//将聚合结果实时写回表格存储OrderStreamSink表。
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");
```

运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果保存在OrderStreamSink表中。

image.png

相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
6月前
|
SQL 消息中间件 API
Flink---14、Flink SQL(SQL-Client准备、流处理中的表、时间属性、DDL)
Flink---14、Flink SQL(SQL-Client准备、流处理中的表、时间属性、DDL)
EMQ
|
11月前
|
SQL 存储 物联网
eKuiper 源码解读:从一条 SQL 到流处理任务的旅程
在本篇文章中,我们以梳理关键代码节点的方式了解了 eKuiper 的 SQL 计算引擎中是如何解析、处理,并最终执行这条 SQL 得到相应的结果。对于整个计算引擎关键处理节点里,我们了解了每个环节的代码大致是如何运行的。
EMQ
116 0
eKuiper 源码解读:从一条 SQL 到流处理任务的旅程
|
SQL 存储 自然语言处理
表格存储最佳实践:使用多元索引加速 SQL 查询
表格存储(Tablestore)在 2022 年 5 月正式发布了 SQL 商业化版本,业务上只需要在数据表上建立映射关系,就可以基于 SQL 引擎方便地对表格存储中的数据进行访问和计算,大大地降低了用户的学习成本。
668 0
|
存储 SQL NoSQL
表格存储 Tablestore SQL 商业版介绍
表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。 表格存储在 21 年 9 月正式公测了 SQL 功能,使得你在享受表格存储全托管,灵活的存储能力之外,可以让你的业务迁移更加平顺。经
1136 0
表格存储 Tablestore SQL 商业版介绍
|
SQL 存储 Java
表格存储 SQL 查询多元索引
多元索引是表格存储产品中一个重要的功能,多元索引使用倒排索引技术为表格存储提供了非主键列上的快速检索功能,另外也提供了统计聚合功能。表格存储近期开放了SQL查询功能,SQL引擎默认从原始表格中读取数据,非主键列上的查询需要扫描全表。
表格存储 SQL 查询多元索引
|
存储 SQL NoSQL
表格存储 SQL 功能快速上手
# 功能介绍 表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。​ 表格存储正式发布了 SQL 功能,满足用户业务平滑迁移到表格存储并可以继续通过 SQL 方式访问表格存储,表格存储
1428 0
|
SQL 存储 Cloud Native
表格存储 SQL 操作实战
表格存储做为一款结构化存储系统,近期发布了新功能 SQL,大幅简化了查询的门槛,用户无需学习繁琐的 SDK,也不用区分表,索引等不同的接口,可以像访问传统的 MySQL 这类数据库一样,使用 SQL 的方式访问云原生的结构化大数据存储。下面我们就来具体实操下,看看查询用起来顺不顺手。
481 0
|
SQL 存储 JSON
表格存储 SQL 数据类型详解
本文主要介绍 Tablestore SQL中的数据类型与 MySQL 数据类型之间的映射关系。 ​ ## 背景介绍 ### Tablestore 数据类型 Tablestore 中的数据类型支持如下表所示,其中主键列支持的数据类型包括String、Integer和Binary,属性列支持的数据类型包括String、Integer、Double、Boolean和Binary。 - 主键列支持的数
295 0
|
SQL 存储 测试技术
表格存储 SQL 元数据操作实战
本文主要介绍表格存储 SQL 支持的元数据操作。 ## 背景说明 如下表所示,目前表格存储 SQL 支持的元数据操作主要分为两大类:DDL操作和Admin操作。其中DDL操作包括:CREATE TABLE, DROP MAPPING TABLE, DESCRIBE TABLE。Admin操作包括:SHOW INDEX, SHOW TABLES。更多的元数据操作将在后续的版本迭代中支持,敬请期待。
208 0
|
SQL Java Apache
【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤
以测试集群版本为例(EMR-4.4.1)—— Flink SQL Client 集成 Hive 使用文档
【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤

热门文章

最新文章