基于Apache Hudi和Debezium构建CDC入湖管道

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
日志服务 SLS,月写入数据量 50GB 1个月
简介: 从 Hudi v0.10.0 开始,我们很高兴地宣布推出适用于 Deltastreamer 的 Debezium 源,它提供从 Postgres 和 MySQL 数据库到数据湖的变更捕获数据 (CDC) 的摄取。

1. 背景


50.png

当想要对来自事务数据库(如 Postgres 或 MySQL)的数据执行分析时,通常需要通过称为更改数据捕获 CDC的过程将此数据引入数据仓库或数据湖等 OLAP 系统。 Debezium 是一种流行的工具,它使 CDC 变得简单,其提供了一种通过读取更改日志来捕获数据库中行级更改的方法,通过这种方式 Debezium 可以避免增加数据库上的 CPU 负载,并确保捕获包括删除在内的所有变更。

现在 Apache Hudi 提供了 Debezium 源连接器,CDC 引入数据湖比以往任何时候都更容易,因为它具有一些独特的差异化功能。 Hudi 可在数据湖上实现高效的更新、合并和删除事务。 Hudi 独特地提供了 Merge-On-Read 写入器,与使用 Spark 或 Flink 的典型数据湖写入器相比,该写入器可以显着降低摄取延迟。 最后,Apache Hudi 提供增量查询,因此在从数据库中捕获更改后可以在所有后续 ETL 管道中以增量方式处理这些更改下游。


2. 总体设计


51.png

上面显示了使用 Apache Hudi 的端到端 CDC 摄取流的架构,第一个组件是 Debezium 部署,它由 Kafka 集群、schema registry(Confluent 或 Apicurio)和 Debezium 连接器组成,Debezium 连接器不断轮询数据库中的更改日志,并将每个数据库行的更改写入 AVRO 消息到每个表的专用 Kafka 主题。

第二个组件是 Hudi Deltastreamer,它为每个表从 Kafka 读取和处理传入的 Debezium 记录,并在云存储上的 Hudi 表中写入(更新)相应的行。

为了近乎实时地将数据库表中的数据提取到 Hudi 表中,我们实现了两个可插拔的 Deltastreamer 类。首先我们实现了一个 Debezium 源。 Deltastreamer 在连续模式下运行,源源不断地从给定表的 Kafka 主题中读取和处理 Avro 格式的 Debezium 更改记录,并将更新的记录写入目标 Hudi 表。 除了数据库表中的列之外,我们还摄取了一些由 Debezium 添加到目标 Hudi 表中的元字段,元字段帮助我们正确地合并更新和删除记录,使用Schema Registry表中的最新模式读取记录。

其次我们实现了一个自定义的 Debezium Payload,它控制了在更新或删除同一行时如何合并 Hudi 记录,当接收到现有行的新 Hudi 记录时,有效负载使用相应列的较高值(MySQL 中的 FILEID 和 POS 字段以及 Postgres 中的 LSN 字段)选择最新记录,在后一个事件是删除记录的情况下,有效负载实现确保从存储中硬删除记录。 删除记录使用 op 字段标识,该字段的值 d 表示删除。


3. Apache Hudi配置


在使用 Debezium 源连接器进行 CDC 摄取时,请务必考虑以下 Hudi 部署配置。

  • 记录键 - 表的 Hudi 记录键应设置为上游数据库中表的主键。这可确保正确应用更新,因为记录键唯一地标识 Hudi 表中的一行。
  • 源排序字段 - 对于更改日志记录的重复数据删除,源排序字段应设置为数据库上发生的更改事件的实际位置。 例如我们分别使用 MySQL 中的 FILEID 和 POS 字段以及 Postgres 数据库中的 LSN 字段来确保记录在原始数据库中以正确的出现顺序进行处理。
  • 分区字段 - 不要将 Hudi 表的分区与与上游数据库相同的分区字段相匹配。当然也可以根据需要为 Hudi 表单独设置分区字段。


3.1 引导现有表

