使用Apache Hudi + Amazon EMR进行变化数据捕获(CDC)

本文涉及的产品
数据管理 DMS,安全协同 3个实例 3个月
推荐场景:
学生管理系统数据库
简介: 使用Apache Hudi + Amazon EMR进行变化数据捕获(CDC)

前一篇文章中我们讨论了如何使用Amazon数据库迁移服务(DMS)无缝地收集CDC数据。

https://towardsdatascience.com/data-lake-change-data-capture-cdc-using-amazon-database-migration-service-part-1-capture-b43c3422aad4

下面将演示如何处理CDC数据,以便在数据湖中近实时表示数据库的变更,我们将使用Apache Hudi和Amazon EMR来完成此操作。Apachehudi是一个开源的数据管理框架,用于简化近实时的增量数据处理。

创建一个新的EMR集群

$ aws emr create-cluster --auto-scaling-role EMR_AutoScaling_DefaultRole --applications Name=Spark Name=Hive --ebs-root-volume-size 10 --ec2-attributes '{"KeyName":"roopikadf","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-097e5d6e","EmrManagedSlaveSecurityGroup":"sg-088d03d676ac73013","EmrManagedMasterSecurityGroup":"sg-062368f478fb07c11"}' --service-role EMR_DefaultRole --release-label emr-6.0.0 --name 'Training' --instance-groups '[{"InstanceCount":3,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m5.xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","Name":"Master - 1"}]' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region us-east-1 --bootstrap-actions Path=s3://aws-analytics-course/job/energy/emr.sh,Name=InstallPythonLibs

在创建EMR集群之后,使用SSH登录到主节点,并运行以下命令,这些命令将把Apache Hudi Jar文件复制到S3。

$ aws s3 cp /usr/lib/hudi/hudi-spark-bundle.jar s3://aws-analytics-course/hudi/jar/   upload: ../../usr/lib/hudi/hudi-spark-bundle.jar to s3://aws-analytics-course/hudi/jar/hudi-spark-bundle.jar
$ aws s3 cp /usr/lib/spark/external/lib/spark-avro.jar s3://aws-analytics-course/hudi/jar/
upload: ../../usr/lib/spark/external/lib/spark-avro.jar to s3://aws-analytics-course/hudi/jar/spark-avro.jar
$ aws s3 ls s3://aws-analytics-course/hudi/jar/
2020-10-21 17:00:41   23214176 hudi-spark-bundle.jar
2020-10-21 17:00:56     101212 spark-avro.jar

接着创建一个新的EMR Notebook,并在以下地址获取上传的NoteBook,上传hudi/hudi.ipynb

$ git clone https://github.com/mkukreja1/blogs.git

使用在上一步上传到S3的Hudi Jar文件创建一个Spark Session

from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, array, ArrayType, DateType, DecimalType
from pyspark.sql.functions import *
from pyspark.sql.functions import concat, lit, col
spark = pyspark.sql.SparkSession.builder.appName("Product_Price_Tracking") \
     .config("spark.jars", "s3://aws-analytics-course/hudi/jar/hudi-spark-bundle.jar,s3://aws-analytics-course/hudi/jar/spark-avro.jar") \
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
     .config("spark.sql.hive.convertMetastoreParquet", "false") \
     .getOrCreate()

读取CDC文件,我们将从读取全量文件开始

TABLE_NAME = "coal_prod"
S3_RAW_DATA = "s3://aws-analytics-course/raw/dms/fossil/coal_prod/LOAD00000001.csv"
S3_HUDI_DATA = "s3://aws-analytics-course/hudi/data/coal_prod"
coal_prod_schema = StructType([StructField("Mode", StringType()),
                               StructField("Entity", StringType()),
                               StructField("Code", StringType()),
                               StructField("Year", IntegerType()),
                               StructField("Production", DecimalType(10,2)),
                               StructField("Consumption", DecimalType(10,2))
                               ])
df_coal_prod = spark.read.csv(S3_RAW_DATA, header=False, schema=coal_prod_schema)
df_coal_prod.show(5)
+----+-----------+----+----+----------+-----------+
|Mode|     Entity|Code|Year|Production|Consumption|
+----+-----------+----+----+----------+-----------+
|   I|Afghanistan| AFG|1949|      0.04|       0.00|
|   I|Afghanistan| AFG|1950|      0.11|       0.00|
|   I|Afghanistan| AFG|1951|      0.12|       0.00|
|   I|Afghanistan| AFG|1952|      0.14|       0.00|
|   I|Afghanistan| AFG|1953|      0.13|       0.00|
+----+-----------+----+----+----------+-----------+
only showing top 5 rows

