数据湖有新解!Apache Hudi 与 Apache Flink 集成

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 纵观大数据领域成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融合,彼此借力,各专所长。

作者:王祥虎(Apache Hudi 社区)

Apache Hudi 是由 Uber 开发并开源的数据湖框架,它于 2019 年 1 月进入 Apache 孵化器孵化,次年 5 月份顺利毕业晋升为 Apache 顶级项目。是当前最为热门的数据湖框架之一。

1. 为何要解耦

Hudi 自诞生至今一直使用 Spark 作为其数据处理引擎。如果用户想使用 Hudi 作为其数据湖框架,就必须在其平台技术栈中引入 Spark。放在几年前,使用 Spark 作为大数据处理引擎可以说是很平常甚至是理所当然的事。因为 Spark 既可以进行批处理也可以使用微批模拟流,流批一体,一套引擎解决流、批问题。然而,近年来,随着大数据技术的发展,同为大数据处理引擎的 Flink 逐渐进入人们的视野,并在计算引擎领域获占据了一定的市场,大数据处理引擎不再是一家独大。在大数据技术社区、论坛等领地,Hudi 是否支持使用 Flink 计算引擎的的声音开始逐渐出现,并日渐频繁。所以使 Hudi 支持 Flink 引擎是个有价值的事情,而集成 Flink 引擎的前提是 Hudi 与 Spark 解耦。

同时,纵观大数据领域成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融合,彼此借力,各专所长。因此将 Hudi 与 Spark 解耦,将其变成一个引擎无关的数据湖框架,无疑是给 Hudi 与其他组件的融合创造了更多的可能,使得 Hudi 能更好的融入大数据生态圈。

2. 解耦难点

Hudi 内部使用 Spark API 像我们平时开发使用 List 一样稀松平常。自从数据源读取数据,到最终写出数据到表,无处不是使用 Spark RDD 作为主要数据结构,甚至连普通的工具类,都使用 Spark API 实现,可以说 Hudi 就是用 Spark 实现的一个通用数据湖框架,它与 Spark 的绑定可谓是深入骨髓。

此外,此次解耦后集成的首要引擎是 Flink。而 Flink 与 Spark 在核心抽象上差异很大。Spark 认为数据是有界的,其核心抽象是一个有限的数据集合。而 Flink 则认为数据的本质是流,其核心抽象 DataStream 中包含的是各种对数据的操作。同时,Hudi 内部还存在多处同时操作多个 RDD,以及将一个 RDD 的处理结果与另一个 RDD 联合处理的情况,这种抽象上的区别以及实现时对于中间结果的复用,使得 Hudi 在解耦抽象上难以使用统一的 API 同时操作 RDD 和 DataStream。

3. 解耦思路

理论上,Hudi 使用 Spark 作为其计算引擎无非是为了使用 Spark 的分布式计算能力以及 RDD 丰富的算子能力。抛开分布式计算能力外,Hudi 更多是把 RDD 作为一个数据结构抽象,而 RDD 本质上又是一个有界数据集,因此,把 RDD 换成 List,在理论上完全可行(当然,可能会牺牲些性能)。为了尽可能保证 Hudi Spark 版本的性能和稳定性。我们可以保留将有界数据集作为基本操作单位的设定,Hudi 主要操作 API 不变,将 RDD 抽取为一个泛型,Spark 引擎实现仍旧使用 RDD,其他引擎则根据实际情况使用 List 或者其他有界数据集。

解耦原则:

1)统一泛型。Spark API 用到的 JavaRDD,JavaRDD,JavaRDD 统一使用泛型 I,K,O 代替;

2)去 Spark 化。抽象层所有 API 必须与 Spark 无关。涉及到具体操作难以在抽象层实现的,改写为抽象方法,引入 Spark 子类实现。

例如:Hudi 内部多处使用到了 JavaSparkContext#map() 方法,去 Spark 化,则需要将 JavaSparkContext 隐藏,针对该问题我们引入了 HoodieEngineContext#map() 方法,该方法会屏蔽 map 的具体实现细节,从而在抽象成实现去 Spark 化。

3)抽象层尽量减少改动,保证 Hudi 原版功能和性能;

4)使用 HoodieEngineContext 抽象类替换 JavaSparkContext,提供运行环境上下文。

4.Flink 集成设计

Hudi 的写操作在本质上是批处理,DeltaStreamer 的连续模式是通过循环进行批处理实现的。为使用统一 API,Hudi 集成 Flink 时选择攒一批数据后再进行处理,最后统一进行提交(这里 Flink 我们使用 List 来攒批数据)。

攒批操作最容易想到的是通过使用时间窗口来实现,然而,使用窗口,在某个窗口没有数据流入时,将没有输出数据,Sink 端难以判断同一批数据是否已经处理完。因此我们使用 Flink 的检查点机制来攒批,每两个 Barrier 之间的数据为一个批次,当某个子任务中没有数据时,mock 结果数据凑数。这样在 Sink 端,当每个子任务都有结果数据下发时即可认为一批数据已经处理完成,可以执行 commit。

DAG 如下:

