Apache Hudi助力nClouds加速数据交付

简介: 在nClouds上,当客户的业务决策取决于对近实时数据的访问时,客户通常会向我们寻求有关数据和分析平台的解决方案。但随着每天创建和收集的数据量都在增加,这使得使用传统技术进行数据分析成为一项艰巨的任务。本文我们将讨论nClouds如何帮助您应对数据延迟,数据质量,系统可靠性和数据隐私合规性方面的挑战。

1. 概述


nClouds上,当客户的业务决策取决于对近实时数据的访问时,客户通常会向我们寻求有关数据和分析平台的解决方案。但随着每天创建和收集的数据量都在增加,这使得使用传统技术进行数据分析成为一项艰巨的任务。

本文我们将讨论nClouds如何帮助您应对数据延迟,数据质量,系统可靠性和数据隐私合规性方面的挑战。

Amazon EMR上的Apache Hudi是需要构建增量数据管道、大规模近实时处理数据的理想解决方案。本篇文章将在Amazon EMR的Apache Hudi上进行原型验证。

nClouds是具有DevOps、数据分析和迁移能力的AWS高级咨询合作伙伴 ,并且是AWS托管服务提供商(MSP)和AWS完善的合作伙伴计划的成员。nClouds的使命是帮助您构建和管理可更快交付创新的现代基础架构解决方案。


2. 解决方案概述


Apache Hudi是一个开源数据管理框架,用于简化增量数据处理和数据管道开发。它最初于2016年在Uber开发,旨在为PB级数据分析提供更快的数据,进行低延迟、高效率的数据摄取。

Apache Hudi通常用于简化进入数据湖和分析服务的数据管道,支持记录级粒度的Change Data Capture(CDC),同时可通过Apache Hive和Presto之类的SQL查询引擎对数据集进行近乎实时的分析,更多关于Hudi详情,可访问hudi.apache.org

Amazon EMR是领先的云大数据平台,可使用开源工具(例如Apache Hudi,Apache Spark,Apache Hive,Apache HBase,Apache Flink和Presto)处理大量数据。当选择Spark,Hive或Presto作为部署选项时,Apache Hudi会自动安装在Amazon EMR集群中。

在2019年,Amazon EMR团队开始与Apache Hudi社区密切合作,以提供补丁和bug修复并添加对AWS Glue Data Catalog的支持。

Apache Hudi非常适合将数据快速提取到Hadoop分布式文件系统(HDFS)或云存储中,并加快ETL/Hive/ Spark作业,Hudi适用于读繁重或写繁重的场景,它可以管理存储在Amazon Simple Storage Service(Amazon S3)上的数据。


2.1 数据延迟

高数据延迟会影响客户的运营能力,进一步影响新产品和服务的快速开发和交付,盈利能力以及基于事实的决策。

在上述场景下,我们建议使用Apache Hudi,它提供了DeltaStreamer实用工具程序来执行自动增量更新处理,使得关键业务数据管道能够以接近实时的延迟实现高效摄取,每次查询表时,都可以读取这些增量文件。

Apache Hudi通过处理对近实时数据的查询以及增量拉取进行时间点数据分析的查询来节省时间。


2.2 数据质量

数据量不断增长可能会对数据质量判断造成困难。从海量、可变和复杂的数据集中提取高质量的数据非常困难,尤其是在混合了非结构化,半结构化和结构化数据的情况下。

当数据快速变化时,其质量取决于其时效性,Apache Hudi能够处理数据结构变更,自动执行增量数据更新以及有效地提取流数据的能力,有助于提取和集成高质量数据。

Apache Hudi可与Amazon Simple Workflow(Amazon SWF)AWS Data PipelineAWS Lambda等AWS服务集成以实现自动化实时数据湖工作流程。


2.3 系统可靠性

