1. 引入
在类Hadoop系统上支持ACID有了更大的吸引力,其中Databricks的Delta Lake和Uber开源的Hudi也成为了主要贡献者和竞争对手。两者都通过在“parquet”文件格式中提供不同的抽象以解决主要问题;很难选择一个比另一个更好。此博客将使用一个非常基本的示例来了解这些工具的工作原理,并让读者来比较两者的优缺点。
我们将使用与本系列下一篇文章中相反的方法,后面我们将讨论Hadoop上Data Lake的重要性,以及为什么会出现对诸如Delta/Hudi之类的系统的需求,以及数据工程师在过去如何为Lakes孤立地构建易错的ACID系统。
2. 初始化
2.1 环境
源数据库:AWS RDS MySQL
CDC工具:AWS DMS
Hudi:AWS EMR 5.29.0
Delta:Databricks运行时6.1
对象/文件存储:AWS S3
上面的工具集主要用于演示;也可以使用以下工具替代
源数据库:任何传统/基于云的RDBMS
CDC工具:Attunity,Oracle Golden Gate,Debezium,Fivetran,自定义Binlog解析器
Hudi:开源/企业Hadoop上的Apache Hudi
Delta:开源/企业Hadoop上的Delta Lake
对象/文件存储:ADLS / HDFS
2.2 数据准备步骤
create database demo; use demo; create table hudi_delta_test ( pk_id integer, name varchar(255), value integer, updated_at timestamp default now() on update now(), created_at timestamp default now(), constraint pk primary key(pk_id) ); insert into hudi_delta_test(pk_id,name,value) values(1,’apple’,10); insert into hudi_delta_test(pk_id,name,value) values(2,’samsung’,20); insert into hudi_delta_test(pk_id,name,value) values(3,’dell’,30); insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);
现在使用DMS将数据加载到S3中的某个位置,并使用文件夹名称full_load来标识该位置。为了更贴合标题,我们将跳过DMS的设置和配置。加载到S3后如下图所示。
接着在MySQL表中执行一些插入/更新/删除操作
insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40); update hudi_delta_test set value = 201 where pk_id=2; delete from hudi_delta_test where pk_id=3;
继续略过DMS阶段,将CDC数据按以下方式加载到S3,如下图所示
注意:DMS将填充一个名为“ Op”的附加字段,表示“操作”,Op取值I/U/D,分别对应插入、更新和删除。以下图所示显示了CDC数据的内容。
df = spark.read.parquet('s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test') df.show()
完成了数据准备后正式开始比对。DMS将持续将CDC事件传送到S3(供Hudi和Delta Lake使用),此S3为数据源。两种工具的最终状态都旨在获得一致的统一视图,如上图MySQL所示。
3. 使用Apache HUDI
Hudi有两种方式处理UPSERTS [1]
- 写时复制(CoW):数据以列格式(Parquet)存储,并且在更新时会创建文件的新版本。此存储类型最适合于读繁重的工作负载,因为数据集的最新版本始终在有效的列格式文件中可用。
- 读时合并(MoR):数据以列(Parquet)和基于行(Avro)的格式存储;更新记录到基于行的“增量文件”中,并在以后进行压缩,以创建列文件的新版本。此存储类型最适合于写繁重的工作负载,因为新提交会以增量文件的形式快速写入,但是读取数据集需要合并列文件与增量文件。
3.1 启动Spark Shell
使用以下命令打开Spark Shell并进行相关导入.
spark-shell — conf “spark.serializer=org.apache.spark.serializer.KryoSerializer” — conf “spark.sql.hive.convertMetastoreParquet=false” — jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor
3.2 使用CoW
val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test” val hudiTableName = “hudi_cow” val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_cow” val hudiOptions = Map[String,String] ( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”, DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “COPY_ON_WRITE”, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”, DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) val temp = spark.read.format(“parquet”).load(inputDataPath) val fullDF = temp.withColumn(“Op”,lit(‘I’)) fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
由于在Hudi选项中使用了Hive自动同步配置,因此会在Hive中创建一个名为“ hudi_cow”的表。该表使用具有Hoodie格式的Parquet SerDe创建,表结构如下图所示。
表数据如下图所示
val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”) updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
进行更新操作,表“hudi_cow”将有最新的更新数据,如下图所示
如CoW定义中所述,当我们以hudi格式将updateDF写入同一S3位置时,更新的数据在写时被复制,并且快照和增量数据使用同一张表。
3.3 使用MoR
val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test” val hudiTableName = “hudi_mor” val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_mor” val hudiOptions = Map[String,String] ( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”, DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “MERGE_ON_READ”, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”, DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) val temp = spark.read.format(“parquet”).load(inputDataPath) val fullDF = temp.withColumn(“Op”,lit(‘I’)) fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
还是开启了Hive自动同步,将在Hive中创建两张名为“hudi_mor”和“ hudi_mor_rt”的表。hudi_mor是经过读优化的表,具有快照数据,而hudi_mor_rt将具有增量和实时合并数据。数据将会以频繁的压缩间隔被压缩,并提供给hudi_mor。hudi_mor_rt利用Avro格式存储增量数据。正如MoR定义所示,通过hudi_mor_rt读取数据时将即时合并。这对于高更新源表很有用,同时还提供一致且非最新的读优化表。
注意:“ hudi_mor”和“ hudi_mor_rt”都指向相同的S3存储桶,只是定义了不同的存储格式。
可以看到加载后两表内容相同,内容如下所示
val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”) updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
表hudi_mor在很短时间内就具有相同的内容(因为演示中的数据很小,并且很快会被压缩),只要merge成功,表hudi_mor_rt就会有最新数据。
现在看看这些Hudi格式表的S3日志的变化。底层存储格式为parquet,同时通过日志方式管理ACID。通常生成以下类型的文件:
- hoodie_partition_metadata:这是一个小文件,包含有关给定分区中partitionDepth和最后一次commitTime的信息
- hoodie.properties:存储表名称、存储类型信息
- commit和clean:文件统计信息和有关正在写入的新文件的信息,以及诸如numWrites,numDeletes,numUpdateWrites,numInserts和一些其他相关审计字段之类的信息,存储在这些文件中。这些文件在每次提交后生成
以上3个文件对于CoW和MoR类型的表都是通用的。另外对于MoR表,额外有为UPSERTED分区创建的avro格式的日志文件。如下所示的第一个log文件是CoW表中不存在的日志文件。
4. 使用Delta Lake
使用下面的代码片段,我们以parquet格式读取完整的数据,并以delta格式将其写入不同的位置
from pyspark.sql.functions import * inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test" deltaTablePath = "s3://development-dl/demo/hudi-delta-demo/delta_table" fullDF = spark.read.format("parquet").load(inputDataPath) fullDF = fullDF.withColumn("Op",lit('I')) fullDF.write.format("delta").mode("overwrite").save(deltaTablePath)
在Databricks Notebook的SQL界面中使用以下命令可以创建一个Hive外表,“using delta”关键字会包含基础SERDE和FILE格式的定义。
%sql create table delta_table using delta location 's3://development-dl/demo/hudi-delta-demo/delta_table'
该表的DDL如下所示。
%sql show create table delta_table
表会包含与完整加载文件相同的所有记录。
%sql select * from delta_table
使用以下命令读取CDC数据并在Hive中注册为临时视图
updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test") updateDF.createOrReplaceTempView("temp")
MERGE命令:下面是执行UPSERT的MERGE SQL,它作为SQL很方便地被执行,也可以在spark.sql()方法调用中执行
%sql MERGE INTO delta_table target USING (SELECT Op,latest_changes.pk_id,name,value,updated_at,created_at FROM temp latest_changes INNER JOIN ( SELECT pk_id, max(updated_at) AS MaxDate FROM temp GROUP BY pk_id ) cm ON latest_changes.pk_id = cm.pk_id AND latest_changes.updated_at = cm.MaxDate) as source ON source.pk_id == target.pk_id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *
MERGE之后,Hive中delta_table的内容也更新了。
%sql select * from delta_table
与Hudi一样,Delta Lake基本文件存储格式也是“parquet”。Delta提供带有日志和版本控制的ACID功能。接着看看S3在装载和CDC合并后的变化。
增量日志包含JSON格式的日志,文件中包含每次提交后的schema和最新文件的信息。
在CDC合并的情况下,由于可以插入/更新或删除多条记录。初始parquet文件的内容分为多个较小的parquet文件,这些较小的文件会被重写。如果对表进行了分区,则仅与更新的分区相对应的CDC数据将受到影响。初始parquet文件仍存在于该文件夹中,但已从新的日志文件中删除。如果我们在此表上运行VACUUM,则可以物理删除该文件。也可以使用OPTIMIZE命令[6]来串联这些较小的文件。
Delta日志附加了另一个JSON格式的日志文件,该文件存储schema和指向最新文件的文件指针。
5. 总结
上述两个示例中都按原样保留了删除的记录,并通过Op ='D'标识删除,这是故意而为以显示DMS的功能,下面的参考资料显示了如何将这种软删除转换为硬删除。
希望这是一个有用的比较,有助于做出合理的选择,选择合适的数据湖框架。