本文介绍如何在E-MapReduce集群中使用Spark SQL访问表格存储的多元索引。
前置准备
- 创建E-MapReduce Hadoop集群,步骤参见E-MapReduce快速入门。创建时挂载公网地址,用于后续的Shell远程登陆。
步骤一 创建Tablestore表和多元索引
- 创建Tablestore源表
详细开通步骤请参考官方文档,本文demo中所创建出来的表名为geo_table, 主键列为pk1(STRING),属性列分别为val_keyword1(STRING), val_keyword2(STRING), val_keyword3(STRING),val_bool(BOOLEAN), val_double (DOUBLE), val_long1(LONG), val_long2(LONG),val_text(STRING), val_geo(STRING),数据条数为208912382,数据样例如下图所示。
- 在源表上开通多元索引,根据字段类型选择对应的索引Mapping,创建完成后,多元索引会自动开始同步,等待索引进入增量状态时,索引完成构建。
注意:地理位置字段建索引的时候,选择地理位置类型而非字符串类型.
步骤二 创建Spark外表
- 上传表格存储开发提供的emr-tablestore_shaded_2.11-2.1.0-SNAPSHOT.jar包到EMR Header机器上。
注:Spark访问多元索引的jar包为预览版,尚未正式发布,后续可以通过maven方式引入。
- 使用以下命令,启动spark-sql命令行,用于外表创建和后续的SQL实战。其中Spark的标准启动参数
--num-executors 64 --executor-memory 2g --executor-cores 1
,可以根据具体的集群配置进行自定义调整。
spark-sql --driver-class-path emr-tablestore_shaded_2.11-2.1.0-SNAPSHOT.jar --jars emr-tablestore_shaded_2.11-2.1.0-SNAPSHOT.jar --master yarn --num-executors 64 --executor-memory 2g --executor-cores 1
创建Spark外表同时连接多元索引,各个参数的说明如下:
- endpoint: 表格存储实例访问地址,EMR集群里使用VPC地址。
- access.key.id: 阿里云账号AK ID。
- access.key.secret: 阿里云账号AK Secret。
- instance.name: 实例名。
- table.name: Tablestore表名。
- search.index.name: 多元索引名。
- max.split.count: 多元索引Parallel Scan的查询并发度,并发数和Spark的Split数对应。
- push.down.range.long: 与Long类型做Range( >= > < <= )比较的谓词是否下推
- push.down.range.string: 与String类型做Range( >= > < <= )比较的谓词是否下推
DROP TABLE IF EXISTS geo_table;
CREATE TABLE geo_table (
pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING,
val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG,
val_text STRING, val_geo STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="sparksearchtest",
table.name="geo_table",
search.index.name="geo_table_index",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);
步骤三 SQL查询实战
以下仅演示一些SQL查询样例,具体的SQL可以根据业务进行灵活的组合。
全表索引查询
全索引表查询:SELECT COUNT(*) FROM geo_table;
- 测试数据208912382条,配置64个Parallel Scan并发,实际耗时165.208s,平均QPS约126.45w。
208912382
Time taken: 165.208 seconds, Fetched 1 row(s)
20/06/29 20:55:11 INFO [main] SparkSQLCLIDriver: Time taken: 165.208 seconds, Fetched 1 row(s)
组合条件查询
- 测试SQL:SELECT val_long1, val_long2, val_keyword1, val_double FROM geo_table WHERE (val_long1 > 17183057 AND val_long1 < 27183057) AND (val_long2 > 1000 AND val_long2 < 5000) LIMIT 100;
- Spark会将Projection列和Filter下推到多元索引,实际耗时2.728s,极大加快查询效率。
21423964 4017 aaa 2501.9901650365096
21962236 2322 eio 2775.9021545044116
Time taken: 2.894 seconds, Fetched 100 row(s)
20/06/30 18:51:24 INFO [main] SparkSQLCLIDriver: Time taken: 2.894 seconds, Fetched 100 row(s)
GEO条件查询
GEO支持三种查询,距离查询,距离长方形查询和距离多边形查询,GEO查询也会被下推到多元索引层,示例如下, 其中val_geo为GEO字段名,坐标的格式都为"纬度,经度"。
地理距离查询, 语法为val_geo = '{"centerPoint":"中心点坐标", "distanceInMeter": 距离中心点的距离}'
- SELECT COUNT(*) FROM search_view WHERE val_geo = '{"centerPoint":"6.530045901643962,9.05358919674954", "distanceInMeter": 3000.0}';
地理长方形查询: 语法为val_geo = '{"topLeft":"矩形框的左上角的坐标", "bottomRight": "矩形框的右下角的坐标"}'。
- SELECT COUNT(*) FROM search_view WHERE val_geo = '{"topLeft":"6.257664116603074,9.1595116589601", "bottomRight": "6.153593333442616,9.25968497923747"}';
地理多边形范围查询: 语法为val_geo = '{"points":["坐标1", "坐标2", .... "坐标n-1", "坐标n"]}'。
- SELECT COUNT(*) FROM search_view WHERE val_geo = '{"points":["6.530045901643962,9.05358919674954", "6.257664116603074,9.1595116589601", "6.160393397574926,9.256517839929597", "6.16043846779313,9.257192872563525"]}'