当我们执行AWS Well-Architected Review(使用AWS Well-Architected Framework的最佳实践进行架构评估)时,我们关注的核心点之一是架构可靠性。如果通过临时提取,转换,加载(ETL)作业提取数据,而没有可靠的架构通信机制,则系统可靠性可能会受到威胁。

我们喜欢Apache Hudi在数据湖中控制和管理文件布局的功能,此功能对于维护健康的数据生态系统至关重要,因为它提高了可靠性和查询性能。

使用Hudi,用户无需加载新数据并使用ETL清理数据,从之前数据层摄取的数据和变更会自动更新,并在保存新数据时触发自动化的工作流程。

然后在AWS数据库迁移服务(AWS DMS)注册更新,并在Amazon Simple Storage Service(Amazon S3)的源位置中创建一个Apache Parquet文件,它使用Apache Avro作为记录的内部规范表示,从而在数据提取或ETL管道中提供可靠性。


2.4 遵守数据隐私法规

Apache Hudi管理着数据湖中数据的所有交互,并且提供对数据的访问的服务,同时Apache Hudi使得基于Amazon S3的数据湖能够遵守数据隐私法,其提供了记录级的更新和删除,因此用户可以选择行使其被遗忘的权利或更改其有关如何使用其数据的同意。


3. 原型验证


在nClouds,我们构建了一个非面向客户的原型验证(PoC)以说明如何使用Hudi的插入、更新和删除操作来处理数据集中的更改,COVID-19的经济影响促使我们使用与COVID-19大流行相关的数据。

TDWI最近的一项研究发现,由于大流行的影响,超过一半的数据和分析专业人员被要求回答新类型的问题,约三分之一的受访者表示,他们需要更新模型和分析负载以通过重新训练模型或重塑客户群来应对不断变化的客户行为。

我们PoC的数据流为Amazon Relational Database Service(Amazon RDS)-> Amazon S3记录集更改 -> Hudi数据集,以快速应用增量更改。同时我们需要一个环境来运行我们的测试,包括Amazon RDS,AWS DMS任务,Amazon EMR集群和S3存储桶,最后一步做数据可视化,我们使用Amazon QuickSight展示报表。

30.jpg

下面是PoC方案的每个具体步骤

第一步:Amazon RDS设置

  • 在Amazon RDS仪表板中,跳转到数据库参数组部分,然后创建一个新的参数组,将binlog格式设置为ROW。
  • 创建一个新的Amazon RDS实例,确保使用设置的DB Parameter group并开启自动备份;
  • 创建完成后,进行连接并创建如下MySQL Schema
CREATE TABLE covid_by_state(
  covid_by_state_id INTEGER NOT NULL AUTO_INCREMENT,
  date TIMESTAMP DEFAULT NOW() ON UPDATE NOW(),
  state VARCHAR(100),
  fips INTEGER,
  cases INTEGER,
  deaths INTEGER,
  CONSTRAINT orders_pk PRIMARY KEY(covid_by_state_id)
);
INSERT INTO covid_by_state( date , state, fips, cases, deaths) VALUES('2020-01-21','Washington',53,1,0);
INSERT INTO covid_by_state( date , state, fips, cases, deaths) VALUES('2020-01-21','Illinois',17,1,0);

第二步:AWS DMS设置

下一步需要使用AWS DMS将数据复制到Amazon S3中,需要一个复制实例来运行测试和复制任务

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "s3:DeleteObjectTagging",
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectTagging",
        "s3:PutObjectTagging",
        "s3:DeleteObject"
      ],
    "Resource": "arn:aws:s3:::bigdatablueprint-role/*"
    }
  ]
}

31.jpg

  • 检查是否正常工作,下图展示了AWS DMS成功运行的结果

32.jpg

第三步:AWS DMS任务设置

第四步:Amazon EMR集群设置

安装Apache Spark,Apache Hive或Presto时,Amazon EMR(发行版5.28.0及更高版本)会默认安装Apache Hudi组件。

