背景
表格存储可以为Spark提供KV查询(主表,全局二级索引表)、多元索引查询两套数据访问方式,以支持海量结构化数据快速读写和丰富的SQL查询分析能力。其分布式存储的特点和强大的索引引擎能够支持PB级存储、千万TPS以及毫秒级延迟的服务能力。
KV访问方式指的是主表和全局二级索引访问方式,其中主表指的是Tablestore的源数据主表,全局二级索引和多元索引的介绍见文章海量结构化数据存储技术揭秘:Tablestore存储和索引引擎详解。
Spark多元索引访问与KV访问方式的区别:
KV查询方式在过滤字段是主键的场景下效率较高,但不适合过滤字段变动零活且过滤字段中非主键列较多的场景。多元索引是Tablestore基于倒排索引组织的数据,可以提供类似ES的丰富查询、统计聚合、分词查询、地理位置查询功能。在下述数据访问场景中,推荐使用多元索引来代替KV访问方式:
- 少量且对延时要求较高的实时数据分析场景
- SQL语句中非主键的过滤字段较多,多数字段无法被二级索引主键或者主表主键包含
- SQL语句中过滤字段的筛选效率较高即可以通过某一字段条件过滤掉大部分数据,比如select * from table where col = 1000中,col是非主键列,且 a = 1000 这个字段条件可以过滤掉大部分数据
- 查询条件中包含三种特殊的地理坐标查询方式,KV访问方式不支持地理坐标查询
接下来结合下图以select * from table where col1 like '阿%' or col2 = 'a'
为例解释多元索引的优势。通过多元索引访问数据时,在多元索引表col1中我们可以拿到col1为a的两行数据:pk1 = 1和pk1 = 2;在索引表col2中,我们可以拿到col2的值为‘阿%’的一行数据:pk1 = 1;进而将两次中间结果的数据进行union,即可得到我们想要的数据:pk1 = 1,col1 = ‘阿里云’,col2 = ‘a'
KV查询方式中,查询主体是Tablestore的主表,主表只在主键上拥有索引能力。如果要查询的SQL语句中的过滤字段不是主表的主键,则需要进行全表扫描。以select * from table where col1 = 'a' or col2 like '阿%'
为例,由于col1不是主表的主键,Tablestore会全表扫描找到col1为a的两行数据;由于col2不是主表的主键,会再一次全表扫描找到col2的值为‘阿%’的一行数据,进而做数据的union。当然,我们可以通过构建一个主键为col1,col2的二级索引表来支持该查询,但这种方式的灵活性明显不如多元索引高。
Spark对接多元索引实操见文章,下文将继续介绍数据类型支持、谓词支持和自定义谓词下推配置。
数据类型支持
基础数据类型
Spark数据类型 | Scala中的值类型 | 多元索引类型 | Tablestore内部类型 |
---|---|---|---|
ByteType | Byte | Long | INTEGER |
ShortType | Short | Long | INTEGER |
IntegerType | Int | Long | INTEGER |
LongType | Long | Long | INTEGER |
FloatType | Float | Double | DOUBLE |
DoubleType | Double | Double | DOUBLE |
StringType | String | Keyword/Text | STRING |
BinaryType | Array[Byte] | Binary | BINARY |
BooleanType | Boolean | Boolean | BOOLEAN |
地理坐标(String Json) | String(Json) | GEO_POINT | STRING(Json) |
复杂数据类型-地理类型(Geo类型)
地理坐标查询是多元索引支持的查询方式,我们将其提供到计算层,使得Spark不仅可以查询分析基础类型数据,也可以结合地理位置对数据进行查询分析。
地理位置查询支持三种类型查询:半径圆查询、矩形查询、多边形查询。
Spark数据类型 | Scala中的值类型 | 多元索引类型 | Tablestore内部类型 |
---|---|---|---|
地理坐标(String Json)半径圆 | String(Json) | GEO_POINT | STRING(Json) |
地理坐标(String Json)矩形 | String(Json) | GEO_POINT | STRING(Json) |
地理坐标(String Json)多边形 | String(Json) | GEO_POINT | STRING(Json) |
应用场景:
地理坐标查询应用场景十分广泛,物联网设备位置信息、骑手订单、打卡位置信息、快递地理信息等应用场景下皆有用户在使用。
使用方式:
- 多元索引原生查询方式见链接:多元索引地理类型
- SparkSQL查询方式举例:
//地理半径圆查询
select * from table where val_geo = '{"centerPoint":"3,0", "distanceInMeter": 100000}' and name like 'ali%'
//地理矩形查询
select * from table where geo = '{"topLeft":"8,0", "bottomRight": "0,10"}' and id in { 123 , 321 }
//地理多边形查询
select * from table where geo = '{"points":["5,0", "5,1", "6,1", "6,10"]}'
谓词支持
谓词支持列表
Spark | 是否支持 | SQL举例 |
---|---|---|
And | 是 | select * from table where a > 1 and b < 0 |
Or | 是 | select * from table where a > 1 or b < 0 |
Not | 是 | select * from table where a != 1 |
EqualTo | 是 | select * from table where a = 1 |
Not+EqualTo | 是 | select * from table where a != 1 |
IsNull | 是 | select * from table where a is null table表中没有a列的行 |
In | 是 | select * from table where a in {1,2,3} 默认最大限制1024 |
LessThan | 是 | select * from table where a < 10 如果SQL语句中使用该谓词的列的类型为Long或者String,则可以通过谓词下推配置设置是否下推,详情见下一节"谓词下推支持说明" |
LessThanOrEqual | 是 | select * from table where a <=10 如果SQL语句中使用该谓词的列的类型为Long或者String,则可以通过谓词下推配置设置是否下推,详情见下一节"谓词下推支持说明" |
GreaterThan | 是 | select * from table where a > 10 如果SQL语句中使用该谓词的列的类型为Long或者String,则可以通过谓词下推配置设置是否下推,详情见下一节"谓词下推支持说明" |
GreaterThanOrEqual | 是 | select * from table where a >= 10 如果SQL语句中使用该谓词的列的类型为Long或者String,则可以通过谓词下推配置设置是否下推,详情见下一节"谓词下推支持说明" |
StringStartsWith | 是 | select * from table where a like "tablestore%" table表中a列以tablestore为prefix的行全部返回 |
地理坐标(String Json)中心距离 | 是 | select * from table where val_geo = '{"centerPoint":"3,0", "distanceInMeter": 100000}' 圆心半径决定的地理圆圈 |
地理坐标(String Json)矩形框 | 是 | select * from table where geo = '{"topLeft":"8,0", "bottomRight": "0,10"}' 左上角,右下角决定的地理矩形 |
地理坐标(String Json)多边形框 | 是 | select * from table where geo = '{"points":["5,0", "5,1", "6,1", "6,10"]}' 多个点组成的地理多边形 |
自定义谓词下推
自定义谓词下推目前支持与Long,String类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)是否下推。
谓词下推配置参数列表:
参数名称 | 默认值 | 可选值 | 说明 |
---|---|---|---|
push.down.range.long | true | true,false | 为false时表示与Long类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)不下推 |
push.down.range.string | true | true,false | 为false时表示与String类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)不下推 |
适用场景:
多元索引中,如果多字段过滤的中间结果数据量较大,则中间结果的union较为耗时。将某些字段的过滤从存储层提到计算层进行来做可以提高效率。比如select * from table where a = 10 and b < 999999999, 假设 a = 10 返回的结果只有1000条,b < 999999999的结果有一亿条 ,果在存储层这1000条结果与一亿条结果作union是比较耗时的,但如果把b < 999999999提到计算层,则Spark只需要对存储层返回的1000条数据作过滤,存储层的压力大大降低。
使用方式:
push.down.range.string与push.down.range.long在建立Spark外表的时候配置,默认为true,如果选择false则为不下推。
创建Spark外表同时连接多元索引,各个参数的说明如下:
• endpoint: 表格存储实例访问地址,EMR集群里使用VPC地址。
• access.key.id: 阿里云账号AK ID。
• access.key.secret: 阿里云账号AK Secret。
• instance.name: 实例名。
• table.name: Tablestore表名。
• search.index.name: 多元索引名。
• max.split.count: 多元索引Parallel Scan的查询并发度,并发数和Spark的Split数对应。
• push.down.range.long: 与Long类型做Range( >= > < <= )比较的谓词是否下推
• push.down.range.string: 与String类型做Range( >= > < <= )比较的谓词是否下推
DROP TABLE IF EXISTS geo_table;
CREATE TABLE geo_table (
pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING,
val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG,
val_text STRING, val_geo STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="sparksearchtest",
table.name="geo_table",
search.index.name="geo_table_index",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);
下推规则详解:
谓词下推的配置在创建Spark外表的时候设置,默认全部下推,可设置push.down.range.long=false/push.down.range.string=false使得与Long,String类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)不下推。
当SQL语句中的逻辑谓词只有AND和NOT(即不可以存在逻辑谓词OR),则可按照如下规则选择谓词下推
- 配置push.down.range.long=false:不下推与Long类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual),如下表例2
- push.down.range.string:不下推与String类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual),如下表例3
当SQL语句中的逻辑谓词存在OR时,push.down.range.long、string=false不会生效,所有谓词全部下推
SQL举例:
举例ID | 逻辑谓词 | 下推配置 | SQL举例 | 预期效果 |
---|---|---|---|---|
1 | 全为AND | push.down.range.long=true push.down.range.string=true |
select * from table where val_long1 > 1000 and val_long1 is null and name like 'table%' and pk in {12341,213432} | SQL正常 |
2 | 全为AND | push.down.range.long=false | select * from table where val_long1 > 1 and name like 'table%' | 与Long类型做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)都不会被下推,该谓词交由Spark层来做过滤 spark层获取到的是name like 'table%'的数据,val_long1 > 1交由spark过滤 |
3 | 全为AND | push.down.range.string=false | select * from table where val_string1 > 'string1' and name like 'table%' | 与String类型做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)都不会被下推,该谓词交由Spark层来做过滤 spark层获取到的是name like 'table%'的数据,val_string1 > 'string1'交由spark过滤 |
4 | 全为AND | 存在地理列 | select * from table where val_geo = '{"centerPoint":"3,0", "distanceInMeter": 100000}' and val_long1 = 37691900 and val_long2 > 2134234 | SQL正常 |
val_long2 > 2134234能否被过滤则取决于push.down.range.long的配置 | ||||
5 | 存在OR | long,string都配置为可以下推 push.down.range.long=true push.down.range.string=true |
select * from table where val_long1 > 1000 or val_long1 is null or name like 'table%' and pk in {12341,213432} | SQL正常 |
6 | 存在OR | 存在地理列 | select * from table where val_geo = '{"centerPoint":"3,0", "distanceInMeter": 100000}' or val_long1 = 37691900 | SQL正常 |
7 | 存在OR | push.down.range.long=false 且SQL语句中存在rangeLong 的过滤字段 |
select * from table where val_long1 > 1 and name like 'table%' | 此时push.down.range.long=false不生效,SQL谓词全部下推 |
8 | 存在OR | push.down.range.string=false 且SQL语句中存在rangeString 的过滤字段 |
select * from table where val_string1 > 'string1' and name like 'table%' | 此时push.down.range.string=false不生效,SQL谓词全部下推 |