Apache Hudi需要一个主键来单独标识每个记录。如果是顺序生成的主键最佳,但是我们的表没有主键,为了解决这个问题,我们可以使用Entity和Year列的组合来生成PK,下面的键列将用作主键。

df_coal_prod=df_coal_prod.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key"))
df_coal_prod_f=df_coal_prod.drop(df_coal_prod.Mode)
df_coal_prod_f.show(5)
+-----------+----+----+----------+-----------+---------------+
|     Entity|Code|Year|Production|Consumption|            key|
+-----------+----+----+----------+-----------+---------------+
|Afghanistan| AFG|1949|      0.04|       0.00|Afghanistan1949|
|Afghanistan| AFG|1950|      0.11|       0.00|Afghanistan1950|
|Afghanistan| AFG|1951|      0.12|       0.00|Afghanistan1951|
|Afghanistan| AFG|1952|      0.14|       0.00|Afghanistan1952|
|Afghanistan| AFG|1953|      0.13|       0.00|Afghanistan1953|
+-----------+----+----+----------+-----------+---------------+
only showing top 5 rows

我们现在准备以Hudi格式保存数据,因为这是我们第一次保存这个表,所以我们将使用bulk_insert操作和mode=overwrite。还要注意,我们使用"key"列作为recordkey

df_coal_prod_f.write.format("org.apache.hudi") \
            .option("hoodie.table.name", TABLE_NAME) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "bulk_insert") \
            .option("hoodie.datasource.write.recordkey.field","key") \
            .option("hoodie.datasource.write.precombine.field", "key") \
            .mode("overwrite") \
            .save(S3_HUDI_DATA)

我们现在可以读取新创建的Hudi表。

df_final = spark.read.format("org.apache.hudi")\
          .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet")
df_final.registerTempTable("coal_prod")
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2013'").show(5)
+--------+
|count(1)|
+--------+
|    6282|
+--------+
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|Entity|Code|Year|Production|Consumption|      key|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
|     20201021215857|20201021215857_54...|         India2013|               default|8fae00ae-34e7-45e...| India| IND|2013|   2841.01|       0.00|India2013|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+

请注意,对于键India2013,我们有6282行初始化的数据,此键将在下一个操作中更新。现在我们将读取增量数据。

增量数据有4行:插入2行,更新一行,删除一行。我们将首先处理插入和更新的行,注意下面的过滤器("Mode IN('U','I')")。

S3_INCR_RAW_DATA = "s3://aws-analytics-course/raw/dms/fossil/coal_prod/20200808-*.csv"
df_coal_prod_incr = spark.read.csv(S3_INCR_RAW_DATA, header=False, schema=coal_prod_schema)
df_coal_prod_incr_u_i=df_coal_prod_incr.filter("Mode IN ('U', 'I')")
df_coal_prod_incr_u_i=df_coal_prod_incr_u_i.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key"))
df_coal_prod_incr_u_i.show(5)
df_coal_prod_incr_u_i_f=df_coal_prod_incr_u_i.drop(df_coal_prod_incr_u_i.Mode)
df_coal_prod_incr_u_i_f.show()
+----+------+----+----+----------+-----------+---------+
|Mode|Entity|Code|Year|Production|Consumption|      key|
+----+------+----+----+----------+-----------+---------+
|   I| India| IND|2015|   4056.33|       0.00|India2015|
|   I| India| IND|2016|   4890.45|       0.00|India2016|
|   U| India| IND|2013|   2845.66|     145.66|India2013|
+----+------+----+----+----------+-----------+---------+
+------+----+----+----------+-----------+---------+
|Entity|Code|Year|Production|Consumption|      key|
+------+----+----+----------+-----------+---------+
| India| IND|2015|   4056.33|       0.00|India2015|
| India| IND|2016|   4890.45|       0.00|India2016|
| India| IND|2013|   2845.66|     145.66|India2013|
+------+----+----+----------+-----------+---------+

现在准备好对增量数据执行Hudi Upsert操作,由于这个表已经存在,我们将使用append选项。