Apache Spark或Apache Hudi DeltaStreamer实用程序可以创建或更新Apache Hudi数据集。Apache Hive,Apache Spark或Presto可以交互式查询Apache Hudi数据集,或使用增量拉取(仅拉取两次操作之间更改的数据)来构建数据处理管道。

以下是有关如何使用Apache Hudi运行新的Amazon EMR集群和处理数据的教程。

  • 进入Amazon EMR控制面板
  • 填写如下配置

33.jpg

  • 集群运行后,执行以下脚本来设置数据库。运行Apache Spark命令以查看两行初始插入的当前数据库状态。
scala>scala> spark.read.parquet("s3://bigdatablueprint-raw/covid/hudi_dms/covid_by_state/*").sort("updated_at").show
+----+--------+---------+-------------+-------------------+-------------------+
+----+-----------------+-------------------+----------+----+-----+------+    
|  Op|covid_by_state_id|            date|   state|fips|cases|deaths|
+----+-----------------+-------------------+----------+----+-----+------+
|null|              1|2020-01-21 00:00:00|Washington|  53|  1|  0|
|null|              2|2020-01-21 00:00:00|  Illinois|  17|  1|  0|

第五步:使用Apache Hudi DeltaStreamer处理变更日志

  • 要开始使用更改日志,请在工作流时间表上使用以Apache Spark作业运行的Apache Hudi DeltaStreamer。 例如,按如下所示启动Apache Spark Shell:
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer    --packages org.apache.spark:spark-avro_2.11:2.4.4   --master yarn --deploy-mode client   hudi-utilities-bundle_2.11-0.5.2-incubating.jar   --table-type 
COPY_ON_WRITE   --source-ordering-field cases   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource   --target-base-path s3://bigdatablueprint-final/hudi_covid --target-table cover   --transformer-class org.apache.hudi.utilities.transform.AWSDmsTrans
  • 这是Apache Hudi表,具有与上游表相同的记录(也包含所有_hoodie字段):
scala> spark.read.parquet("s3://bigdatablueprint-final/hudi_covid/*/*.parquet").sort("cases").show
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+-------------------+----------+----+-----+------+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|covid_by_state_id|             date|   state|fips|cases|deaths| Op|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+-------------------+----------+----+-----+------+---+
|   20200812061302|  20200812061302_0_1|              1|            default|0a292b18-5194-45d...|             1|2020-01-21 00:00:00|Washington|  53|  1|  0|   |
|   20200812061302|  20200812061302_0_2|              2|            default|0a292b18-5194-45d...|             2|2020-01-21 00:00:00|  Illinois|  17|  1|  0|   |
  • 进行数据库更改以查看其他行和更新的值。
  • 一旦在源Amazon S3存储桶更新后,AWS DMS将立即将摄取,然后重新运行相同的Apache Hudi作业,从而将新的Apache Parquet文件添加到AWS DMS输出文件夹。
scala> spark.read.parquet("s3://bigdatablueprint-raw/covid/hudi_dms/covid_by_state/*").sort("cases").show
+----+-----------------+-------------------+----------+----+-----+------+    
|  Op|covid_by_state_id|            date|   state|fips|cases|deaths|
+----+-----------------+-------------------+----------+----+-----+------+
|   I|              4|2020-01-21 00:00:00|   Arizona|   4|  1|  0|
|null|              1|2020-01-21 00:00:00|Washington|  53|  1|  0|
|null|              2|2020-01-21 00:00:00|  Illinois|  17|  1|  0|
|   U|              1|2020-08-12 06:25:04|Washington|  53|   60|  0|
+----+-----------------+-------------------+----------+----+-----+------+
  • 重新运行Apache Hudi DeltaStreamer命令时,它将把Apache Parquet文件变更成Apache Hudi表。
  • 这是运行Apache Spark作业后的Apache Hudi结果:
