使用EMR SQL 流处理 Tablestore

简介: 通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于流计算,基于通道服务,利用CDC(数据变更捕获)技术完成Spark的mini batch流式消费和计算,同时提供了at-least-once一致性语义。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.aliyun.com/document_

通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于流计算,基于通道服务,利用CDC(数据变更捕获)技术完成Spark的mini batch流式消费和计算,同时提供了at-least-once一致性语义。

前提条件

  • 已创建E-MapReduce Hadoop集群。具体操作,请参见创建集群

创建集群时,请确保打开挂载公网开关,将集群挂载到公网,用于Shell远程登录服务器。

说明:本文使用Shell命令演示,如果需要使用E-MapReduce的图形化页面进行数据开发。具体操作,请参见 数据开发

image.png

快速开始

1. 在表格存储侧创建数据表和通道

1.创建Source表和Sink表。具体操作,请参见概述。
Source表名称为OrderSource,主键列分别为UserId(用户ID)和OrderId(订单ID),属性列分别为price(价格)和timestamp(订单时间),数据示例如下图所示。

image.png

2.Sink表名称为OrderStreamSink,主键列分别为begin(开始时间)、end(结束时间),属性列分别为count(订单数)和totalPrice(订单总金额)。其中开始时间和结束时间的格式为“yyyy-MM-dd HH:mm:ss”,例如2019-11-27 14:54:00。

3.在Source表上创建通道。具体操作,请参见快速入门
通道列表中会显示该通道的通道名、通道ID、通道类型等信息。其中通道ID用于后续的流式处理。
image.png

2. 在EMR集群侧创建Spark外表

1.登录EMR Header服务器
2.执行如下命令启动spark-sql命令行,用于Spark外表创建和后续的SQL实战操作。

其中Spark的标准启动参数为--num-executors 32 --executor-memory 2g --executor-cores 2,可以根据具体的集群配置进行自定义调整。<Version>表示上传jar包的版本信息,请根据实际填写,例如2.1.0-SNAPSHOT。

```shell
spark-sql --jars emr-datasources_shaded_2.11-<Version>.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2

```

3.创建Source外表order_source(对应表格存储的OrderSource表)。

  • 参数

    参数 说明
    endpoint 表格存储实例访问地址,EMR集群中使用VPC地址。
    access.key.id 阿里云账号的AccessKey ID。
    access.key.secret 阿里云账号的AccessKey Secret。
    instance.name 表格存储实例访问地址,EMR集群中使用VPC地址。
    table.name 表格存储的数据表名称。
    catalog 数据表的Schema定义。
    • 示例

      DROP TABLE IF EXISTS order_source;
      CREATE TABLE order_source
      USING tablestore
      OPTIONS(
      endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
      access.key.id="",
      access.key.secret="",
      instance.name="vehicle-test",
      table.name="OrderSource",
      catalog='{"columns": {"UserId": {"type": "string"}, "OrderId": {"type": "string"},"price": {"type": "double"}, "timestamp": {"type": "long"}}}'
      );    

3. 实时流计算

实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回表格存储的数据表中。

1.创建流计算的Sink外表order_stream_sink(对应表格存储的OrderStreamSink表)。创建Sink外表与创建Source外表的参数设置中只有catalog字段有差别,其他参数设置均相同。

2.在Source外表order_source上创建视图。创建视图时需要设置Source表上通道的通道ID。

3.在视图上运行Stream SQL作业进行实时聚合,且将聚合结果实时写回表格存储的OrderStreamSink表。

//创建Sink外表order_stream_sink(对应表格存储的OrderStreamSink表)。
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"type": "string"},"end": {"type": "string"}, "count": {"type": "long"}, "totalPrice": {"type": "double"}}}'
);

//在order_source表上创建视图order_source_stream_view。
CREATE SCAN order_source_stream_view ON order_source USING STREAM
OPTIONS(
tunnel.id="4987845a-1321-4d36-9f4e-73d6db63bf0f", 
maxoffsetsperchannel="10000");

//在视图order_source_stream_view上运行Stream SQL作业,如下样例会按30s粒度进行订单数和订单金额的聚合。
//将聚合结果实时写回表格存储OrderStreamSink表。
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");

运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果保存在OrderStreamSink表中。

image.png

相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
SQL 分布式计算 Java
EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework
SparkSQL多年来的性能优化集中在Optimizer和Runtime两个领域。前者的目的是为了获得最优的执行计划,后者的目的是针对既定的计划尽可能执行的更快。
 EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework
|
SQL Java Apache
【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤
以测试集群版本为例(EMR-4.4.1)—— Flink SQL Client 集成 Hive 使用文档
【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤
|
SQL 存储 缓存
EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework
EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来最高4倍性能提升,为EMR再次获取世界第一立下汗马功劳。来自阿里云EMR团队的周克勇将详细介绍Native Codegen框架。
EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework
|
分布式计算 大数据 Spark
7月30日产品直播【EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework】
EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来最高4倍性能提升,为EMR再次获取世界第一立下汗马功劳,本次直播将详细介绍Native Codegen框架。
7月30日产品直播【EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework】
|
存储 分布式计算 算法
EMR Spark-SQL性能极致优化揭秘 RuntimeFilter Plus
在 2019 年的打榜测试中,我们基于 Spark SQL Catalyst Optimizer 开发的 RuntimeFilter 优化 对于 10TB 数据 99 query 的整体性能达到 35% 左右的提升。
EMR Spark-SQL性能极致优化揭秘 RuntimeFilter Plus
|
SQL 缓存 分布式计算
EMR Spark-SQL性能极致优化揭秘 概览篇
这次的优化里面,还有一个很好玩的优化,就是我们引入的 Native Runtime,如果说上述的优化器优化都是一些特殊 Case 的杀手锏,Native Runtime 就是一个广谱大杀器,根据我们后期统计,引入 Native Runtime,可以普适性的提高 SQL Query 15~20%的 E2E 耗时,这个在TPCDS Perf 里面也是一个很大的性能提升点。
EMR Spark-SQL性能极致优化揭秘 概览篇
|
SQL 缓存 分布式计算
EMR Spark-SQL性能极致优化揭秘 概览篇
引子 最近阿里云E-MapReduce团队在TPCDS-Perf榜单中提交了最新成绩,相比第二名(其实也是EMR团队于2019年提交的记录),无论从性能还有性价比都取得了2倍+的优秀成绩!详细看 TPCDS Perf 阿里云E-MapReduce团队,除了在产品、易用性、安全性等维度上投入了大量.
EMR Spark-SQL性能极致优化揭秘 概览篇
|
监控 NoSQL 流计算
海量监控日志基于EMR Spark Streaming SQL进行实时聚合
从EMR-3.21.0 版本开始将提供Spark Streaming SQL的预览版功能,支持使用SQL来开发流式分析作业。结果数据可以实时写入Tablestore。 本文以LogHub为数据源,收集ECS上的日志数据,通过Spark Streaming SQL进行聚合后,将流计算结果数据实时写入Tablestore,展示一个简单的日志监控场景。
6171 0
|
SQL 存储 分布式计算
使用EMR SQL 批处理Tablestore
通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于批计算,Tablestore on Spark提供索引选择、分区裁剪、Projection列和Filter下推、动态指定分区大小等功能,利用表格存储的全局二级索引或者多元索引可以加速查询。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.al
457 0
|
SQL 分布式计算 Spark
EMR上如何进行流式SQL调试
本文将简单介绍EMR提供的一个流式SQL调试工具。
2277 0
下一篇
开通oss服务