df_coal_prod_incr_u_i_f.write.format("org.apache.hudi") \
            .option("hoodie.table.name", TABLE_NAME) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "upsert") \
            .option("hoodie.upsert.shuffle.parallelism", 20) \
            .option("hoodie.datasource.write.recordkey.field","key") \
            .option("hoodie.datasource.write.precombine.field", "key") \
            .mode("append") \
            .save(S3_HUDI_DATA)

检查底层数据,注意,已经添加了两个新行,因此表计数从6282增加到6284,另外主键India2013的行已经更新了Production & Consumption列。

df_final = spark.read.format("org.apache.hudi")\
          .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet")
df_final.registerTempTable("coal_prod")
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2013'").show(5)
+--------+
|count(1)|
+--------+
|    6284|
+--------+
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|Entity|Code|Year|Production|Consumption|      key|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
|     20201021220359|20201021220359_0_...|         India2013|               default|8fae00ae-34e7-45e...| India| IND|2013|   2845.66|     145.66|India2013|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+

接着处理删除

df_coal_prod_incr_d=df_coal_prod_incr.filter("Mode IN ('D')")
df_coal_prod_incr_d=df_coal_prod_incr_d.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key"))
df_coal_prod_incr_d_f=df_coal_prod_incr_d.drop(df_coal_prod_incr_u_i.Mode)
df_coal_prod_incr_d_f.show()
+------+----+----+----------+-----------+---------+
|Entity|Code|Year|Production|Consumption|      key|
+------+----+----+----------+-----------+---------+
| India| IND|2010|   2710.54|       0.00|India2010|
+------+----+----+----------+-----------+---------+

我们可以使用Hudi Upsert操作来完成此操作,但需要使用和额外的删除选项hudi.datasource.write.payload.class=org.apache.hudi.EmptyHoodieRecordPayload

df_coal_prod_incr_d_f.write.format("org.apache.hudi") \
            .option("hoodie.table.name", TABLE_NAME) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "upsert") \
            .option("hoodie.upsert.shuffle.parallelism", 20) \
            .option("hoodie.datasource.write.recordkey.field","key") \
            .option("hoodie.datasource.write.precombine.field", "key") \
            .option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload") \
            .mode("append") \
            .save(S3_HUDI_DATA)

检查结果删除结果,由于删除了一行,计数从6284下降到6283,另外对已删除行的查询返回为空。

df_final = spark.read.format("org.apache.hudi")\
          .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet")
df_final.registerTempTable("coal_prod")
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2010'").show(5)
+--------+
|count(1)|
+--------+
|    6283|
+--------+
+-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|Entity|Code|Year|Production|Consumption|key|
+-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+
+-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+


相关实践学习
MySQL基础-学生管理系统数据库设计
本场景介绍如何使用DMS工具连接RDS,并使用DMS图形化工具创建数据库表。
目录
相关文章
|
2月前
|
物联网 数据管理 Apache
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
57 1
|
2月前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
28天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
78 11
|
5月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
17757 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
2月前
|
存储 缓存 数据管理
阿里云EMR数据湖文件系统问题之JindoFS数据孤岛的问题如何解决
阿里云EMR数据湖文件系统问题之JindoFS数据孤岛的问题如何解决
|
2月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
43 0
|
2月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
47 0
|
3月前
|
DataWorks 安全 API
DataWorks产品使用合集之是否可以不使用DataWorks进行EMR的调度和DataX数据导入
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
监控 大数据 API
震撼来袭!Apache Flink:实时数据流处理界的超级巨星,开启全新纪元,让你的数据飞起来!
【8月更文挑战第6天】随着大数据时代的到来,企业急需高效处理实时数据流。Apache Flink作为一款开源流处理框架,以高性能、可靠性及易用性脱颖而出。Flink能无缝处理有界和无界数据流,支持低延迟实时分析,适用于实时推荐、监控及风控等场景。例如,在实时风控系统中,Flink可即时分析交易行为以检测欺诈。以下示例展示了如何使用Flink实时计算交易总额,通过定义Transaction类和使用DataStream API实现数据流的实时处理和聚合。Flink正以其强大的实时处理能力和高度可扩展性引领实时数据流处理的新时代。
51 0

推荐镜像

更多
下一篇
无影云桌面