Spark访问多元索引-细则剖析

本文涉及的产品
表格存储 Tablestore,50G 2个月
简介: ## 背景 表格存储可以为Spark提供**KV查询(主表,全局二级索引表)**、**多元索引查询**两套数据访问方式,以支持海量结构化数据快速读写和丰富的SQL查询分析能力。其分布式存储的特点和强大的索引引擎能够支持PB级存储、千万TPS以及毫秒级延迟的服务能力。 KV访问方式指的是主表和全局二级索引访问方式,其中主表指的是Tablestore的源数据主表,全局二级索引和多元索引的介绍见

背景

表格存储可以为Spark提供KV查询(主表,全局二级索引表)多元索引查询两套数据访问方式,以支持海量结构化数据快速读写和丰富的SQL查询分析能力。其分布式存储的特点和强大的索引引擎能够支持PB级存储、千万TPS以及毫秒级延迟的服务能力。

KV访问方式指的是主表和全局二级索引访问方式,其中主表指的是Tablestore的源数据主表,全局二级索引和多元索引的介绍见文章海量结构化数据存储技术揭秘:Tablestore存储和索引引擎详解

Spark多元索引访问与KV访问方式的区别:
KV查询方式在过滤字段是主键的场景下效率较高,但不适合过滤字段变动零活且过滤字段中非主键列较多的场景。多元索引是Tablestore基于倒排索引组织的数据,可以提供类似ES的丰富查询、统计聚合、分词查询、地理位置查询功能。在下述数据访问场景中,推荐使用多元索引来代替KV访问方式:

  • 少量且对延时要求较高的实时数据分析场景
  • SQL语句中非主键的过滤字段较多,多数字段无法被二级索引主键或者主表主键包含
  • SQL语句中过滤字段的筛选效率较高即可以通过某一字段条件过滤掉大部分数据,比如select * from table where col = 1000中,col是非主键列,且 a = 1000 这个字段条件可以过滤掉大部分数据
  • 查询条件中包含三种特殊的地理坐标查询方式,KV访问方式不支持地理坐标查询

接下来结合下图以select * from table where col1 like '阿%' or col2 = 'a'为例解释多元索引的优势。通过多元索引访问数据时,在多元索引表col1中我们可以拿到col1为a的两行数据:pk1 = 1和pk1 = 2;在索引表col2中,我们可以拿到col2的值为‘阿%’的一行数据:pk1 = 1;进而将两次中间结果的数据进行union,即可得到我们想要的数据:pk1 = 1,col1 = ‘阿里云’,col2 = ‘a'

KV查询方式中,查询主体是Tablestore的主表,主表只在主键上拥有索引能力。如果要查询的SQL语句中的过滤字段不是主表的主键,则需要进行全表扫描。以select * from table where col1 = 'a' or col2 like '阿%'为例,由于col1不是主表的主键,Tablestore会全表扫描找到col1为a的两行数据;由于col2不是主表的主键,会再一次全表扫描找到col2的值为‘阿%’的一行数据,进而做数据的union。当然,我们可以通过构建一个主键为col1,col2的二级索引表来支持该查询,但这种方式的灵活性明显不如多元索引高。

1.png

Spark对接多元索引实操见文章,下文将继续介绍数据类型支持、谓词支持和自定义谓词下推配置。

数据类型支持

基础数据类型

Spark数据类型 Scala中的值类型 多元索引类型 Tablestore内部类型
ByteType Byte Long INTEGER
ShortType Short Long INTEGER
IntegerType Int Long INTEGER
LongType Long Long INTEGER
FloatType Float Double DOUBLE
DoubleType Double Double DOUBLE
StringType String Keyword/Text STRING
BinaryType Array[Byte] Binary BINARY
BooleanType Boolean Boolean BOOLEAN
地理坐标(String Json) String(Json) GEO_POINT STRING(Json)

复杂数据类型-地理类型(Geo类型)

地理坐标查询是多元索引支持的查询方式,我们将其提供到计算层,使得Spark不仅可以查询分析基础类型数据,也可以结合地理位置对数据进行查询分析。
地理位置查询支持三种类型查询:半径圆查询、矩形查询、多边形查询。

