Paimon与Spark的集成(一)

简介: Spark 已经成为了大数据领域软件栈中必不可少的组成部分。作为数据湖领域新起的 Paimon,与Spark的深度、全面的集成也将为 Paimon 在准实时场景、离线湖仓场景提供了便利。本文主要介绍一些在 Paimon 新版本中基于 Spark 计算引擎支持的主要功能。

Paimon

Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。Paimon 采用开放的数据格式和技术理念,可以与 ApacheFlink / Spark / Trino 等诸多业界主流计算引擎进行对接,共同推进 Streaming Lakehouse 架构的普及和发展。


Paimon x Spark

Apache Spark,作为大数据处理的统一计算分析引擎的,不仅支持多种语言的高级 API 使用,也支持了丰富的大数据场景应用,包括结构化数据处理的 Spark SQL、用于机器学习的 MLlib,用于图形处理的 GraphX,以及用于增量计算和流处理的Structured Streaming。Spark 已经成为了大数据领域软件栈中必不可少的组成部分。作为数据湖领域新起的 Paimon,与Spark的深度、全面的集成也将为 Paimon 在准实时场景、离线湖仓场景提供了便利。


接下来我们介绍一些在 Paimon 新版本中基于Spark计算引擎支持的主要功能。


Schema Evolution

Schema evolution 是一个数据湖领域一个非常关键的特性,它允许用户方便的修改表的当前 Schema 以适应现有数据,或随时间变化的新数据,同时保持数据的完整性和一致性。


在离线场景中,我们可以通过计算引擎,如 Spark 或者 Flink,提供的 Alter Table 的 SQL 语法来实现对 Schema 的操作。在某些场景下,我们并非都能实时准确的获取上游数据较当前表的 Schema 变化;另外在 Streaming 流式场景中以离线 Alter Table 的方式完成 Schema 的更新需要执行1)停止流作业,2)完成 Schema 更新操作,3)重启流作业这样的流程,这是较为低效的。


Paimon 支持了在数据写入的同时,自动完成 Source 数据和当前表数据的 Schema 合并,并将合并后的 Schema 作为表的最新 Schema,仅需要配置参数 write.merge-schema

data.write.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.save(location)


新增列

比较常见的是,在执行数据追加或覆盖操作时使用,以自动调整 Schema 以包含一个或多个新列。


假设原表的 Schema 为:

a INT
b STRING


新数据 data 的 Schema 为:

a INT
b STRING
c LONG
d Map<String, Double>


操作完成后的表的 Schema 变更为:

a INT
b STRING
c LONG
d Map<String, Double>


提升字段类型

Paimon 的 Schema Evolution 也同时支持数据类型的提升,如 Int 提升为 Long,Long 提升为 Decimal 等;以上述表继续写入数据,假设新数据的 Schema 为:

a Long
b STRING
c Decimal
d Map<String, Double>


操作完成后的表的 Schema 变更为:

a Long
b STRING
c Decimal
d Map<String, Double>


强制类型转换

如以上示例所示,Paimon 支持数据字段类型的提升,如数值型向更高的精度提升(由 Int 提升至 Long,由 Long 提升至 Decimal),同时 Paimon 也支持一些类型之间的强制转换,如 String 强转成 Date 类型或者 Long 转换成 Int,但需要显式的配置参数 write.merge-schema.explicit-cast

data.write.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.option("write.merge-schema.explicit-cast", "true")
.save(location)


假设原表的 Schema 为:

a LONG
b STRING //内容为2023-08-01的格式


新数据 data 的 Schema 为:

a INT
b DATE


操作完成后的表的 Schema 变更为:

a INT
b DATE


需要注意的是:

数据写入(追加或覆盖写)时的 Schema Evolution 不支持删除列和重命名列操作的,也不支持不在隐式/显式转换范围内的数据类型提升。当具体数值不能转换成目标类型时,为了避免将表数据破环,当前会报错,终止该操作。


Spark Structured Streaming

Spark Structured Streaming 是一个基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎,可以像表达静态数据的批量计算一样的表达流计算。 Spark SQL 引擎将负责增量且持续地运行它,并随着流数据不断到达而更新最终结果。 Structured Streaming 支持流之间的聚合、事件时间窗口、流批之间 Join 等。Spark 通过 checkpointing 和 write-ahead logs 实现了端到端的 exactly-once。 简而言之,Structured Streaming 提供快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑流处理。


Paimon 在0.5和0.6两个版本逐步完善了 Spark Structured Streaming 的读写支持,提供了基于 Spark 引擎的流式读写能力。


Streaming Sink

Spark Structured Streaming 定义了三种输出模式(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts),Paimon 仅支持 Append 模式和 Complete 模式。

// `df` is the upstream source data.valstream=df  .writeStream  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")


Streaming Source

结合 Spark 支持的多种 Trigger策略和 Paimon 拓展的一些流式处理的能力,Paimon 可以支持丰富的 Streaming Source 的应用场景。