一个重要的用例可能是必须对现有数据库表进行 CDC 摄取。在流式传输更改之前我们可以通过两种方式获取现有数据库数据:

  • 默认情况下,Debezium 在初始化时执行数据库的初始一致快照(由 config snapshot.mode 控制)。在初始快照之后它会继续从正确的位置流式传输更新以避免数据丢失。
  • 虽然第一种方法很简单,但对于大型表,Debezium 引导初始快照可能需要很长时间。或者我们可以运行 Deltastreamer 作业,使用 JDBC 源直接从数据库引导表,这为用户定义和执行引导数据库表所需的更优化的 SQL 查询提供了更大的灵活性。引导作业成功完成后,将执行另一个 Deltastreamer 作业,处理来自 Debezium 的数据库更改日志,用户必须在 Deltastreamer 中使用检查点来确保第二个作业从正确的位置开始处理变更日志,以避免数据丢失。


3.2 例子

以下描述了使用 AWS RDS 实例 Postgres、基于 Kubernetes 的 Debezium 部署和在 Spark 集群上运行的 Hudi Deltastreamer 实施端到端 CDC 管道的步骤。


3.3 数据库

RDS 实例需要进行一些配置更改才能启用逻辑复制。

SET rds.logical_replication to 1 (instead of 0)
psql --host=<aws_rds_instance> --port=5432 --username=postgres --password -d <database_name>;
CREATE PUBLICATION <publication_name> FOR TABLE schema1.table1, schema1.table2;
ALTER TABLE schema1.table1 REPLICA IDENTITY FULL;


3.4 Debezium 连接器

Strimzi 是在 Kubernetes 集群上部署和管理 Kafka 连接器的推荐选项,或者可以选择使用 Confluent 托管的 Debezium 连接器

kubectl create namespace kafka
kubectl create -f https://strimzi.io/install/latest?namespace=kafka -n kafka
kubectl -n kafka apply -f kafka-connector.yaml

kafka-connector.yaml 的示例如下所示:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-kafka-connect
annotations:
strimzi.io/use-connector-resources: "false"
spec:
image: debezium-kafka-connect:latest
replicas: 1
bootstrapServers: localhost:9092
config:
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1

可以使用以下包含 Postgres Debezium 连接器的 Dockerfile 构建 docker 映像 debezium-kafka-connect

FROM confluentinc/cp-kafka-connect:6.2.0 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.0
FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
RUN yum -y update
RUN yum -y install git
RUN yum -y install wget
RUN wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.1.Final/debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN tar xzf debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN mkdir -p /opt/kafka/plugins/debezium && mkdir -p /opt/kafka/plugins/avro/
RUN mv debezium-connector-postgres /opt/kafka/plugins/debezium/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/
USER 1001

一旦部署了 Strimzi 运算符和 Kafka 连接器,我们就可以启动 Debezium 连接器。

curl -X POST -H "Content-Type:application/json" -d @connect-source.json http://localhost:8083/connectors/

以下是设置 Debezium 连接器以生成两个表 table1 和 table2 的更改日志的配置示例。

connect-source.json 的内容如下

{
  "name": "postgres-debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "database",
    "plugin.name": "pgoutput",
    "database.server.name": "postgres",
    "table.include.list": "schema1.table1,schema1.table2",
    "publication.autocreate.mode": "filtered",
    "tombstones.on.delete":"false",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "<schema_registry_host>",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<schema_registry_host>",
    "slot.name": "pgslot"
  }
}


3.5 Hudi Deltastreamer

接下来我们使用 Spark 运行 Hudi Deltastreamer,它将从 kafka 摄取 Debezium 变更日志并将它们写入 Hudi 表。 下面显示了一个这样的命令实例,它适用于 Postgres 数据库。 几个关键配置如下:

  • 将源类设置为 PostgresDebeziumSource。
  • 将有效负载类设置为 PostgresDebeziumAvroPayload。
  • 为 Debezium Source 和 Kafka Source 配置模式注册表 URL。
  • 将记录键设置为数据库表的主键。
  • 将源排序字段 (dedup) 设置为 _event_lsn