Spark数据类型 Scala中的值类型 多元索引类型 Tablestore内部类型
地理坐标(String Json)半径圆 String(Json) GEO_POINT STRING(Json)
地理坐标(String Json)矩形 String(Json) GEO_POINT STRING(Json)
地理坐标(String Json)多边形 String(Json) GEO_POINT STRING(Json)

应用场景:
地理坐标查询应用场景十分广泛,物联网设备位置信息、骑手订单、打卡位置信息、快递地理信息等应用场景下皆有用户在使用。

使用方式:

//地理半径圆查询
select * from table where  val_geo = '{"centerPoint":"3,0", "distanceInMeter": 100000}' and name like 'ali%'
//地理矩形查询
select * from table where geo = '{"topLeft":"8,0", "bottomRight": "0,10"}' and id in { 123 , 321 }
//地理多边形查询
select * from table where geo = '{"points":["5,0", "5,1", "6,1", "6,10"]}'

谓词支持

谓词支持列表

Spark 是否支持 SQL举例
And select * from table where a > 1 and b < 0 
Or select * from table where a > 1 or b < 0
Not select * from table where a != 1
EqualTo select * from table where a = 1
Not+EqualTo select * from table where a != 1
IsNull select * from table where a is null table表中没有a列的行
In select * from table where a in {1,2,3} 默认最大限制1024
LessThan select * from table where a < 10 如果SQL语句中使用该谓词的列的类型为Long或者String,则可以通过谓词下推配置设置是否下推,详情见下一节"谓词下推支持说明"
LessThanOrEqual select * from table where a <=10 如果SQL语句中使用该谓词的列的类型为Long或者String,则可以通过谓词下推配置设置是否下推,详情见下一节"谓词下推支持说明"
GreaterThan select * from table where a > 10 如果SQL语句中使用该谓词的列的类型为Long或者String,则可以通过谓词下推配置设置是否下推,详情见下一节"谓词下推支持说明"
GreaterThanOrEqual select * from table where a >= 10 如果SQL语句中使用该谓词的列的类型为Long或者String,则可以通过谓词下推配置设置是否下推,详情见下一节"谓词下推支持说明"
StringStartsWith select * from table where a like "tablestore%" table表中a列以tablestore为prefix的行全部返回
地理坐标(String Json)中心距离 select * from table where val_geo = '{"centerPoint":"3,0", "distanceInMeter": 100000}' 圆心半径决定的地理圆圈
地理坐标(String Json)矩形框 select * from table where geo = '{"topLeft":"8,0", "bottomRight": "0,10"}' 左上角,右下角决定的地理矩形
地理坐标(String Json)多边形框 select * from table where geo = '{"points":["5,0", "5,1", "6,1", "6,10"]}' 多个点组成的地理多边形

自定义谓词下推

自定义谓词下推目前支持与Long,String类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)是否下推。
谓词下推配置参数列表:

参数名称 默认值 可选值 说明
push.down.range.long true true,false 为false时表示与Long类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)不下推
push.down.range.string true true,false 为false时表示与String类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)不下推

适用场景:
多元索引中,如果多字段过滤的中间结果数据量较大,则中间结果的union较为耗时。将某些字段的过滤从存储层提到计算层进行来做可以提高效率。比如select * from table where a = 10 and b < 999999999, 假设 a = 10 返回的结果只有1000条,b < 999999999的结果有一亿条 ,果在存储层这1000条结果与一亿条结果作union是比较耗时的,但如果把b < 999999999提到计算层,则Spark只需要对存储层返回的1000条数据作过滤,存储层的压力大大降低。
使用方式:
push.down.range.string与push.down.range.long在建立Spark外表的时候配置,默认为true,如果选择false则为不下推。

 创建Spark外表同时连接多元索引,各个参数的说明如下:
• endpoint: 表格存储实例访问地址,EMR集群里使用VPC地址。
• access.key.id: 阿里云账号AK ID。
• access.key.secret: 阿里云账号AK Secret。
• instance.name: 实例名。
• table.name: Tablestore表名。
• search.index.name: 多元索引名。
• max.split.count: 多元索引Parallel Scan的查询并发度,并发数和Spark的Split数对应。
• push.down.range.long: 与Long类型做Range(  >=  >   <   <=  )比较的谓词是否下推
• push.down.range.string: 与String类型做Range(  >=  >   <   <=  )比较的谓词是否下推