Paimon 提供了多样了 ScanMode,允许用户以合适的参数指定初始状态从 Paimon 表读取的数据。

ScanMode

描述

latest

仅读取后续持续写入的数据。

latest-full

读取当前快照的数据,以及后续持续写入的数据。

from-timestamp

读取参数scan.timestamp-millis指定的时间戳之后持续写入的数据。

from-snapshot

读取参数scan.snapshot-id指定的版本后续持续写入的数据。

from-snapshot-full

读取参数scan.snapshot-id指定的版本快照数据,以及后续持续写入的数据。

default

默认等同于latest-full模式;如果指定scan.snapshot-id,等同于from-snapshot模式;如果指定scan.timestamp-millis,等同于from-timestamp模式;


Paimon 通过拓展 SupportsAdmissionControl接口,实现了 Source 端的流量控制,避免了由于要处理的单个 batch 的数据量过大而引起的流式作业运行失败的问题。Paimon 目前支持以下 ReadLimit的实现。

Readlimit参数

描述

read.stream.maxFilesPerTrigger

一个Batch最多返回的Splits数

read.stream.maxBytesPerTrigger

一个Batch最多返回的byte数

read.stream.maxRowsPerTrigger

一个Batch最多返回的行数

read.stream.minRowsPerTrigger

一个Batch最少返回的行数,和maxTriggerDelayMs搭配使用构成ReadMinRows

read.stream.maxTriggerDelayMs

一个Batch触发的最大延时,和minRowsPerTrigger搭配使用构成ReadMinRows


以两个示例说明 Paimon Spark Structured Streaming 的用法。


示例一:

普通的流式增量 ETL 场景。

// Paimon source表的Schema为:time Long, stockId INT, avg_price DOUBLEvalquery=spark.readStream  .format("paimon")
  .option("scan.mode", "latest")
  .load("/path/to/paimon/source/table")
  .selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"), col("stockId"))
  .writeStream  .format("console")
  .trigger(Trigger.ProcessingTime(180, TimeUnit.SECONDS))
  .start()

该示例以3分钟的间隔流式读取 Paimon 后续的增量数据,进行 ETL 转化后同步到下游。


示例二:

适用于追补数据的场景,流式读取 Paimon 表自某个指定快照之后的数据,读取完成后不再读取后续写入的数据,同时限定了每个 batch 大致的数据规模。

valquery=spark.readStream  .format("paimon")
  .option("scan.mode", "from-snapshot")
  .option("scan.snapshot-id", 345)
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()


示例代码中指定 Trigger.AvailableNow() 触发器,表示仅读取流式任务启动时当前 Paimon 可用的数据;使用 from-snapshot 的 ScanMode 标识了读取快照ID=345之后写入的数据。在配置 maxBytesPerTrigger 等于128MB后,Spark Structured Streaming 会将待消费的数据按照128MB的 Splits 大小进行 Batch 切分,由多个 Batch 完成当前快照数据的消费。


Spark SQL 拓展

Insert Overwrite

Insert Overwrite 是一个常用的 SQL 语法,用于重写整张表或者表中指定分区。该功能在 Paimon 新版本中也得到支持,包括了 static 和 dynamic 两种模式。


Static Overwrite

覆盖整张表:无论当前表是否是分区表,通过以下 SQL 可以完成使用新数据覆盖原表数据的操作。


在 Spark 环境下使用 Paimon,请参考这里

USE paimon;CREATETABLE T (a INT, b STRING) TBLPROPERTIES('primary-key'='a');INSERT OVERWRITE T VALUES(1,"a"),(2,"b");----------1 a
2 b
----------INSERT OVERWRITE T VALUES(1,"a2"),(3,"c");----------1 a2
3 c
----------


覆盖指定的表分区。

USE paimon;CREATETABLE T (dt STRING, a INT, b STRING)TBLPROPERTIES('primary-key'='dt,a')PARTITIONED BY(dt);INSERT OVERWRITE T VALUES("2023-10-01",1,"a"),("2023-10-02",2,"b");----------------2023-10-011 a
2023-10-022 b
----------------INSERT OVERWRITE T PARTITION (dt ="2023-10-02")VALUES(2,"b2"),(4,"d");----------------2023-10-011 a
2023-10-022 b2
2023-10-02 d 4----------------


Dynamic Parititon Overwrite(DPO)

默认情况下是在 Static 模式下执行 Insert Overwrite 的,用户需要显式的指定要覆盖的分区信息;我们可以通过参数启用 Dynamic 模式来执行 Insert Overwrite,这样 Paimon 将自动判断 source 端数据所涉及到的分区来执行覆盖操作。


Paimon 启动 DPO 需要启动 spark session 时额外指定 paimon 的 extension:

--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions


