大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)

一、概述

Hudi(Hadoop Upserts Deletes and Incrementals),简称Hudi,是一个流式数据湖平台,支持对海量数据快速更新,内置表格式,支持事务的存储层、 一系列表服务、数据服务(开箱即用的摄取工具)以及完善的运维监控工具,它可以以极低的延迟将数据快速存储到HDFS或云存储(S3)的工具,最主要的特点支持记录级别的插入更新(Upsert)和删除,同时还支持增量查询。

GitHub地址:https://github.com/apache/hudi

官方文档:https://hudi.apache.org/cn/docs/overview

关于Apache Hudi 数据湖 也可以参考我这篇文章:大数据Hadoop之——新一代流式数据湖平台 Apache Hudi

二、Hudi CLI

构建hudi后,可以通过cd hudi cli&&./hudi-cli.sh启动shell。一个hudi表驻留在DFS上的一个称为basePath的位置,我们需要这个位置才能连接到hudi表。Hudi库有效地在内部管理此表,使用.hoodie子文件夹跟踪所有元数据。

编译生成的包如下:

复制

# 启动
./hudi-cli/hudi-cli.sh• 1.
• 2.

三、Spark 与 Hudi 整合使用

Hudi 流式数据湖平台,协助管理数据,借助HDFS文件系统存储数据,使用Spark操作数据。

Hadoop 安装可参考我这篇文章:大数据Hadoop原理介绍+安装+实战操作(HDFS+YARN+MapReduce)Hadoop HA安装可参考我这篇文章:大数据Hadoop之——Hadoop 3.3.4 HA(高可用)原理与实现(QJM)Spark 环境配置可以参考我这篇文章:大数据Hadoop之——计算引擎Spark

1)Spark 测试

复制

cd $SPARK_HOME
hdfs dfs -mkdir /tmp/
hdfs dfs -put README.md /tmp/
hdfs dfs -text /tmp/README.md
# 启动spark-shell
./bin/spark-shell --master local[2]
val datasRDD = sc.textFile("/tmp/README.md")
# 行数
datasRDD.count()
# 读取第一行数据
datasRDD.first()
val dataframe = spark.read.textFile("/tmp/README.md")
dataframe.printSchema
dataframe.show(10,false)• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.
• 8.
• 9.
• 10.
• 11.
• 12.
• 13.
• 14.
• 15.
• 16.

2)Spark 与 Hudi 整合使用

官方示例:https://hudi.apache.org/docs/quick-start-guide/在spark-shell命令行,对Hudi表数据进行操作,需要运行spark-shell命令是,添加相关的依赖包,命令如下:

1、启动spark-shell

【第一种方式】在线联网下载相关jar包

复制

### 启动spark-shell,使用spark-shell操作hudi数据湖
### 第一种方式
./bin/spark-shell \
  --master local[2] \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensinotallow=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
### 上述命令需要联网,基于ivy下载下载相关jar包到本地,然后加载到CLASSPATH,其中包含三个jar包。• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.
• 8.
• 9.
• 10.

【第二种方式】离线使用已经下载好的jar包

复制

### 第二种方式,使用--jars
cd /opt/apache
wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.0/spark-avro_2.12-3.3.0.jar
cd $SPARK_HOME
./bin/spark-shell \
--master local[2] \
--jars  /opt/apache/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar,/opt/apache/hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/unused-1.0.0.jar,/opt/apache/spark-avro_2.12-3.3.0.jar \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.
• 8.
• 9.
• 10.

2、导入park及Hudi相关包

复制

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.

3、定义变量

复制

val tableName = "hudi_trips_cow"
# 存储到HDFS
val basePath = "hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow"
# 存储到本地
# val basePath = "file:///tmp/hudi_trips_cow"• 1.
• 2.
• 3.
• 4.
• 5.

4、模拟生成Trip乘车数据

复制

##构建DataGenerator对象,用于模拟生成10条Trip乘车数据
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))• 1.
• 2.
• 3.
• 4.

其中,DataGenerator可以用于生成测试数据,用来完成后续操作。

5、将模拟数据List转换为DataFrame数据集

复制

##转成df
val df = spark.read.json(spark.sparkContext.parallelize(inserts,2))
##查看数据结构
df.printSchema()
##查看数据
df.show()
# 指定字段查询
df.select("rider","begin_lat","begin_lon","driver","end_lat","end_lon","fare","partitionpath","ts","uuid").show(10,truncate=false)• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.
• 8.
• 9.

6、将数据写入到hudi

复制

# 将数据保存到hudi表中,由于Hudi诞生时基于Spark框架,所以SparkSQL支持Hudi数据源,直接通过format指定数据源Source,设置相关属性保存数据即可,注意,hudi不是正真存储数据,而是管理数据。
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
## 重要参数说明
#参数:getQuickstartWriteConfigs,设置写入/更新数据至Hudi时,Shuffle时分区数目
#参数:PRECOMBINE_FIELD_OPT_KEY,数据合并时,依据主键字段
#参数:RECORDKEY_FIELD_OPT_KEY,每条记录的唯一id,支持多个字段
#参数:PARTITIONPATH_FIELD_OPT_KEY,用于存放数据的分区字段• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.
• 8.
• 9.
• 10.
• 11.
• 12.
• 13.
• 14.
• 15.
• 16.

