一.数据湖三剑客介绍
关注大数据发展动态的朋友,都知道最近几年数据湖存储引擎发展很快,已经涌现出了数据湖三剑客 delta lake, iceberg 和hudi。这三者给数据湖带来了诸多新特性,如 acid事务支持,记录级别的增删改,流批统一的存储,可扩张的元数据管理,schema 约束和schema 演变,基于时间旅行提供多版本支持等等。
数据湖引擎带来的以上种种新特性,切切实实解决了大数据在企业落地过程中遇到的很多难以解决的痛点。也正是因此,很多企业都在积极调研尝试引进合适的数据湖框架。笔者所在公司自2020年开始也一直在进行这方面的调研和尝试。
二.我司数据湖引擎落地策略
关于数据湖三剑客的技术对比,网上有很多资料,笔者在这里不再详细赘述,不过有几点在此说明下:
- 在跟 spark执行引擎的兼容成熟度上,delta lake目前是做的最好的(毕竟是一个databricks娘胎里出来的);
- 在跟flink执行引擎的对接成熟度上,目前 iceberg和 hudi是做的更好的, delta 还没有投入很多精力做这块;(笔者的感觉是,flink在国内的热度比国外的热度要大,另外iceberg和hudi社区中华人贡献者的比重也相对 delta lake更多些,这可能跟国内厂商的推动和宣传有关);
- 在架构设计上,iceberg是公认更为优和雅先进的,经常被拿来跟 hive 对比;
由于我司目前大数据计算引擎大都是使用的spark (一些实时要求很高的场景,也已经在探索和落地使用 flink了),考虑到delta lake跟spark一样都是来自砖厂,其背后有着砖厂强大的背书支持和广泛的用户基础 (从dbr8.0后,建表时默认的存储格式就是delta lake了),跟spark在兼容性上也做得更好,同时底层存储采用的是开源的Parquet格式为日后对接更多执行引擎打好了基础,所以我们决定优先尝试落地使用 delta lake。待日后 iceberg 更为成熟后,可以再尝试引进 iceberg,两者可以并存并不冲突。(这三者都是轻量级的jar包形式,所以引入落地的技术成本都不大)。
delta lake对接spark外计算引擎
DBR8中默认的存储格式是delta
三.我司Delta lake数据湖引擎落地探索
跟大多数公司一样,为降低业务系统开发难度,我们的业务人员倾向于纯sql的开发方式。好消息是,Delta lake从 0.7.0版本之后,就支持spark 3.x系列的各种纯 sql操作了,包括ddl和 dml 。
所以通过使用 spark3.x,结合HiveExternalCatalog 和 delta lake,我们就能搭建一套现代化的数据湖仓架构,以纯sql方式开发业务代码了!Everyone loves sql, so why not ?!
delta 7.0 支持 ddl, dml
笔者先按照官方文档,使用delta-core_2.12-0.7.0.jar 配合spark-3.0.2-bin-hadoop2.7-hive1.2 ,通过 spark-shell 和spark-sql做了scala各种 api,和 sql各种 ddl, dml的验证测试,一切顺利。
注意:delta lake 和 spark各版本的兼容性,如官方下图所示:
delta 与 spark 各版本的兼容性
注意:delta lake 和spark各版本,跟scala的版本匹配关系如下:
- Spark 3.x 预编译都是使用的scala 2.12;
- spark 2.x 预编译使用的有scala 2.12 也有 scala 2.11;
- delta 在0.7.0之后只提供了 scala 2.12版;(因为对接的是spark 3.x)
- delta在0.7.0版本之前提供的有对接scala 2.11的也有对接scala 2.12的。
四.Delta lake 0.8.0踩坑记
在2021/02/05, delta lake 发布了0.8.0版本,如下图所示:
同时在,spark 在2021/03/02发布了3.1.1版本,如下图所示:(spark 3.1系列的第一个正式版本是3.1.1,而不是3.1.1,如下图所示):
看到 spark3.1和delta0.8各种feature的增强,笔者迫不及待地使用delta-core_2.12-0.8.0.jar 配合 spark-3.1.1-bin-hadoop2.7 ,通过 spark-shell 和spark-sql做了scala各种 api,和 sql各种 ddl, dml的验证测试,然后,然后坑来了!如下图所示,sql的update 和insert 都会报错:java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V。
(注意:spark官网发布的二进制安装包spark-3.1.1-bin-hadoop2.7 内置的是hive 2.3.7,而不是像spark-3.0.2-bin-hadoop2.7-hive1.2那样内置的hive1.2.1,这点需要注意。)
update报错截图
insert报错截图
笔者再次检查了delta 官网关于spark版本兼容性的说明,很明显,delta 0.8.0应该是支持spark3.1.1的:
官网关于兼容性的声明
所以接下来笔者经过了各种折腾,尝试了不同的ddl和dml语句 (使用hive 的 stored as 语法格式,使用spark 的using 语法格式,尝试建不同字段名不同字段类型的 delta 表,尝试不同的insert 和 update 语句等), 也尝试了不同的 scala update api, 折腾了一天多,都没有解决。
最后笔者通过谷歌,偶然搜索到了以下链接,打开链接查看,发现描述的问题跟我的现象是一样的:
该issue下还有详细描述,通过仔细查看发现,该问题的原因是 delta lake 0.8.0发布时,spark3.1.1还未正式对外发布,所以没有办法测试二者的兼容性;后续 delta 会在0.8.0的基础上,针对跟spark3.1.1的该兼容性问题,出一个patch release,我们只需要耐心静待一段时间就好了。
五.引进新框架新版本,如何避免踩坑
回头来看,以上链接其实就是delta 官方使用的记录community reported issues 的链接!兜兜转转一大圈,浪费了大量精力和时间,最后发现问题原因的竟然是官网!
如果我们在使用某个框架的新版本前,能提前多多关注社区动态,比如查看记录社区发现的已知问题的系统(大多使用的是jira,delta lake使用的是github issues),或加入社区日常讨论组(mail list, 或 slack channel),则自然能少走不少弯路,少踩很多坑。
六.兼容性测试过程中使用的命令附录
相关命令如下:
--spark-shell使用命令示例:
/opt/spark-3.0.2-bin-hadoop2.7-hive1.2/bin/spark-shell --verbose --jars /opt/delta-core_2.12-0.8.0.jar --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
--spark-sql使用命令示例:
/opt/spark-3.0.2-bin-hadoop2.7-hive1.2/bin/spark-sql --verbose --jars /opt/delta-core_2.12-0.8.0.jar --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --ddl: Create a delta table in the metastore CREATE OR REPLACE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING) USING DELTA; -- ddl: Create a parquet table in the metastore CREATE TABLE events_parquet (date DATE, eventId STRING, eventType STRING, data STRING) USING parquet; -- dml: insert records into parquet table in the metastore insert into events_parquet values (current_date(),'eventid1','eventtype1','data1'); -- dml: insert records into delta table in the metastore insert into events values (current_date(),'eventid1','eventtype1','data1'); -- dml: update records in the parquet table in the metastore, should shout error UPDATE events_parquet SET eventType = 'eventtypeT' WHERE eventType = 'eventtype1' -- dml: update records in the delta table in the metastore, should be good UPDATE events SET eventType = 'eventtypeT' WHERE eventType = 'eventtype1' --dml: spark-shell, insert into delta table using select sql("insert into delta.`/delta/events` select * from events_parquet;").show --spark-shell, scala api import io.delta.tables._ val deltaTable = DeltaTable.forPath(spark, "/data/events/") deltaTable.updateExpr( // predicate and update expressions using SQL formatted string "eventType = 'clck'", Map("eventType" -> "'click'") import org.apache.spark.sql.functions._ import spark.implicits._ deltaTable.update( // predicate using Spark SQL functions and implicits col("eventType") === "clck", Map("eventType" -> lit("click")));
七.参考链接