USE paimon;CREATETABLE T (dt STRING, a INT, b STRING)TBLPROPERTIES('primary-key'='dt,a')PARTITIONED BY(dt);INSERT OVERWRITE T VALUES("2023-10-01",1,"a"),("2023-10-02",2,"b");----------------2023-10-011 a
2023-10-022 b
----------------SET spark.sql.sources.partitionOverwriteMode=DYNAMIC;INSERT OVERWRITE T VALUES("2023-10-02",2,"b2"),("2023-10-02",4,"d");----------------2023-10-011 a
2023-10-022 b2
2023-10-02 d 4----------------


在配置 spark.sql.sources.partitionOverwriteMode=DYNAMIC 后,不再需要指定要覆盖dt="2023-10-02"的分区,实现了数据的动态覆盖。


Call procedure

除了由 Spark 框架提供了常用的 SQL 语法(包括 DDL,DML,Query 以及一些表信息查询)外,Paimon 还需要拓展一些额外的 SQL 语法来提供自定义功能的操作接口,便于用户对 Paimon 表的管理和探索。Call Procedure 的引入为这种场景的支持提供了框架层面的支持。


procedure 的语法:

CALL procedure_name(table => 'table_identifier', arg1 => '', ...);


目前 Paimon 已经实现了三种 procedure:

Procedure

描述

用法

create_tag

为指定快照创建标签

CALL create_tag(table => 'T', tag => 'test_tag', snapshot => 2)

delete_tag

删除已创建的标签

CALL delete_tag(table => 'T', tag => 'test_tag')

rollback

回滚表到指定标签或者版本

CALL rollback(table => 'T', version => '2')


场景示例

以下构造一个流式开启 Schema Evolution 的示例,上游数据实时同步到 paimon 的 user 表(原表仅有 userId 和 name 两个维度),在某时刻上游数据添加了 age 属性,在无需停止作业运维时通过开启 Schema Evolution 自动完成元数据的合并和新数据的写入。

1698745701468-a30a6b39-fa80-46e9-9b20-b6c1fd5d623f.png


// 原表的定义// CREATE TABLE T (userId INT, name STRING) TBLPROPERTIES ('primary-key'='userId');// -- 假设原表的流式写入的数据--// 1 user1// 2 user2// -------------------------// 使用MemoryStream模拟上游streaming数据valinputData=MemoryStream[(Int, String, Int)]
valstream=inputData  .toDS()
  .toDF("userId", "name", "age")
  .writeStream  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .format("paimon")
  .start("/path/to/user_table")
inputData.addData((1, "user1", 30), (3, "user3", 33))
stream.processAllAvailable()
// -- 该batch数据写入后的表数据--// 1 user1 30// 2 user2 null// 3 user3 33// ---------------------------


后续规划

Paimon 孵化于 Flink 社区,源于流式数仓,但其远不止于此。Paimon 将在与如 Apache Spark 这样的其他引擎的深度集成上,以及在如离线湖仓的场景支持上持续发力。在接下来的时间上,社区在和 Spark 引擎的支持上将逐渐拓展支持更多的 Spark SQL 语法,比如 Update、Merge Into 等;在读写性能上也会进行深层次优化。

目录
相关文章
|
7月前
|
SQL 存储 缓存
Paimon与Spark
Paimon与Spark
279 1
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
141 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
分布式计算 大数据 Apache
Apache Spark & Paimon Meetup · 北京站,助力 LakeHouse 架构生产落地
2024年11月15日13:30北京市朝阳区阿里中心-望京A座-05F,阿里云 EMR 技术团队联合 Apache Paimon 社区举办 Apache Spark & Paimon meetup,助力企业 LakeHouse 架构生产落地”线下 meetup,欢迎报名参加!
106 3
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
72 1
|
6月前
|
分布式计算 Serverless 调度
EMR Serverless Spark:结合实时计算 Flink 基于 Paimon 实现流批一体
本文演示了使用实时计算 Flink 版和 Serverless Spark 产品快速构建 Paimon 数据湖分析的流程,包括数据入湖 OSS、交互式查询,以及离线Compact。Serverless Spark完全兼容Paimon,通过内置的DLF的元数据实现了和其余云产品如实时计算Flink版的元数据互通,形成了完整的流批一体的解决方案。同时支持灵活的作业运行方式和参数配置,能够满足实时分析、生产调度等多项需求。
60824 107
|
7月前
|
SQL 分布式计算 大数据
Paimon 与 Spark 的集成(二):查询优化
通过一系列优化,我们将 Paimon x Spark 在 TpcDS 上的性能提高了37+%,已基本和 Parquet x Spark 持平,本文对其中的关键优化点进行了详细介绍。
118360 30
|
7月前
|
SQL 分布式计算 大数据
Spark 的集成
Spark 的集成
78 2
|
2月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
28天前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
52 0
|
5月前
|
监控 druid Java
spring boot 集成配置阿里 Druid监控配置
spring boot 集成配置阿里 Druid监控配置
320 6

热门文章

最新文章