基于Apache Hudi在Google云构建数据湖平台

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 自从计算机出现以来,我们一直在尝试寻找计算机存储一些信息的方法,存储在计算机上的信息(也称为数据)有多种形式,数据变得如此重要,以至于信息现在已成为触手可及的商品。多年来数据以多种方式存储在计算机中,包括数据库、blob存储和其他方法,为了进行有效的业务分析,必须对现代应用程序创建的数据进行处理和分析,并且产生的数据量非常巨大!有效地存储数PB数据并拥有必要的工具来查询它以便使用它至关重要,只有这样对该数据的分析才能产生有意义的结果。

自从计算机出现以来,我们一直在尝试寻找计算机存储一些信息的方法,存储在计算机上的信息(也称为数据)有多种形式,数据变得如此重要,以至于信息现在已成为触手可及的商品。多年来数据以多种方式存储在计算机中,包括数据库、blob存储和其他方法,为了进行有效的业务分析,必须对现代应用程序创建的数据进行处理和分析,并且产生的数据量非常巨大!有效地存储数PB数据并拥有必要的工具来查询它以便使用它至关重要,只有这样对该数据的分析才能产生有意义的结果。

大数据是一门处理分析方法、有条不紊地从中提取信息或以其他方式处理对于典型数据处理应用程序软件而言过于庞大或复杂的数据量的方法的学科。为了处理现代应用程序产生的数据,大数据的应用是非常必要的,考虑到这一点,本博客旨在提供一个关于如何创建数据湖的小教程,该数据湖从应用程序的数据库中读取任何更改并将其写入数据湖中的相关位置,我们将为此使用的工具如下:

  • Debezium
  • MySQL
  • Apache Kafka
  • Apache Hudi
  • Apache Spark

我们将要构建的数据湖架构如下:

55.png

第一步是使用 Debezium 读取关系数据库中发生的所有更改,并将所有更改推送到 Kafka 集群。

Debezium 是一个用于变更数据捕获的开源分布式平台,Debezium 可以指向任何关系数据库,并且它可以开始实时捕获任何数据更改,它非常快速且实用,由红帽维护。

首先,我们将使用 docker-compose 在我们的机器上设置 Debezium、MySQL 和 Kafka,您也可以使用这些的独立安装,我们将使用 Debezium 提供给我们的 mysql 镜像,因为其中已经包含数据,在任何生产环境中都可以使用适当的 Kafka、MySQL 和 Debezium 集群,docker compose 文件如下:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3307:3306
    environment:
     - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASS}
     - MYSQL_USER=${MYSQL_USER}
     - MYSQL_PASSWORD=${MYSQL_USER_PASS}
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
     - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
     - zookeeper
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
     - schema-registry
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
     - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
     - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
     - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081

DEBEZIUM_VERSION 可以设置为 1.8。 此外请确保设置 MYSQL_ROOT_PASS、MYSQL_USER 和 MYSQL_PASSWORD。

在我们继续之前,我们将查看 debezium 镜像提供给我们的数据库 inventory 的结构,进入数据库的命令行:

docker-compose -f docker-compose-avro-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

在 shell 内部,我们可以使用 show tables 命令。 输出应该是这样的:

56.png

我们可以通过 select * from customers 命令来查看客户表的内容。 输出应该是这样的:

57.png

现在在创建容器后,我们将能够为 Kafka Connect 激活 Debezium 源连接器,我们将使用的数据格式是 Avro 数据格式,Avro 是在 Apache 的 Hadoop 项目中开发的面向行的远程过程调用和数据序列化框架。它使用 JSON 来定义数据类型和协议,并以紧凑的二进制格式序列化数据。

让我们用我们的 Debezium 连接器的配置创建另一个文件。

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "MYSQL_USER",
        "database.password": "MYSQL_PASSWORD",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

正如我们所看到的,我们已经在其中配置了数据库的详细信息以及要从中读取更改的数据库,确保将 MYSQL_USER 和 MYSQL_PASSWORD 的值更改为您之前配置的值,现在我们将运行一个命令在 Kafka Connect 中注册它,命令如下:

curl -i -X POST -H "Accept:application/json" -H "Content-type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

现在,Debezium 应该能够从 Kafka 读取数据库更改。

下一步涉及使用 Spark 和 Hudi 从 Kafka 读取数据,并将它们以 Hudi 文件格式放入 Google Cloud Storage Bucket。 在我们开始使用它们之前,让我们了解一下 Hudi 和 Spark 是什么。

Apache Hudi 是一个开源数据管理框架,用于简化增量数据处理和数据管道开发。 该框架更有效地管理数据生命周期等业务需求并提高数据质量。 Hudi 使您能够在基于云的数据湖上管理记录级别的数据,以简化更改数据捕获 (CDC) 和流式数据摄取,并帮助处理需要记录级别更新和删除的数据隐私用例。 Hudi 管理的数据集使用开放存储格式存储在云存储桶中,而与 Presto、Apache Hive 和/或 Apache Spark 的集成使用熟悉的工具提供近乎实时的更新数据访问

Apache Spark 是用于大规模数据处理的开源统一分析引擎。 Spark 为具有隐式数据并行性和容错性的集群编程提供了一个接口。 Spark 代码库最初是在加州大学伯克利分校的 AMPLab 开发的,后来被捐赠给了 Apache 软件基金会,该基金会一直在维护它。

