基于TIS构建Apache Hudi千表入湖方案

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: 基于TIS构建Apache Hudi千表入湖方案

拥抱数据湖

随着大数据时代的到来,数据量动辄PB级,因此亟需一种低成本、高稳定性的实时数仓解决方案来支持海量数据的OLAP查询需求,Apache Hudi[1]应运而生。Hudi借助与存放在廉价的分布式文件系统之中列式存储文件,并将其元数据信息存放在Hive元数据库中与传统查询引擎Hive、Presto、Spark等整合,完美地实现了计算与存储的分离。Hudi数据湖方案比传统的Hive数仓的优势是加入了数据实时同步功能, 可以通过最新的Flink流计算引擎来以最小的成实现数据实时同步。本质来说Hudi是整合现有的技术方案实现的,属于新瓶装旧酒,Hudi内部需要整合各种组件(存储、Indexer、Compaction,文件分区),为了达到通用及灵活性,每个组件会有大量的配置参数需要设置,且各种组件 的配置是有关联性的,所以对与新手来说要构建一个生产环境中可用的数据库方案,面对一大堆配置往往会望而却步。本文就向大家介绍如何通过TIS来改善Hudi数据湖实例构建流程,从而大幅提高工作效率。

TIS可以为您做什么?

TIS将Hudi中的各组件进行优雅地封装,并且基于TIS的数据字典组件自动生成Hudi DeltaStreamer[2]Flink Stream API[3]运行所需要 配置,Hudi数据表相关的配置都是在TIS的UI界面上完成操作,实现了轻点鼠标完成构建流程,实现开箱即用从而大幅提高构建Hudi数据湖的效率

TIS采用两种方式实现数据入湖:

1. DeltaStreamer: 该方法实现批量数据导入,通过DataX将数据表中数据以avro格式导入到HDFS中,之后启动DeltaStreamer通过Spark RDD消费HDFS中的原始数据进行数据入湖。该种方式适合历史数据导入,优点是速度快吞吐率大。不足是无法提供Hudi表增量同步功能。

2. 基于Flink Stream API[4]方式(Stream SQL不推荐,原因是,使用Stream API的方式可以和DeltaStreamer执行流程中都依赖同一份Avro Schema来定义Hudi表结构,这样就保证Hudi表数据结构统一):基于Flink Stream API的方式来实现增量数据同步功能,优点是可以保证数据源和Hudi表保证低延时同步(一个CheckPoint周期之内),缺点是当利用该种方式结合Flink CDC组件来导入历史全量数据时由于触发CheckPoint执行过程种需要将历史数据写入到Flink Statebackend种存储,由于数据量大往往会导致Flink CheckPoint执行超时,导致Flink Job执行失败,另外,由于Flink Job执行过程种还会触发Hudi Compaction操作由于数据量大也会导致Flink Job产生OOM异常从而任务失败。

推荐用户在实际生产环境中将以上两种方式结合使用,初次构建Hudi,数据湖表需要导入历史全量数据,这时采用第一种DeltaStreamer批量导入历史全量数据。等待构建完成,继续开启Flink增量同步任务(以Flink CDC MySQL Connector[5]为例,消费游标使用Latest策略消费,消费最新Binlog增量数据) 这样就可以规避Flink Job由于读取全量历史数据因数据量大导致的Job失败的故障产生。

实现原理

实现原理如下图所示:

通过在TIS平台中定义Hudi数据湖实例,可以选择由DeltaStreamer来执行数据批量导入Hudi表,或者由Flink Job来执行增量Hudi表数据同步任务。

如何安装