DROP TABLE IF EXISTS geo_table;
CREATE TABLE geo_table (
pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING, 
val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG,
val_text STRING, val_geo STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="sparksearchtest",
table.name="geo_table",
search.index.name="geo_table_index",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

下推规则详解:
谓词下推的配置在创建Spark外表的时候设置,默认全部下推,可设置push.down.range.long=false/push.down.range.string=false使得与Long,String类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)不下推。
当SQL语句中的逻辑谓词只有AND和NOT(即不可以存在逻辑谓词OR),则可按照如下规则选择谓词下推

  • 配置push.down.range.long=false:不下推与Long类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual),如下表例2
  • push.down.range.string:不下推与String类型的列做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual),如下表例3

当SQL语句中的逻辑谓词存在OR时,push.down.range.long、string=false不会生效,所有谓词全部下推

SQL举例:

举例ID 逻辑谓词 下推配置 SQL举例 预期效果
1 全为AND push.down.range.long=true
push.down.range.string=true
select * from table where val_long1 > 1000 and val_long1 is null and name like 'table%' and pk in {12341,213432} SQL正常
2 全为AND push.down.range.long=false select * from table where val_long1 > 1 and name like 'table%' 与Long类型做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)都不会被下推,该谓词交由Spark层来做过滤 spark层获取到的是name like 'table%'的数据,val_long1 > 1交由spark过滤
3 全为AND push.down.range.string=false select * from table where val_string1 > 'string1' and name like 'table%' 与String类型做大小比较的谓词(LessThan LessThanOrEqual GreaterThan GreaterThanOrEqual)都不会被下推,该谓词交由Spark层来做过滤 spark层获取到的是name like 'table%'的数据,val_string1 > 'string1'交由spark过滤
4 全为AND 存在地理列 select * from table where val_geo = '{"centerPoint":"3,0", "distanceInMeter": 100000}' and val_long1 = 37691900 and val_long2 > 2134234 SQL正常
val_long2 > 2134234能否被过滤则取决于push.down.range.long的配置
5 存在OR long,string都配置为可以下推
push.down.range.long=true
push.down.range.string=true
select * from table where val_long1 > 1000 or val_long1 is null or name like 'table%' and pk in {12341,213432} SQL正常
6 存在OR 存在地理列 select * from table where val_geo = '{"centerPoint":"3,0", "distanceInMeter": 100000}' or val_long1 = 37691900 SQL正常
7 存在OR push.down.range.long=false
且SQL语句中存在rangeLong 的过滤字段
select * from table where val_long1 > 1 and name like 'table%' 此时push.down.range.long=false不生效,SQL谓词全部下推
8 存在OR push.down.range.string=false
且SQL语句中存在rangeString 的过滤字段
select * from table where val_string1 > 'string1' and name like 'table%' 此时push.down.range.string=false不生效,SQL谓词全部下推
相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
SQL 分布式计算 数据安全/隐私保护
如何杜绝 spark history server ui 的未授权访问? 1
如何杜绝 spark history server ui 的未授权访问?
|
5月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 DataWorks API
DataWorks产品使用合集之在DataWorks中,通过spark访问外网的步骤如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
151 0
|
6月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
140 2
|
6月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
分布式计算 Hadoop 大数据
如何杜绝 spark history server ui 的未授权访问? 2
如何杜绝 spark history server ui 的未授权访问?
|
分布式计算 Hadoop Java
Hadoop/Spark 访问 OSS 加速 | 学习笔记
快速学习Hadoop/Spark 访问 OSS 加速。
465 0
|
存储 分布式计算 大数据
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
527 0
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
|
分布式计算 Java Hadoop
Spark集群搭建记录 | 云计算[CentOS8] | Scala Maven项目访问Spark(local模式)实现单词计数(下)
step6 创建scala object step7 修改pom文件 step8 配置项目 step9 添加依赖库(Spark的jar包) step10 设置输入路径
164 0
Spark集群搭建记录 | 云计算[CentOS8] | Scala Maven项目访问Spark(local模式)实现单词计数(下)
|
分布式计算 IDE Java
Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark(local模式)实现单词计数(上)
写在前面 step1 下载Scala IDE step2 解压安装Scala IDE step3 Scala 下载 step4 Scala 配置 step5 创建scala项目
156 0
Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark(local模式)实现单词计数(上)