现在,由于我们正在 Google Cloud 上构建解决方案,因此最好的方法是使用 Google Cloud Dataproc。 Google Cloud Dataproc 是一种托管服务,用于处理大型数据集,例如大数据计划中使用的数据集。 Dataproc 是 Google 的公共云产品 Google Cloud Platform 的一部分。 Dataproc 帮助用户处理、转换和理解大量数据。

在 Google Dataproc 实例中,预装了 Spark 和所有必需的库。 创建实例后,我们可以在其中运行以下 Spark 作业来完成我们的管道:

spark-submit \
  --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 \
  --master yarn --deploy-mode client \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.1.jar \
  --table-type COPY_ON_WRITE --op UPSERT \
  --target-base-path gs://your-data-lake-bucket/hudi/customers \
  --target-table hudi_customers --continuous \
  --min-sync-interval-seconds 60 \
  --source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \
  --source-ordering-field _event_origin_ts_ms \
  --hoodie-conf schema.registry.url=http://localhost:8081 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=dbserver1.inventory.customers \
  --hoodie-conf bootstrap.servers=localhost:9092 \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=id \

这将运行一个 spark 作业,该作业从我们之前推送到的 Kafka 中获取数据并将其写入 Google Cloud Storage Bucket。 我们必须指定 Kafka 主题、Schema Registry URL 和其他相关配置。


结论


可以通过多种方式构建数据湖。 我试图展示如何使用 Debezium、Kafka、Hudi、Spark 和 Google Cloud 构建数据湖。 使用这样的设置,可以轻松扩展管道以管理大量数据工作负载! 有关每种技术的更多详细信息,可以访问文档。 可以自定义 Spark 作业以获得更细粒度的控制。 这里显示的 Hudi 也可以与 Presto、Hive 或 Trino 集成。 定制的数量是无穷无尽的。 本文提供了有关如何使用上述工具构建基本数据管道的基本介绍!

目录
相关文章
|
2月前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
64 0
|
19天前
|
存储 分布式计算 OLAP
Apache Paimon统一大数据湖存储底座
Apache Paimon,始于Flink Table Store,发展为独立的Apache顶级项目,专注流式数据湖存储。它提供统一存储底座,支持流、批、OLAP,优化了CDC入湖、流式链路构建和极速OLAP查询。Paimon社区快速增长,集成Flink、Spark等计算引擎,阿里巴巴在内部广泛应用,旨在打造统一湖存储,打通Serverless Flink、MaxCompute等,欢迎大家扫码参与体验阿里云上的 Flink+Paimon 的流批一体服务。
13419 0
Apache Paimon统一大数据湖存储底座
|
7天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【6月更文挑战第30天】Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅。在Java中,开发者使用Kafka的客户端库创建生产者和消费者。生产者发送序列化消息到主题,消费者通过订阅和跟踪偏移量消费消息。Kafka以持久化、容灾和顺序写入优化I/O。Java示例代码展示了如何创建并发送/接收消息。通过分区、消费者组和压缩等策略,Kafka在高并发场景下可被优化。
17 1
|
9天前
|
SQL 存储 运维
网易游戏如何基于阿里云瑶池数据库 SelectDB 内核 Apache Doris 构建全新湖仓一体架构
随着网易游戏品类及产品的快速发展,游戏数据分析场景面临着越来越多的挑战,为了保证系统性能和 SLA,要求引入新的组件来解决特定业务场景问题。为此,网易游戏引入 Apache Doris 构建了全新的湖仓一体架构。经过不断地扩张,目前已发展至十余集群、为内部上百个项目提供了稳定可靠的数据服务、日均查询量数百万次,整体查询性能得到 10-20 倍提升。
网易游戏如何基于阿里云瑶池数据库 SelectDB 内核 Apache Doris 构建全新湖仓一体架构
|
1月前
|
人工智能 自然语言处理 机器人
[AI Google] 新的生成媒体模型和工具,专为创作者设计和构建
探索谷歌最新的生成媒体模型:用于高分辨率视频生成的 Veo 和用于卓越文本生成图像能力的 Imagen 3。还可以了解使用 Music AI Sandbox 创作的新演示录音。
[AI Google] 新的生成媒体模型和工具,专为创作者设计和构建
|
20天前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
应用服务中间件 网络安全 Apache
构建高性能Web服务器:Nginx vs Apache
【5月更文挑战第16天】Nginx与Apache是两种主流Web服务器,各具优势。Nginx以其轻量级、高并发处理能力和反向代理功能见长,适合大型网站和高并发场景;而Apache以功能丰富、稳定性强闻名,适合企业网站和需要多种Web服务功能的场景。在性能上,Nginx处理高并发更优,Apache则可能在高负载时遭遇瓶颈。在选择时,应根据实际需求权衡。
|
2月前
|
存储 分布式计算 Apache
官宣|Apache Paimon 毕业成为顶级项目,数据湖步入实时新篇章!
Apache Paimon 在构建实时数据湖与流批处理技术领域取得了重大突破,数据湖步入实时新篇章!
2567 6
官宣|Apache Paimon 毕业成为顶级项目,数据湖步入实时新篇章!
|
2月前
|
分布式计算 Java Hadoop
数据湖架构之Hudi编译篇
数据湖架构之Hudi编译篇
37 0
|
2月前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。

热门文章

最新文章

推荐镜像

更多