SparkSQL DatasourceV2 之 Multiple Catalog

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: SparkSQL DatasourceV2作为Spark2.3引入的特性,在Spark 3.0 preview(2019/12/23)版本中又有了新的改进以更好的支持各类数据源。本文将从catalog角度,介绍新的数据源如何和Spark DatasourceV2进行集成。
+关注继续查看

原文链接
作者:马骏杰


问题

SparkSQL是Spark的一个子模块,主要功能是用于处理结构化数据,目前在大数据OLAP领域已经有了广泛的应用。Iceberg作为一个通用的表格式,也已经在数据湖的解决方案中逐渐展现了它的优势。

那该如何将这2者相结合,使得应用SparkSQL + Iceberg可以和SparkSQL + Hive一样方便,如,基于SQL直接访问数据或进行DDL操作:

select c1 from iceberg_db.t;
drop table iceberg_db.t;

SparkSQL 基本原理

先来看下SparkSQL处理SQL的基本流程:

image.png

如上图所示,在提交SQL后,spark内部会经历语法解析生成逻辑计划,解析逻辑计划,优化逻辑计划,生成执行计划,执行。在解析逻辑计划的过程中,引入了catalog,它的作用是来判断SQL中引用的数据库,表,列,函数等是否存在。

在Spark + Hive的解决方案中,基于ExternalCatalog接口,实现了HiveExternalCatalog,该类中的Hive客户端和Hive的metastore进行交互,从而能解析SQL中的库表列是否存在,并能基于Hive客户端进行Hive表的DDL操作,比如create table, drop table等。

Multiple Catalog解析

那Spark + Iceberg是否只需要实现ExternalCatalog接口,就能基于SQL直接访问数据或进行DDL操作吗?答案是肯定的,但是,由于解析SQL过程中只能支持一种catalog,如果要实现Hive table joion Iceberg table该怎么办,如:

select * 
from iceberg_db.t1 
join hive_db.t2 
  on t1.k1 = t2.k1;

为了更为通用的解决这类问题,在Spark 3.0 preview版本中引入了multiple catalog功能,该功能对于catalog做了如下变化:

  • 增加了CatalogPlugin接口,所有新增数据源的自定义catalog都要实现该接口。该接口提供了统一的初始化方法,CatalogManager(下文会提到)在创建CatalogPlugin实例后会进行用户实现的初始化逻辑。在使用过程中,需要增加配置
spark.sql.catalog.<catalog-name>=<YourCatalogClass>

这里设置为需要的CatalogName,设置为具体的实现类,如,

spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog

接口定义如下:

public interface CatalogPlugin {

  void initialize(String name, CaseInsensitiveStringMap options);

  String name();
}
  • 增加了TableCatalog的接口,该接口继承自CatalogPlugin,定义了相关的方法用来解析SQL中的元数据,如,tableExists,还定义了一系列方法进行DDL操作,如,createTable,alterTable,dropTable,接口定义如下,
public interface TableCatalog extends CatalogPlugin {

  Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;

  Table loadTable(Identifier ident) throws NoSuchTableException;

  default void invalidateTable(Identifier ident) {
  }

  default boolean tableExists(Identifier ident) {
    try {
      return loadTable(ident) != null;
    } catch (NoSuchTableException e) {
      return false;
    }
  }