本地存储

HDFS 存储

四、Flink 与 Hudi 整合使用

官方示例:https://hudi.apache.org/docs/flink-quick-start-guide

1)启动flink集群

下载地址:http://flink.apache.org/downloads.html

复制

### 1、下载软件包
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
tar -xf flink-1.14.6-bin-scala_2.12.tgz
export FLINK_HOME=/opt/apache/flink-1.14.6
### 2、设置HADOOP_CLASSPATH
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HADOOP_CONF_DIR='/opt/apache/hadoop/etc/hadoop'
### 3、启动单节点flink 集群
# Start the Flink standalone cluster,这里先修改slot数量,默认是1,这里改成4
# taskmanager.numberOfTaskSlots: 4
cd $FLINK_HOME
./bin/start-cluster.sh
# 测试可用性
./bin/flink run  examples/batch/WordCount.jar• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.
• 8.
• 9.
• 10.
• 11.
• 12.
• 13.
• 14.
• 15.
• 16.
• 17.
• 18.

2) 启动flink SQL 客户端

复制

# 【第一种方式】指定jar包
./bin/sql-client.sh embedded -j ../hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle-0.12.0.jar shell
# 【第二种方式】还可以将jar包放在$FINK_HOME/lib目录下
./bin/sql-client.sh embedded shell• 1.
• 2.
• 3.
• 4.
• 5.

3)添加数据

复制

-- sets up the result mode to tableau to show the results directly in the CLI
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);
INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.
• 8.
• 9.
• 10.
• 11.
• 12.
• 13.
• 14.
• 15.
• 16.
• 17.
• 18.
• 19.
• 20.
• 21.
• 22.
• 23.
• 24.
• 25.
• 26.
• 27.
• 28.

HDFS上查看

4)查询数据(批式查询)

复制

select * from t1;• 1.

5)更新数据

复制

-- this would update the record with key 'id1'
insert into t1 values
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');• 1.
• 2.
• 3.

6)Streaming Query(流式查询)

首先创建表t2,设置相关属性,以流的方式查询读取,映射到上面表:t1

  • read.streaming.enabled 设置为true,表明通过streaming的方式读取表数据;
  • read.streaming.check-interval 指定了source监控新的commits的间隔时间4s
  • table.type 设置表类型为 MERGE_ON_READ

复制

CREATE TABLE t2(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',  -- this option enable the streaming read
  'read.start-commit' = '20210316134557', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);
-- Then query the table in stream mode
select * from t2;• 1.
• 2.
• 3.
• 4.
• 5.
• 6.
• 7.
• 8.
• 9.
• 10.
• 11.
• 12.
• 13.
• 14.
• 15.
• 16.
• 17.
• 18.
• 19.

注意:查看可能会遇到如下错误:

[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

【解决】添加hadoop-mapreduce-client-core-xxx.jar和hive-exec-xxx.jar到Flink lib中。

复制

cp /opt/apache/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.2.jar $FLINK_HOME/lib
cp ./hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/hive-exec-2.3.1-core.jar $FLINK_HOME/lib• 1.
• 2.

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
15天前
|
存储 分布式计算 Hadoop
大数据处理架构Hadoop
【4月更文挑战第10天】Hadoop是开源的分布式计算框架,核心包括MapReduce和HDFS,用于海量数据的存储和计算。具备高可靠性、高扩展性、高效率和低成本优势,但存在低延迟访问、小文件存储和多用户写入等问题。运行模式有单机、伪分布式和分布式。NameNode管理文件系统,DataNode存储数据并处理请求。Hadoop为大数据处理提供高效可靠的解决方案。
37 2
|
15天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
17天前
|
SQL 分布式计算 Hadoop
利用Hive与Hadoop构建大数据仓库:从零到一
【4月更文挑战第7天】本文介绍了如何使用Apache Hive与Hadoop构建大数据仓库。Hadoop的HDFS和YARN提供分布式存储和资源管理,而Hive作为基于Hadoop的数据仓库系统,通过HiveQL简化大数据查询。构建过程包括设置Hadoop集群、安装配置Hive、数据导入与管理、查询分析以及ETL与调度。大数据仓库的应用场景包括海量数据存储、离线分析、数据服务化和数据湖构建,为企业决策和创新提供支持。
58 1
|
1月前
|
消息中间件 SQL 分布式计算
大数据Hadoop生态圈体系视频课程
熟悉大数据概念,明确大数据职位都有哪些;熟悉Hadoop生态系统都有哪些组件;学习Hadoop生态环境架构,了解分布式集群优势;动手操作Hbase的例子,成功部署伪分布式集群;动手Hadoop安装和配置部署;动手实操Hive例子实现;动手实现GPS项目的操作;动手实现Kafka消息队列例子等
20 1
大数据Hadoop生态圈体系视频课程
|
1月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1416 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1355 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
143 3
|
1月前
|
Oracle 关系型数据库 流计算
flink cdc 同步问题之报错org.apache.flink.util.SerializedThrowable:如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
152 0
|
11月前
|
数据采集 分布式计算 Kubernetes
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(下)
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(下)
241 0

推荐镜像

更多