TIS的安装非常方便,只需三个步骤:下载一个tar压缩包,解压,最后启动即可。详细请查看 TIS安装说明[6]](http://tis.pub/docs/install/uber/))

TIS是基于微内核架构来实现的,初始安装包只有200兆,其他具体执行逻辑相关的执行逻辑都封装在了TIS的插件系统中,

本文相关的Hudi和MySQL以及Flink增量同步、DataX批量同步相关的功能插件都是在TIS运行时按需加载并热部署生效的。

依赖版本

本示例依赖如下组件版本:

组件名称 版本
Apache Hudi 0.10.1
Apache Spark spark-2.4.4-bin-hadoop2.7
Apache Hive 2.1.1 以上
Apache Hadoop 2.7.3
Apache Flink tis-1.13.1(基于Flink 1.13.1 定制,解决不同组件Source,Sink之间可能存在的三方依赖包冲突)

创建MySQL到Hudi千表入湖通道

准备工作

1. 准备一个MySQL测试数据库,版本5.7

2. 下载Hudi Source包[7]运行环境(使用Hudi0.10.1):为了简化演示,在Hudi安装包中一个一个Hudi docker compose[8]启动脚本,本说明中就用该执行脚本来创建Hudi运行环境,详细请查看https://hudi.apache.org/docs/next/docker_demo完成Hudi Docker运行环境安装,启动docker-compose之前需要修改docker-compose的配置文件(./hudi-release-0.10.1/docker/compose/docker-compose_hadoop284_hive233_spark244.yml),在docker镜像实例sparkmasterspark-worker-1上添加一个hosts配置文件可以避免DeltatStreamer执行过程中提交任务端Hostname不能识别的错误:

添加项:

extra_hosts:
  - "baisui-test-1:192.168.28.200"

添加后效果:

sparkmaster:
           image: apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkmaster_2.4.4:latest
           hostname: sparkmaster
           container_name: sparkmaster
           env_file:
             - ./hadoop.env
           ports:
             - "8080:8080"
             - "7077:7077"
           extra_hosts:
             - "baisui-test-1:192.168.28.200"
           environment:
             - INIT_DAEMON_STEP=setup_spark
           links:
             - "hivemetastore"
             - "hiveserver"
             - "hive-metastore-postgresql"
             - "namenode"
       
         spark-worker-1:
           image: apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkworker_2.4.4:latest
           hostname: spark-worker-1
           container_name: spark-worker-1
           env_file:
             - ./hadoop.env
           depends_on:
             - sparkmaster
           ports:
             - "8081:8081"
           extra_hosts:
             - "baisui-test-1:192.168.28.200"
           environment:
             - "SPARK_MASTER=spark://sparkmaster:7077"
           links:
             - "hivemetastore"
             - "hiveserver"
             - "hive-metastore-postgresql"
             - "namenode"

然后就可以启动Hudi docker-compose了

# 启动 
sh hudi-0.10.1/docker/setup_demo.sh
# 停止
sh hudi-0.10.1/docker/stop_demo.sh

1. 安装tis-flink[9]](http://tis.pub/docs/install/flink-cluster/standalone/)) TIS整合了各种Source和Sink的插件,他们都是以插件(tpi)的方式封装的,为了实现开箱即用、避免插件之间三方包冲突,插件与插件之间是需要进行ClassLoader方式隔离,为此TIS对Flink13.1进行扩展(运行时TIS必须搭配TIS定制Flink,否则无法正常使用)

2. 安装单机版tis[10]

基本信息配置

  1. 1. 当完成安装步骤之后,进入TIS操作界面,点击菜单栏中实例链接

  2. 2. 进入实例列表,点击右侧添加下拉按钮中的数据管道,进行MySQL端到Hudi端的数据同步通道构建

  3. 3. 添加流程一共分为5步,第1步添加数据通道的基本信息

  4. 4. 进入数据端选择步骤,选择Reader Writer类型选择,由于系统刚安装,数据端类型对应的插件还没有选取,需要点击插件安装添加按钮,安装插件

  5. 5. 从插件列表中选择tis-ds-mysql-plugin,tis-datax-hudi-plugin两个插件进行安装

  6. 6. 插件安装完毕,将插件管理页面关闭

  7. 7. Reader端选择MySQL,Writer端选择Hudi,点击下一步按钮,进行MySQL Reader的设置

    8. 在Reader设置页面,点击数据库名项右侧配置下拉框中MySqlV5 数据源,完成表单填写,点击保存按钮,其他输入项目使用默认值即可,然后再点击下一步选取Reader端中需要处理的表


  8. 9. 选择需要的表: 点击设置按钮,对目标Hudi表设置,设置目标表的目标列分区主键等属性设置.


  9. 点击保存按钮,然后点击下一步,进入Hudi Writer表单设置



Hudi Writer表单

1. 点击sparkConn项右侧管理下拉框中添加按钮,添加SparkMaster源

2. 点击hiveConn项右侧 数据源管理下拉框添加按钮,添加hiveConn源

3. 点击fsName项右侧 FS管理 下拉框添加按钮,添加分布式文件系统源

4. 其他选项按照说明设置录入
确认页面,对上几步流程中录入的配置信息进行确认

5. 点击创建按钮完成数据流通道定义

批量数据导入

1. Hudi数据通道定义完成,开始批量导入MySQL中全量历史数据

2. 由于TIS系统初次安装,还未安装触发器插件,请按照只是先安装,成功之后再触发批量导入



Flink增量同步开通

接下来开通实时增量通道

1. 首先需要安装Flink单机版 安装说明[11]](http://tis.pub/docs/install/flink-cluster/standalone/))

2. Flink集群启动之后,在TIS中添加Flink集群对应配置,请妥善设置Flink相关的重启策略checkpointstateBackend相关属性

3. 表单填写完成之后,点击保存&下一步按钮进入下一步Sink,Source相关属性设置
该步骤添加Flink SourceFunction对应的flink-connector-mysql-cdc[12]](https://github.com/ververica/flink-cdc-connectors/tree/master/flink-connector-mysql-cdc))插件 和 Fink Sink对应的Hudi 插件

4. 设置完成之后进入下一步

5. TIS会解析Reader选取的表元数据信息,自动生成Flink Stream Code
在该版本中,自动生成的Flink Stream Code还不支持用户自定义编写业务逻辑

6. 点击部署按钮,进入向Flink Cluster中部署流处理逻辑
等待片刻,跳转到创建成功的页面

7. 至此,MySQL与Hudi表增量通道已经添加完成,MySQL到Hudi表实时数据同步可以保证在一个Checkpoint周期内完成,接下来可以尝试在MySQL数据表上,更新几条数据,然后在Hudi 对应的表上验证更新是否成功同步。

总结

通过以上流程介绍,我们发现通过使用TIS[13]](http://tis.pub))来实现MySQL与Hudi表同步有如下优势:

• 安装方便,组件按需加载,热生效

• 支持数据源分库,多表同步

• 完美实现低代码配置DataOps的目标,帮助用户大大提高工作效率,且避免出错

还等什么呢?赶快试用一下吧

后续

本次是TIS与数据湖产品Hudi的整合的初次尝试,Hudi的配置项比较繁杂,且各个配置项之间又存在各种依赖关系。TIS对Hudi的封装过程中还没有将Hudi的所有配置项(例如:Indexer的配置相关)开放给用户配置,后续会陆续完善。假如您TIS有功能需求,请在Github的Issue栏中添加:https://github.com/qlangtech/tis/issues[14]

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
108 2
|
23天前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
13天前
|
存储 分布式计算 分布式数据库
深入理解Apache HBase:构建大数据时代的基石
在大数据时代,数据的存储和管理成为了企业面临的一大挑战。随着数据量的急剧增长和数据结构的多样化,传统的关系型数据库(如RDBMS)逐渐显现出局限性。
71 12
|
2月前
|
Java 持续交付 项目管理
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。它采用项目对象模型(POM)来描述项目,简化构建流程。Maven提供依赖管理、标准构建生命周期、插件扩展等功能,支持多模块项目及版本控制。在Java Web开发中,Maven能够自动生成项目结构、管理依赖、自动化构建流程并运行多种插件任务,如代码质量检查和单元测试。遵循Maven的最佳实践,结合持续集成工具,可以显著提升开发效率和项目质量。
41 1
|
2月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
44 0
|
2月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
47 0
|
3月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
4月前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【6月更文挑战第30天】Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅。在Java中,开发者使用Kafka的客户端库创建生产者和消费者。生产者发送序列化消息到主题,消费者通过订阅和跟踪偏移量消费消息。Kafka以持久化、容灾和顺序写入优化I/O。Java示例代码展示了如何创建并发送/接收消息。通过分区、消费者组和压缩等策略,Kafka在高并发场景下可被优化。
107 1
|
4月前
|
SQL 存储 运维
网易游戏如何基于阿里云瑶池数据库 SelectDB 内核 Apache Doris 构建全新湖仓一体架构
随着网易游戏品类及产品的快速发展,游戏数据分析场景面临着越来越多的挑战,为了保证系统性能和 SLA,要求引入新的组件来解决特定业务场景问题。为此,网易游戏引入 Apache Doris 构建了全新的湖仓一体架构。经过不断地扩张,目前已发展至十余集群、为内部上百个项目提供了稳定可靠的数据服务、日均查询量数百万次,整体查询性能得到 10-20 倍提升。
网易游戏如何基于阿里云瑶池数据库 SelectDB 内核 Apache Doris 构建全新湖仓一体架构
|
4月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

推荐镜像

更多
下一篇
无影云桌面