  Table createTable(
      Identifier ident,
      StructType schema,
      Transform[] partitions,
      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;

  Table alterTable(
      Identifier ident,
      TableChange... changes) throws NoSuchTableException;

  boolean dropTable(Identifier ident);

  void renameTable(Identifier oldIdent, Identifier newIdent)
      throws NoSuchTableException, TableAlreadyExistsException;
}
  • 增加了CatalogManager,该类的作用是在解析逻辑计划过程中,基于catalogName,返回匹配的CatalogPlugin实现。由于CatalogManager的增加,解析SQL时和catalog的交互也发生了变化:

在解析过程中,根据catalogName从CatalogManager获取具体的CatalogPlugin实现,V2SessionCatalog是为了兼容之前的catalog的实现机制,而CustomerCatalog是自定义的CatalogPlugin实现。同时,CatalogManager还会管理当前的Catalog/Namespace,相关方法如下:

def currentNamespace: Array[String]
def setCurrentNamespace(namespace: Array[String]): Unit
def currentCatalog: CatalogPlugin
def setCurrentCatalog(catalogName: String): Unit
  • 命名结构变更为.*,对于表名,原本只支持2层的命名结构,databaseName.tableName,但是在业界流行的数据库中(如MySQL,PostgreSQL),已经支持3层的命名结构,database.schema.table。而在multiple catalog实现过程中,引入了Namespace概念,使得SparkSQL能支持多层命名结构,如,catalog.ns1.ns2.table
    由于引入了catalog和namespace概念,SparkSQL还增加相关命令支持catalog/namespace的管理,如,
CREATE/DROP/SHOW NAMESPACES
 USE <catalogName>.<namespaceName>

除了multiple catalog以外,SparkSQL DatasourceV2还重构生成了SupportsRead/SupportsWrite等接口,用来支持数据源的各类操作,由于篇幅有限,就不在本文中具体展开。

基于 Spark 3.0 preview使用Iceberg + SparkSQL

在Spark DatasourceV2增加了multiple catalog等功能后,回到我们想要查询的SQL,实现步骤如下:

1.在Iceberg侧对CatalogPlugin/TableCatalog/SupportsRead等接口进行实现,实现类名如: org.apache.iceberg.spark.SparkCatalog

2.在spark的配置文件中设置:

spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog

3.基于配置的catalogName,调整SQL如下,就可以进行基于SQL的跨数据源查询了。

select * 
from iceberg_catalog.ns1.t1
join hive_db.t2 on t1.k1 = t2.k1;

4.除了跨数据源数据分析以外,现在还可以对Iceberg的表进行DDL操作了,如,

create table iceberg_catalog.t1 ......
drop table iceberg_catalog.t1

总结

Spark 3.0 preview在DatasourceV2的功能方面较Spark2.4做了比较大的改动,Multiple Catalog作为比较重要的新增功能,使得新的数据源能很便捷的和SparkSQL进行整合,提供元数据相关服务。除了Multiple Catalog以外,还增加了诸如SupportsRead,SupportsWrite,SupportsPushDownFilters等一系列接口增强对数据源的整合。Iceberg作为新兴的表格式,能很好的利用DatasourceV2的新功能,结合SparkSQL构建数据湖解决方案。目前Iceberg开源代码还未针对新的DatasourceV2特性进行更新,在我们内部项目中已经对这块整合进行了相关实践,并计划贡献给社区,使得基于Iceberg的数据湖解决方案能更完善。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。image.png
Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
EMR数据湖开发治理之用户画像分析
通过本场景,你可以基于E-MapReduce + DLF + OSS-HDFS + DataWorks在云上快速体验完整的数据湖开发治理方案。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
7月前
使用parted创建大分区时 mkpart Warning: The resulting partition is not properly aligned for best performance.
fdisk不能创建大于2T的分区,创建大分区得用parted,我在用parted创建分区时遇到下面的警告提示
116 0
|
9月前
|
分布式计算 Java Spark
Optimizing Spark job parameters
Optimizing Spark job parameters
58 0
从零学习SpringCloud系列(二):Schema specific part is opaque
从零学习SpringCloud系列(二):Schema specific part is opaque
128 0
|
分布式计算 Spark
Spark - ReturnStatementInClosureException: Return statements aren‘t allowed in Spark closures
Spark 使用 RDD 调用 Filter 函数时,dirver 端卡住,报错 ReturnStatementInClosureException: Return statements aren't allowed in Spark closures,即闭包内无法使用 return 函数。
135 0
Spark - ReturnStatementInClosureException: Return statements aren‘t allowed in Spark closures
|
分布式计算 Spark
spark常用的Transformations 和Actions
spark常用的Transformations 和Actions
151 0
成功解决ImportError: Missing optional dependency ‘fastparquet‘. fastparquet is required for parquet supp
成功解决ImportError: Missing optional dependency ‘fastparquet‘. fastparquet is required for parquet supp
|
Oracle 关系型数据库
Mandatory Patching Requirement for Database Versions 11.2.0.3 or Earlier, Using DB Links (DOC ID 2335265.1)
Mandatory Patching Requirement for Database Versions 11.2.0.3 or Earlier, Using DB Links (DOC ID 2335265.
2713 0
相关产品
开源大数据平台 E-MapReduce
推荐文章
更多