1.jpg

  • source 接收 Kafka 数据,转换成 List;
  • InstantGeneratorOperator 生成全局唯一的 instant.当上一个 instant 未完成或者当前批次无数据时,不创建新的 instant;
  • KeyBy partitionPath 根据 partitionPath 分区,避免多个子任务写同一个分区;
  • WriteProcessOperator 执行写操作,当当前分区无数据时,向下游发送空的结果数据凑数;
  • CommitSink 接收上游任务的计算结果,当收到 parallelism 个结果时,认为上游子任务全部执行完成,执行 commit.

注:InstantGeneratorOperator 和 WriteProcessOperator 均为自定义的 Flink 算子,InstantGeneratorOperator 会在其内部阻塞检查上一个 instant 的状态,保证全局只有一个 inflight(或 requested)状态的 instant.WriteProcessOperator 是实际执行写操作的地方,其写操作在 checkpoint 时触发。

5. 实现示例

1) HoodieTable

/**
 * Abstract implementation of a HoodieTable.
 *
 * @param <T> Sub type of HoodieRecordPayload
 * @param <I> Type of inputs
 * @param <K> Type of keys
 * @param <O> Type of outputs
 */
public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {

  protected final HoodieWriteConfig config;
  protected final HoodieTableMetaClient metaClient;
  protected final HoodieIndex<T, I, K, O> index;

  public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
      I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);

  ......
}

HoodieTable 是 Hudi 的核心抽象之一,其中定义了表支持的 insert,upsert,bulkInsert 等操作。以 upsert 为例,输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.

从类注释可以看到 T,I,K,O 分别代表了 Hudi 操作的负载数据类型、输入数据类型、主键类型以及输出数据类型。这些泛型将贯穿整个抽象层。

2) HoodieEngineContext

/**
 * Base class contains the context information needed by the engine at runtime. It will be extended by different
 * engine implementation if needed.
 */
public abstract class HoodieEngineContext {

  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);

  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);

  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);

  ......
}

HoodieEngineContext 扮演了 JavaSparkContext 的角色,它不仅能提供所有 JavaSparkContext 能提供的信息,还封装了 map,flatMap,foreach 等诸多方法,隐藏了 JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach() 等方法的具体实现。

以 map 方法为例,在 Spark 的实现类 HoodieSparkEngineContext 中,map 方法如下:

@Override
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
    return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
  }

在操作 List 的引擎中其实现可以为(不同方法需注意线程安全问题,慎用 parallel()):

@Override
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
    return data.stream().parallel().map(func::apply).collect(Collectors.toList());
  }

注:map 函数中抛出的异常,可以通过包装 SerializableFunction func 解决.

这里简要介绍下 SerializableFunction:

@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {
  O apply(I v1) throws Exception;
}

该方法实际上是 java.util.function.Function 的变种,与java.util.function.Function 不同的是 SerializableFunction 可以序列化,可以抛异常。引入该函数是因为 JavaSparkContext#map() 函数能接收的入参必须可序列,同时在hudi的逻辑中,有多处需要抛异常,而在 Lambda 表达式中进行 try catch 代码会略显臃肿,不太优雅。

6.现状和后续计划

6.1 工作时间轴

2020 年 4 月,T3 出行(杨华@vinoyang,王祥虎@wangxianghu)和阿里巴巴的同学(李少锋@leesf)以及若干其他小伙伴一起设计、敲定了该解耦方案;

2020 年 4 月,T3 出行(王祥虎@wangxianghu)在内部完成了编码实现,并进行了初步验证,得出方案可行的结论;

2020 年 7 月,T3 出行(王祥虎@wangxianghu)将该设计实现和基于新抽象实现的 Spark 版本推向社区(HUDI-1089);

2020 年 9 月 26 日,顺丰科技基于 T3 内部分支修改完善的版本在 Apache Flink Meetup(深圳站)公开 PR, 使其成为业界第一个在线上使用 Flink 将数据写 Hudi 的企业。

2020 年 10 月 2 日,HUDI-1089 合并入 Hudi 主分支,标志着 Hudi-Spark 解耦完成。

6.2 后续计划

1)推进 Hudi 和 Flink 集成

将 Flink 与 Hudi 的集成尽快推向社区,初期该特性可能只支持 Kafka 数据源。

2)性能优化

为保证 Hudi-Spark 版本的稳定性和性能,此次解耦没有太多考虑 Flink 版本可能存在的性能问题。

3)类 flink-connector-hudi 第三方包开发

将 Hudi-Flink 的绑定做成第三方包,用户可以在 Flink 应用中以编码方式读取任意数据源,通过这个第三方包写入 Hudi。

更多 Flink 技术交流可加入 Apache Flink 社区钉钉交流群:

最新钉群二维码.jpeg

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
18天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
307 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
871 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
103 3
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
69 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
52 1
|
3月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
87 1
|
3月前
|
Java 测试技术 API
如何在 Apache JMeter 中集成 Elastic APM
如何在 Apache JMeter 中集成 Elastic APM
49 1
|
3月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
209 0
|
3月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
99 0
|
消息中间件 SQL 数据采集
数仓实时化改造:Hudi on Flink 在顺丰的实践应用
本文主要介绍顺丰在数据仓库的数据实时化、数据库 CDC、Hudi on Flink 上的实践应用及产品化经验。文章主要分为以下几部分:1、顺丰业务介绍;2、Hudi on Flink;3、产品化支持;4、后续计划。
数仓实时化改造:Hudi on Flink 在顺丰的实践应用

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多