Paimon与Spark的集成(一)

简介: Spark 已经成为了大数据领域软件栈中必不可少的组成部分。作为数据湖领域新起的 Paimon,与Spark的深度、全面的集成也将为 Paimon 在准实时场景、离线湖仓场景提供了便利。本文主要介绍一些在 Paimon 新版本中基于 Spark 计算引擎支持的主要功能。

Paimon

Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。Paimon 采用开放的数据格式和技术理念,可以与 ApacheFlink / Spark / Trino 等诸多业界主流计算引擎进行对接,共同推进 Streaming Lakehouse 架构的普及和发展。


Paimon x Spark

Apache Spark,作为大数据处理的统一计算分析引擎的,不仅支持多种语言的高级 API 使用,也支持了丰富的大数据场景应用,包括结构化数据处理的 Spark SQL、用于机器学习的 MLlib,用于图形处理的 GraphX,以及用于增量计算和流处理的Structured Streaming。Spark 已经成为了大数据领域软件栈中必不可少的组成部分。作为数据湖领域新起的 Paimon,与Spark的深度、全面的集成也将为 Paimon 在准实时场景、离线湖仓场景提供了便利。


接下来我们介绍一些在 Paimon 新版本中基于Spark计算引擎支持的主要功能。


Schema Evolution

Schema evolution 是一个数据湖领域一个非常关键的特性,它允许用户方便的修改表的当前 Schema 以适应现有数据,或随时间变化的新数据,同时保持数据的完整性和一致性。


在离线场景中,我们可以通过计算引擎,如 Spark 或者 Flink,提供的 Alter Table 的 SQL 语法来实现对 Schema 的操作。在某些场景下,我们并非都能实时准确的获取上游数据较当前表的 Schema 变化;另外在 Streaming 流式场景中以离线 Alter Table 的方式完成 Schema 的更新需要执行1)停止流作业,2)完成 Schema 更新操作,3)重启流作业这样的流程,这是较为低效的。


Paimon 支持了在数据写入的同时,自动完成 Source 数据和当前表数据的 Schema 合并,并将合并后的 Schema 作为表的最新 Schema,仅需要配置参数 write.merge-schema

data.write.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.save(location)


新增列

比较常见的是,在执行数据追加或覆盖操作时使用,以自动调整 Schema 以包含一个或多个新列。


假设原表的 Schema 为:

a INT
b STRING


新数据 data 的 Schema 为:

a INT
b STRING
c LONG
d Map<String, Double>


操作完成后的表的 Schema 变更为:

a INT
b STRING
c LONG
d Map<String, Double>


提升字段类型

Paimon 的 Schema Evolution 也同时支持数据类型的提升,如 Int 提升为 Long,Long 提升为 Decimal 等;以上述表继续写入数据,假设新数据的 Schema 为:

a Long
b STRING
c Decimal
d Map<String, Double>


操作完成后的表的 Schema 变更为:

a Long
b STRING
c Decimal
d Map<String, Double>


强制类型转换

如以上示例所示,Paimon 支持数据字段类型的提升,如数值型向更高的精度提升(由 Int 提升至 Long,由 Long 提升至 Decimal),同时 Paimon 也支持一些类型之间的强制转换,如 String 强转成 Date 类型或者 Long 转换成 Int,但需要显式的配置参数 write.merge-schema.explicit-cast

data.write.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.option("write.merge-schema.explicit-cast", "true")
.save(location)


假设原表的 Schema 为:

a LONG
b STRING //内容为2023-08-01的格式


新数据 data 的 Schema 为:

a INT
b DATE


操作完成后的表的 Schema 变更为:

a INT
b DATE


需要注意的是:

数据写入(追加或覆盖写)时的 Schema Evolution 不支持删除列和重命名列操作的,也不支持不在隐式/显式转换范围内的数据类型提升。当具体数值不能转换成目标类型时,为了避免将表数据破环,当前会报错,终止该操作。


Spark Structured Streaming

Spark Structured Streaming 是一个基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎,可以像表达静态数据的批量计算一样的表达流计算。 Spark SQL 引擎将负责增量且持续地运行它,并随着流数据不断到达而更新最终结果。 Structured Streaming 支持流之间的聚合、事件时间窗口、流批之间 Join 等。Spark 通过 checkpointing 和 write-ahead logs 实现了端到端的 exactly-once。 简而言之,Structured Streaming 提供快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑流处理。


Paimon 在0.5和0.6两个版本逐步完善了 Spark Structured Streaming 的读写支持,提供了基于 Spark 引擎的流式读写能力。


Streaming Sink

Spark Structured Streaming 定义了三种输出模式(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts),Paimon 仅支持 Append 模式和 Complete 模式。

// `df` is the upstream source data.valstream=df  .writeStream  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")