scala> spark.read.parquet("s3://bigdatablueprint-final/hudi_covid/*/*.parquet").sort("cases").show
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+-------------------+----------+----+-----+------+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|covid_by_state_id|             date|   state|fips|cases|deaths| Op|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------------+-------------------+----------+----+-----+------+---+
|   20200812061302|  20200812061302_0_1|              1|            default|0a292b18-5194-45d...|             1|2020-01-21 00:00:00|Washington|  53|  1|  0|   |
|   20200812061302|  20200812061302_0_2|              2|            default|0a292b18-5194-45d...|             2|2020-01-21 00:00:00|  Illinois|  17|  1|  0|   |
|   20200812063216|  20200812063216_0_1|              4|            default|0a292b18-5194-45d...|             4|2020-01-21 00:00:00|   Arizona|   4|  1|  0|  I|
|   20200812062710|  20200812062710_0_1|              1|            default|0a292b18-5194-45d...|             1|2020-08-12 06:25:04|Washington|  53|   60|  0|  U|
  • 将数据转化为CSV格式以便可以通过Amazon QuickSight读取
var df = spark.read.parquet("s3://bigdatablueprint-final/hudi_covid/*/*.parquet").sort("cases")
df.write.option("header","true").csv("covid.csv")

使用上述命令将数据转换为.csv后,它是一种便于Amazon QuickSight查看的数据格式,可以将其可视化。下图显示了来自QuickSight中原始(.csv)源的数据的一种表示形式。

这种数据视图按状态细分了特定日期的案例数。随着Hudi使用新数据(可以直接流式传输)更新表时,将启用通过QuickSight进行近实时或实时报告。

34.jpg


4. 总结


在本文中,我们逐步介绍了我们的非面向客户的PoC解决方案,以在Amazon EMR和其他托管服务(包括用于数据可视化的Amazon QuickSight)上使用Apache Hudi建立新的数据和分析平台。

如果您的业务决策取决于对近实时数据的访问,并且您面临数据延迟,高数据质量,系统可靠性以及对数据隐私法规的遵从性等挑战,我们建议从nClouds实施此解决方案,它旨在加速需要增量数据管道和处理的大规模和近实时应用程序中的数据交付。

目录
相关文章
|
27天前
|
物联网 数据管理 Apache
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
42 1
|
27天前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
13天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
58 11
|
1月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
38 0
|
1月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
46 0
|
1月前
|
监控 大数据 API
震撼来袭!Apache Flink:实时数据流处理界的超级巨星,开启全新纪元,让你的数据飞起来!
【8月更文挑战第6天】随着大数据时代的到来,企业急需高效处理实时数据流。Apache Flink作为一款开源流处理框架,以高性能、可靠性及易用性脱颖而出。Flink能无缝处理有界和无界数据流,支持低延迟实时分析,适用于实时推荐、监控及风控等场景。例如,在实时风控系统中,Flink可即时分析交易行为以检测欺诈。以下示例展示了如何使用Flink实时计算交易总额,通过定义Transaction类和使用DataStream API实现数据流的实时处理和聚合。Flink正以其强大的实时处理能力和高度可扩展性引领实时数据流处理的新时代。
50 0
|
2月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
3月前
|
数据采集 关系型数据库 MySQL
使用Apache Flink实现MySQL数据读取和写入的完整指南
使用Apache Flink实现MySQL数据读取和写入的完整指南
361 0
使用Apache Flink实现MySQL数据读取和写入的完整指南
|
4月前
|
OLAP 数据处理 Apache
众安保险 CDP 平台:借助阿里云数据库 SelectDB 版内核 Apache Doris 打破数据孤岛,人群圈选提速4倍
众安保险在CDP(Customer Data Platform,客户数据平台)建设中,通过引入阿里云数据库SelectDB版内核Apache Doris,成功打破了数据孤岛,并显著提升了人群圈选的速度
246 1
|
3月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

推荐镜像

更多