使用EMR SQL 批处理Tablestore

本文涉及的产品
表格存储 Tablestore,50G 2个月
简介: 通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于批计算,Tablestore on Spark提供索引选择、分区裁剪、Projection列和Filter下推、动态指定分区大小等功能,利用表格存储的全局二级索引或者多元索引可以加速查询。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.al

通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于批计算,Tablestore on Spark提供索引选择、分区裁剪、Projection列和Filter下推、动态指定分区大小等功能,利用表格存储的全局二级索引或者多元索引可以加速查询。

前提条件

  • 已创建E-MapReduce Hadoop集群。具体操作,请参见创建集群

    创建集群时,请确保打开挂载公网开关,将集群挂载到公网,用于Shell远程登录服务器。

    说明
    本文使用Shell命令演示,如果需要使用E-MapReduce的图形化页面进行数据开发。具体操作,请参见数据开发

    image.png

  • 已上传emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar包到EMR Header服务器。

Spark连接表格存储数据表和全局二级索引

Spark连接到表格存储数据表和全局二级索引后,通过Spark外表查询数据时,系统会根据查询条件中设置的列条件自动选择索引表进行查询。

1. 在表格存储侧创建数据表或全局二级索引

  1. 创建表格存储的数据表。具体操作,请参见概述

    本示例中数据表名称为tpch_lineitem_perf,主键列为l_orderkey(LONG类型)、l_linenumber(LONG类型),属性列分别为l_comment(STRING类型)、l_commitdate(STRING类型)、l_discount(DOUBLE类型)、l_extendedprice(DOUBLE类型)、l_linestatus(STRING类型)、l_partkey(LONG类型)、l_quantity(DOUBLE类型)、l_receiptdate(STRING类型)等14列,数据条数为384016850,数据样例如下图所示。
    image.png

  2. (可选)在数据表上创建全局二级索引。具体操作,请参见使用SDK

    当查询条件中需要使用数据表的非主键列,建议创建全局二级索引加速查询。

    全局二级索引支持在指定列上建立索引,生成的索引表中数据按指定的索引列进行排序,数据表的每一个数据写入都将自动以异步方式同步到索引表。
    image.png

2. 在EMR集群侧创建Spark外表

  1. 登录EMR Header服务器。
  2. 执行如下命令启动spark-sql命令行,用于Spark外表创建和后续的SQL实战操作。
    其中Spark的标准启动参数为--num-executors 32 --executor-memory 2g --executor-cores 2,可以根据具体的集群配置进行自定义调整。表示上传jar包的版本信息,请根据实际填写,例如2.1.0-SNAPSHOT。
    spark-sql --jars emr-datasources_shaded_2.11-<Version>.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2
  3. 创建Spark外表同时连接全局二级索引。

    • 参数
    参数 说明
    endpoint 表格存储实例访问地址,EMR集群中使用VPC地址。
    access.key.id 阿里云账号的AccessKey ID。
    access.key.secret 阿里云账号的AccessKey Secret。
    instance.name 表格存储实例访问地址,EMR集群中使用VPC地址。
    table.name 表格存储的数据表名称。
    split.size.mbs 每个Split的大小,默认值为100 MB。
    max.split.count 数据表计算出的最大Split数,并发数和Spark的Split个数对应,默认值为1000。
    catalog 数据表的Schema定义。
    • 实例
      DROP TABLE IF EXISTS tpch_lineitem;
      CREATE TABLE tpch_lineitem
      USING tablestore
      OPTIONS(
      endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
      access.key.id="",
      access.key.secret="",
      instance.name="vehicle-test",
      table.name="tpch_lineitem_perf",
      split.size.mbs=10,
      max.split.count=1000,
      catalog='{"columns":{"l_orderkey":{"type":"long"},"l_partkey":{"type":"long"},"l_suppkey":{"type":"long"},"l_linenumber":{"type":"long"},"l_quantity":{"type":"double"},"l_extendedprice":{"type":"double"},"l_discount":{"type":"double"},"l_tax":{"type":"double"},"l_returnflag":{"type":"string"},"l_linestatus":{"type":"string"},"l_shipdate":{"type":"string"},"l_commitdate":{"type":"string"},"l_receiptdate":{"type":"string"},"l_shipinstruct":{"type":"string"},"l_shipmode":{"type":"string"},"l_comment":{"type":"string"}}}'
      );

3. SQL查询实战