Streaming Source

结合 Spark 支持的多种 Trigger策略和 Paimon 拓展的一些流式处理的能力,Paimon 可以支持丰富的 Streaming Source 的应用场景。


Paimon 提供了多样了 ScanMode,允许用户以合适的参数指定初始状态从 Paimon 表读取的数据。

ScanMode

描述

latest

仅读取后续持续写入的数据。

latest-full

读取当前快照的数据,以及后续持续写入的数据。

from-timestamp

读取参数scan.timestamp-millis指定的时间戳之后持续写入的数据。

from-snapshot

读取参数scan.snapshot-id指定的版本后续持续写入的数据。

from-snapshot-full

读取参数scan.snapshot-id指定的版本快照数据,以及后续持续写入的数据。

default

默认等同于latest-full模式;如果指定scan.snapshot-id,等同于from-snapshot模式;如果指定scan.timestamp-millis,等同于from-timestamp模式;


Paimon 通过拓展 SupportsAdmissionControl接口,实现了 Source 端的流量控制,避免了由于要处理的单个 batch 的数据量过大而引起的流式作业运行失败的问题。Paimon 目前支持以下 ReadLimit的实现。

Readlimit参数

描述

read.stream.maxFilesPerTrigger

一个Batch最多返回的Splits数

read.stream.maxBytesPerTrigger

一个Batch最多返回的byte数

read.stream.maxRowsPerTrigger

一个Batch最多返回的行数

read.stream.minRowsPerTrigger

一个Batch最少返回的行数,和maxTriggerDelayMs搭配使用构成ReadMinRows

read.stream.maxTriggerDelayMs

一个Batch触发的最大延时,和minRowsPerTrigger搭配使用构成ReadMinRows


以两个示例说明 Paimon Spark Structured Streaming 的用法。


示例一:

普通的流式增量 ETL 场景。

// Paimon source表的Schema为:time Long, stockId INT, avg_price DOUBLEvalquery=spark.readStream  .format("paimon")
  .option("scan.mode", "latest")
  .load("/path/to/paimon/source/table")
  .selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"), col("stockId"))
  .writeStream  .format("console")
  .trigger(Trigger.ProcessingTime(180, TimeUnit.SECONDS))
  .start()

该示例以3分钟的间隔流式读取 Paimon 后续的增量数据,进行 ETL 转化后同步到下游。


示例二:

适用于追补数据的场景,流式读取 Paimon 表自某个指定快照之后的数据,读取完成后不再读取后续写入的数据,同时限定了每个 batch 大致的数据规模。

valquery=spark.readStream  .format("paimon")
  .option("scan.mode", "from-snapshot")
  .option("scan.snapshot-id", 345)
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()


示例代码中指定 Trigger.AvailableNow() 触发器,表示仅读取流式任务启动时当前 Paimon 可用的数据;使用 from-snapshot 的 ScanMode 标识了读取快照ID=345之后写入的数据。在配置 maxBytesPerTrigger 等于128MB后,Spark Structured Streaming 会将待消费的数据按照128MB的 Splits 大小进行 Batch 切分,由多个 Batch 完成当前快照数据的消费。


Spark SQL 拓展

Insert Overwrite

Insert Overwrite 是一个常用的 SQL 语法,用于重写整张表或者表中指定分区。该功能在 Paimon 新版本中也得到支持,包括了 static 和 dynamic 两种模式。


Static Overwrite

覆盖整张表:无论当前表是否是分区表,通过以下 SQL 可以完成使用新数据覆盖原表数据的操作。


在 Spark 环境下使用 Paimon,请参考这里

USE paimon;CREATETABLE T (a INT, b STRING) TBLPROPERTIES('primary-key'='a');INSERT OVERWRITE T VALUES(1,"a"),(2,"b");----------1 a
2 b
----------INSERT OVERWRITE T VALUES(1,"a2"),(3,"c");----------1 a2
3 c
----------


覆盖指定的表分区。

USE paimon;CREATETABLE T (dt STRING, a INT, b STRING)TBLPROPERTIES('primary-key'='dt,a')PARTITIONED BY(dt);INSERT OVERWRITE T VALUES("2023-10-01",1,"a"),("2023-10-02",2,"b");----------------2023-10-011 a
2023-10-022 b
----------------INSERT OVERWRITE T PARTITION (dt ="2023-10-02")VALUES(2,"b2"),(4,"d");----------------2023-10-011 a
2023-10-022 b2
2023-10-02 d 4----------------


Dynamic Parititon Overwrite(DPO)

默认情况下是在 Static 模式下执行 Insert Overwrite 的,用户需要显式的指定要覆盖的分区信息;我们可以通过参数启用 Dynamic 模式来执行 Insert Overwrite,这样 Paimon 将自动判断 source 端数据所涉及到的分区来执行覆盖操作。