spark-submit \\
  --jars "/home/hadoop/hudi-utilities-bundle_2.12-0.10.0.jar,/usr/lib/spark/external/lib/spark-avro.jar" \\
  --master yarn --deploy-mode client \\
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.0-SNAPSHOT.jar \\
  --table-type COPY_ON_WRITE --op UPSERT \\
  --target-base-path s3://bucket_name/path/for/hudi_table1 \\
  --target-table hudi_table1  --continuous \\
  --min-sync-interval-seconds 60 \\
  --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \\
  --source-ordering-field _event_lsn \\
  --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \\
  --hoodie-conf schema.registry.url=https://localhost:8081 \\
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://localhost:8081/subjects/postgres.schema1.table1-value/versions/latest \\
  --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \\
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=postgres.schema1.table1 \\
  --hoodie-conf auto.offset.reset=earliest \\
  --hoodie-conf hoodie.datasource.write.recordkey.field=”database_primary_key” \\
  --hoodie-conf hoodie.datasource.write.partitionpath.field=partition_key \\
  --enable-hive-sync \\
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \\
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \\
  --hoodie-conf hoodie.datasource.hive_sync.database=default \\
  --hoodie-conf hoodie.datasource.hive_sync.table=hudi_table1 \\
  --hoodie-conf hoodie.datasource.hive_sync.partition_fields=partition_key


4. 总结


这篇文章介绍了用于 Hudi Deltastreamer 的 Debezium 源,以将 Debezium 更改日志提取到 Hudi 表中。 现在可以将数据库数据提取到数据湖中,以提供一种经济高效的方式来存储和分析数据库数据。

请关注此 JIRA 以了解有关此新功能的更多信息。

目录
相关文章
|
21天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
79 2
|
29天前
|
Java 持续交付 项目管理
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。它采用项目对象模型(POM)来描述项目,简化构建流程。Maven提供依赖管理、标准构建生命周期、插件扩展等功能,支持多模块项目及版本控制。在Java Web开发中,Maven能够自动生成项目结构、管理依赖、自动化构建流程并运行多种插件任务,如代码质量检查和单元测试。遵循Maven的最佳实践,结合持续集成工具,可以显著提升开发效率和项目质量。
38 1
|
2月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
17705 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
2月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
3月前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【6月更文挑战第30天】Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅。在Java中,开发者使用Kafka的客户端库创建生产者和消费者。生产者发送序列化消息到主题,消费者通过订阅和跟踪偏移量消费消息。Kafka以持久化、容灾和顺序写入优化I/O。Java示例代码展示了如何创建并发送/接收消息。通过分区、消费者组和压缩等策略,Kafka在高并发场景下可被优化。
104 1
|
3月前
|
SQL 存储 运维
网易游戏如何基于阿里云瑶池数据库 SelectDB 内核 Apache Doris 构建全新湖仓一体架构
随着网易游戏品类及产品的快速发展,游戏数据分析场景面临着越来越多的挑战,为了保证系统性能和 SLA,要求引入新的组件来解决特定业务场景问题。为此,网易游戏引入 Apache Doris 构建了全新的湖仓一体架构。经过不断地扩张,目前已发展至十余集群、为内部上百个项目提供了稳定可靠的数据服务、日均查询量数百万次,整体查询性能得到 10-20 倍提升。
网易游戏如何基于阿里云瑶池数据库 SelectDB 内核 Apache Doris 构建全新湖仓一体架构
|
4月前
|
消息中间件 关系型数据库 MySQL
Apache Flink CDC 3.1.0 发布公告
Apache Flink 社区很高兴地宣布发布 Flink CDC 3.1.0!
903 1
Apache Flink CDC 3.1.0 发布公告
|
3月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
应用服务中间件 网络安全 Apache
构建高性能Web服务器:Nginx vs Apache
【5月更文挑战第16天】Nginx与Apache是两种主流Web服务器,各具优势。Nginx以其轻量级、高并发处理能力和反向代理功能见长,适合大型网站和高并发场景;而Apache以功能丰富、稳定性强闻名,适合企业网站和需要多种Web服务功能的场景。在性能上,Nginx处理高并发更优,Apache则可能在高负载时遭遇瓶颈。在选择时,应根据实际需求权衡。
|
23天前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
31 1

推荐镜像

更多