如下是不同查询需求的SQL查询样例,请根据实际业务组合使用SQL查询。

  • 全表查询

    • SQL语句:SELECT COUNT(*) FROM tpch_lineitem;
    • SQL总耗时:36.199s、34.711s、34.801s,平均耗时35.237s。
  • 主键查询

    • SQL语句:SELECT COUNT(*) FROM tpch_lineitem WHERE l_orderkey = 1 AND l_linenumber = 1;
    • 表格存储服务端:GetRow操作,平均耗时为0.585 ms。
  • 非主键查询,未开启全局二级索引

    • SQL语句:SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';
    • SQL总耗时:37.006s、37.269s、37.17s,平均耗时37.149s。
  • 非主键查询,开启全局二级索引

    • SQL语句:SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';
    • SQL总耗时(开启l_shipdate列的全局二级索引):1.686s、1.651s、1.784s,平均耗时1.707s。

Spark连接表格存储数据表和多元索引

Spark连接到表格存储数据表和多元索引后,通过Spark外表查询数据时,系统会自动使用设置的多元索引进行查询。

1. 在表格存储侧创建数据表和多元索引

  1. 创建数据表。具体操作,请参见概述
    本示例中数据表名称为geo_table,主键列为pk1(String类型),属性列分别为val_keyword1(String类型)、val_keyword2(String类型)、val_keyword3(String类型)、val_bool(Boolean类型)、val_double(Double类型)、val_long1(Long类型)、val_long2(Long类型)、val_text(String类型)和val_geo(String类型),数据条数为208912382,数据样例如下图所示。
    image.png

  2. 在数据表上创建多元索引。具体操作,请参见创建及使用多元索引
    创建多元索引时,根据字段类型选择对应的多元索引Mapping。

    说明:创建多元索引时,地理位置字段需选择字段类型为地理位置而非字符串类型。

    image.png
    创建多元索引后,多元索引会自动开始同步数据表中的数据,待多元索引进入增量状态时,表示多元索引完成构建。
    image.png

在EMR集群侧创建Spark外表

  1. 登录EMR Header服务器
  2. 创建Spark外表同时连接多元索引

    • 参数
    参数 说明
    endpoint 表格存储实例访问地址,EMR集群中使用VPC地址。
    access.key.id 阿里云账号的AccessKey ID。
    access.key.secret 阿里云账号的AccessKey Secret。
    instance.name 表格存储实例访问地址,EMR集群中使用VPC地址。
    table.name 表格存储的数据表名称。
    split.size.mbs 每个Split的大小,默认值为100 MB。
    max.split.count 数据表计算出的最大Split数,并发数和Spark的Split个数对应,默认值为1000。
    push.down.range.long 与Long类型的列做大小(>=、>、<、<=)比较的谓词是否下推。更多信息,请参见批计算谓词下推配置。类型为Boolean,默认值为true,表示与Long类型的列做大小比较的谓词下推。设置为false时,表示与Long类型的列做大小比较的谓词不下推。
    push.down.range.string 与String类型的列做大小(>=、>、<、<=)比较的谓词是否下推。更多信息,请参见批计算谓词下推配置。类型为Boolean,默认值为true,表示与String类型的列做大小比较的谓词下推。设置为false时,表示与String类型的列做大小比较的谓词不下推。
    • 示例
      DROP TABLE IF EXISTS geo_table;
      CREATE TABLE geo_table (
      pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING,
      val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG,
      val_text STRING, val_geo STRING COMMENT "geo stored in string format"
      )
      USING tablestore
      OPTIONS(
      endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com",
      access.key.id="",
      access.key.secret="",
      instance.name="sparksearchtest",
      table.name="geo_table",
      search.index.name="geo_table_index",
      max.split.count=64,
      push.down.range.long = false,
      push.down.range.string = false
      );

3. SQL查询实战

如下是不同查询需求的SQL查询样例,请根据实际业务组合使用SQL查询。

  • 使用多元索引全表查询

    • SQL语句:SELECT COUNT(*) FROM geo_table;
    • SQL耗时:测试数据208912382条,配置64个Parallel Scan并发,实际耗时165.208s,平均QPS约126.45万。
      208912382
      Time taken: 165.208 seconds, Fetched 1 row(s)
      20/06/29 20:55:11 INFO [main] SparkSQLCLIDriver: Time taken: 165.208 seconds, Fetched 1 row(s)
  • 组合条件查询

    • SQL语句:SELECT val_long1, val_long2, val_keyword1, val_double FROM geo_table WHERE (val_long1 > 17183057 AND val_long1 < 27183057) AND (val_long2 > 1000 AND val_long2 < 5000) LIMIT 100;
    • SQL耗时:Spark会将Projection列和Filter下推到多元索引,实际耗时2.728s,极大加快查询效率。
      21423964        4017    aaa     2501.9901650365096
      21962236        2322    eio     2775.9021545044116
      Time taken: 2.894 seconds, Fetched 100 row(s)
      20/06/30 18:51:24 INFO [main] SparkSQLCLIDriver: Time taken: 2.894 second
  • 地理位置查询