Paimon 启动 DPO 需要启动 spark session 时额外指定 paimon 的 extension:

--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions


USE paimon;CREATETABLE T (dt STRING, a INT, b STRING)TBLPROPERTIES('primary-key'='dt,a')PARTITIONED BY(dt);INSERT OVERWRITE T VALUES("2023-10-01",1,"a"),("2023-10-02",2,"b");----------------2023-10-011 a
2023-10-022 b
----------------SET spark.sql.sources.partitionOverwriteMode=DYNAMIC;INSERT OVERWRITE T VALUES("2023-10-02",2,"b2"),("2023-10-02",4,"d");----------------2023-10-011 a
2023-10-022 b2
2023-10-02 d 4----------------


在配置 spark.sql.sources.partitionOverwriteMode=DYNAMIC 后,不再需要指定要覆盖dt="2023-10-02"的分区,实现了数据的动态覆盖。


Call procedure

除了由 Spark 框架提供了常用的 SQL 语法(包括 DDL,DML,Query 以及一些表信息查询)外,Paimon 还需要拓展一些额外的 SQL 语法来提供自定义功能的操作接口,便于用户对 Paimon 表的管理和探索。Call Procedure 的引入为这种场景的支持提供了框架层面的支持。


procedure 的语法:

CALL procedure_name(table => 'table_identifier', arg1 => '', ...);


目前 Paimon 已经实现了三种 procedure:

Procedure

描述

用法

create_tag

为指定快照创建标签

CALL create_tag(table => 'T', tag => 'test_tag', snapshot => 2)

delete_tag

删除已创建的标签

CALL delete_tag(table => 'T', tag => 'test_tag')

rollback

回滚表到指定标签或者版本

CALL rollback(table => 'T', version => '2')


场景示例

以下构造一个流式开启 Schema Evolution 的示例,上游数据实时同步到 paimon 的 user 表(原表仅有 userId 和 name 两个维度),在某时刻上游数据添加了 age 属性,在无需停止作业运维时通过开启 Schema Evolution 自动完成元数据的合并和新数据的写入。

1698745701468-a30a6b39-fa80-46e9-9b20-b6c1fd5d623f.png


// 原表的定义// CREATE TABLE T (userId INT, name STRING) TBLPROPERTIES ('primary-key'='userId');// -- 假设原表的流式写入的数据--// 1 user1// 2 user2// -------------------------// 使用MemoryStream模拟上游streaming数据valinputData=MemoryStream[(Int, String, Int)]
valstream=inputData  .toDS()
  .toDF("userId", "name", "age")
  .writeStream  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .format("paimon")
  .start("/path/to/user_table")
inputData.addData((1, "user1", 30), (3, "user3", 33))
stream.processAllAvailable()
// -- 该batch数据写入后的表数据--// 1 user1 30// 2 user2 null// 3 user3 33// ---------------------------


后续规划

Paimon 孵化于 Flink 社区,源于流式数仓,但其远不止于此。Paimon 将在与如 Apache Spark 这样的其他引擎的深度集成上,以及在如离线湖仓的场景支持上持续发力。在接下来的时间上,社区在和 Spark 引擎的支持上将逐渐拓展支持更多的 Spark SQL 语法,比如 Update、Merge Into 等;在读写性能上也会进行深层次优化。

目录
相关文章
|
20天前
|
SQL 存储 缓存
Paimon与Spark
Paimon与Spark
26 1
|
4月前
|
分布式计算 API Apache
Spark与Elasticsearch的集成与全文搜索
Spark与Elasticsearch的集成与全文搜索
|
2月前
|
SQL 分布式计算 大数据
Paimon 与 Spark 的集成(二):查询优化
通过一系列优化,我们将 Paimon x Spark 在 TpcDS 上的性能提高了37+%,已基本和 Parquet x Spark 持平,本文对其中的关键优化点进行了详细介绍。
117513 30
|
4月前
|
SQL 关系型数据库 MySQL
Apache Flink 和 Paimon 在自如数据集成场景中的使用
Apache Flink 和 Paimon 在自如数据集成场景中的使用
314 0
|
4月前
|
存储 缓存 分布式计算
Spark与云存储的集成:S3、Azure Blob Storage
Spark与云存储的集成:S3、Azure Blob Storage
|
4月前
|
消息中间件 分布式计算 Kafka
Spark与Kafka的集成与流数据处理
Spark与Kafka的集成与流数据处理
|
22天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
28天前
|
NoSQL Java Redis
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
354 0
|
2月前
|
NoSQL Java Redis
SpringBoot集成Redis
SpringBoot集成Redis
464 0
|
2月前
|
NoSQL Java Redis
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
293 1