地理位置查询包括地理距离查询、地理长方形查询和地理多边形范围查询三种地理位置查询方式。示例中val_geo为地理位置字段名,地理坐标的格式都为"纬度,经度"。

  • 地理距离查询
    语法为val_geo = '{"centerPoint":"中心点坐标", "distanceInMeter": 距离中心点的距离}'。
    SQL语句:
    SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"centerPoint":"6.530045901643962,9.05358919674954", "distanceInMeter": 3000.0}';

    • 地理长方形查询
      语法为val_geo = '{"topLeft":"矩形框的左上角的坐标", "bottomRight": "矩形框的右下角的坐标"}'。
      SQL语句:
      SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"topLeft":"6.257664116603074,9.1595116589601", "bottomRight": "6.153593333442616,9.25968497923747"}';
  • 地理多边形范围查询
    语法为val_geo = '{"points":["坐标1", "坐标2", .... "坐标n-1", "坐标n"]}'。
    SQL语句:
    SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"points":["6.530045901643962,9.05358919674954", "6.257664116603074,9.1595116589601", "6.160393397574926,9.256517839929597", "6.16043846779313,9.257192872563525"]}';

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
7月前
|
SQL Oracle Java
sql文件批处理程序-java桌面应用
sql文件批处理程序-java桌面应用
62 0
|
2月前
|
SQL 数据库
执行 Transact-SQL 语句或批处理时发生了异常。 (Microsoft.SqlServer.ConnectionInfo)之解决方案
执行 Transact-SQL 语句或批处理时发生了异常。 (Microsoft.SqlServer.ConnectionInfo)之解决方案
268 0
|
SQL 关系型数据库 Linux
Postgres SQL 做备份脚本批处理
每次手动备份太麻烦了,工作上需要,决定使用自动备份,所以写个博客来记录一次,本次备份功能是无密码通过批处理来执行定时备份的,如果是windows server r2服务器的话大家可以搭配任务计划程序来做定时执行,如果是linux内核的系统可以用crontab插件,crontab 插件大家可以自行百度,从而形成定时备份数据。
210 0
|
SQL Java 数据库连接
Java JDBC中的批处理SQL语句的详解
Java JDBC中的批处理SQL语句的详解
156 0
|
SQL 存储 自然语言处理
表格存储最佳实践:使用多元索引加速 SQL 查询
表格存储(Tablestore)在 2022 年 5 月正式发布了 SQL 商业化版本,业务上只需要在数据表上建立映射关系,就可以基于 SQL 引擎方便地对表格存储中的数据进行访问和计算,大大地降低了用户的学习成本。
726 0
|
存储 SQL NoSQL
表格存储 Tablestore SQL 商业版介绍
表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。 表格存储在 21 年 9 月正式公测了 SQL 功能,使得你在享受表格存储全托管,灵活的存储能力之外,可以让你的业务迁移更加平顺。经
1215 0
表格存储 Tablestore SQL 商业版介绍
|
SQL 存储 Java
表格存储 SQL 查询多元索引
多元索引是表格存储产品中一个重要的功能,多元索引使用倒排索引技术为表格存储提供了非主键列上的快速检索功能,另外也提供了统计聚合功能。表格存储近期开放了SQL查询功能,SQL引擎默认从原始表格中读取数据,非主键列上的查询需要扫描全表。
表格存储 SQL 查询多元索引
|
存储 SQL NoSQL
表格存储 SQL 功能快速上手
# 功能介绍 表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。​ 表格存储正式发布了 SQL 功能,满足用户业务平滑迁移到表格存储并可以继续通过 SQL 方式访问表格存储,表格存储
1587 0
|
SQL 存储 Cloud Native
表格存储 SQL 操作实战
表格存储做为一款结构化存储系统,近期发布了新功能 SQL,大幅简化了查询的门槛,用户无需学习繁琐的 SDK,也不用区分表,索引等不同的接口,可以像访问传统的 MySQL 这类数据库一样,使用 SQL 的方式访问云原生的结构化大数据存储。下面我们就来具体实操下,看看查询用起来顺不顺手。
617 0
|
SQL Java Apache
【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤
以测试集群版本为例(EMR-4.4.1)—— Flink SQL Client 集成 Hive 使用文档
【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