Apache Hudi与Hive集成手册

简介: Apache Hudi与Hive集成手册

1. Hudi表对应的Hive外部表介绍

Hudi源表对应一份HDFS数据,可以通过Spark,Flink 组件或者Hudi客户端将Hudi表的数据映射为Hive外部表,基于该外部表, Hive可以方便的进行实时视图,读优化视图以及增量视图的查询。

2. Hive对Hudi的集成

这里以Hive3.1.1、 Hudi 0.9.0为例, 其他版本类似

将hudi-hadoop-mr-bundle-0.9.0xxx.jar , hudi-hive-sync-bundle-0.9.0xx.jar 放到hiveserver 节点的lib目录下修改hive-site.xml找到hive.default.aux.jars.path 以及hive.aux.jars.path 这两个配置项,将第一步中的jar包全路径给配置上去:配置后如下<name>hive.default.aux.jars.path</name>
<value>xxxx,jar,xxxx,jar,file:///mypath/hudi-hadoop-mr-bundle-0.9.0xxx.jar,file:///mypath/hudi-hive-sync-bundle-0.9.0xx.jar</value>
配置完后重启hive-server对于Hudi的bootstrap表(tez查询),除了要添加hudi-hadoop-mr-bundle-0.9.0xxx.jar , hudi-hive-sync-bundle-0.9.0xx.jar这两个jar包,还需把hbase-shaded-miscellaneous-xxx.jar, hbase-metric-api-xxx.jar,hbase-metrics-xxx.jar, hbase-protocol-shaded-xx.jar,hbase-shaded-protobuf-xxx.jar,htrce-core4-4.2.0xxxx.jar 按上述步骤添加进去。

3. 创建Hudi表对应的hive外部表

一般来说Hudi表在用Spark或者Flink写入数据时会自动同步到Hive外部表, 此时可以直接通过beeline查询同步的外部表, 若写入引擎没有开启自动同步,则需要手动利用hudi客户端工具run_hive_sync_tool.sh 进行同步具体可以参考官网查看相关参数。

4. 查询Hudi表对应的Hive外部表

4.1 操作前提

使用Hive查询Hudi表前,需要通过set命令设置hive.input.format,否则会出现数据重复,查询异常等错误,如下面这个报错就是典型的没有设置hive.input.format 导致的

java.lang.IllegalArgumentException: HoodieRealtimeReader can oly work on RealTimeSplit and not with xxxxxxxxxx

除此之外对于增量查询,还需要set命令额外设置3个参数

set hoodie.mytableName.consume.mode=INCREMENTAL;
set hoodie.mytableName.consume.max.commits=3;
set hoodie.mytableName.consume.start.timestamp=commitTime;

注意这3个参数是表级别参数

参数名 描述
hoodie.mytableName.consume.mode Hudi表的查询模式。增量查询 :INCREMENTAL非增量查询:不设置或者设为SNAPSHOT
hoodie.mytableName.consume.start.timestamp Hudi表增量查询起始时间
hoodie. mytableName.consume.max.commits Hudi表基于hoodie.mytableName.consume.start.timestamp 之后要查询的增量commit次数。提交次数,如设置为3时,代表增量查询从指定的起始时间之后commit 3次的数据,设为-1时,增量查询从指定的起始时间之后提交的所有数据

4.2 COW类型Hudi表的查询

例如Hudi原表表名为hudicow,同步给hive之后hive表名hudicow

4.2.1 COW表实时视图查询

设置hive.input.format 为org.apache.hadoop.hive.ql.io.HiveInputFormat或者org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat后,像普通的hive表一样查询即可

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
select count(*) from hudicow;

4.2.2 COW表增量查询

除了要设置hive.input.format,还需要设置上述的3个增量查询参数,且增量查询语句中的必须添加where 关键字并将_hoodie_commit_time > 'startCommitTime'作为过滤条件(这地方主要是hudi的小文件合并会把新旧commit的数据合并成新数据,hive是没法直接从parquet文件知道哪些是新数据哪些是老数据)

set hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hoodie.hudicow.consume.mode = INCREMENTAL;
set hoodie.hudicow.consume.max.commits = 3;
set hoodie.hudicow.consume.start.timestamp = xxxx;
select count(*) from hudicow where `_hoodie_commit_time` > 'xxxx'

注意_hoodie_commit_time 的引号是反引号(tab键上面那个)不是单引号, 'xxxx'是单引号

4.3 MOR类型Hudi表的查询

例如mor类型Hudi源表的表名为hudimor,映射为两张Hive外部表hudimor_ro(ro表)和hudimor_rt(rt表)

4.3.1 MOR表读优化视图

实际上就是读 ro表,和cow表类似设置完hiveInputFormat 之后 和普通的hive表一样查询即可。

4.3.2 MOR表实时视图

设置了hive.input.format之后,即可查询到Hudi源表的最新数据

set hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat;
select * from hudicow_rt;

4.3.3 MOR表增量查询

这个增量查询针对的rt表,不是ro表。通COW表的增量查询类似

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; // 这地方指定为HoodieCombineHiveInputFormat
set hoodie.hudimor.consume.mode = INCREMENTAL;set hoodie.hudimor.consume.max.commits = -1;
set hoodie.hudimor.consume.start.timestamp = xxxx;
select * from hudimor_rt where `_hoodie_commit_time` > 'xxxx'; // 这个表名要是rt表

说明如下

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;最好只用于rt表的增量查询,当然其他种类的查询也可以设置为这个,这个参数会影响到普通的hive表查询,因此在rt表增量查询完成后,应该设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;或者改为默认值set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;用于其他表的查询。set hoodie.mytableName.consume.mode=INCREMENTAL;仅用于该表的增量查询模式,若要对该表切换为其他查询模式,应设置set hoodie.hudisourcetablename.consume.mode=SNAPSHOT;

当前Hudi(0.9.0)对接Hive的一些问题,请使用master分支或即将发布的0.10.0版本

hive读hudi表会将所有的数据给打印出来有严重的性能问题和数据安全问题。MOR表的实时视图读取 请按需设置mapreduce.input.fileinputformat.split.maxsize的大小 禁止hive取切分读取的文件,否则会出现数据重复。这个问题当前是无解的,spark读hudi实时视图的时候代码直接写死不会切分文件,hive需要手动设置。如果碰到classNotFound, noSuchMethod等错误请检查hive lib库下面的jar包是否出现冲突。

5. Hive侧源码修改

为支持Hive查询Hudi的纯log文件需要对Hive侧源码进行修改。

具体修改org.apache.hadoop.hive.common.FileUtils 如下函数

public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {    
  @Override    
  public boolean accept(Path p) {      
    String name = p.getName();      
    boolean isHudiMeta = name.startsWith(".hoodie");      
    boolean isHudiLog = false;      
    Pattern LOG_FILE_PATTERN = Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)))?");      
    Matcher matcher = LOG_FILE_PATTERN.matcher(name);      
    if (matcher.find()) {        
      isHudiLog = true;      
    }      
    boolean isHudiFile = isHudiLog || isHudiMeta;      
    return (!name.startsWith("_") && !name.startsWith(".")) || isHudiFile;    
  }  
};

重新编译hive, 把新编译的hive-common-xxx.jar, hive-exec-xxx.jar 替换到hive server的lib目录下注意权限和名字和原来的jar包保持一致。

最后重启hive-server即可。


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
17天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
48 5
|
20天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
32 1
|
2月前
|
SQL 分布式计算 Hadoop
Apache Hive 帮助文档
Apache Hive 帮助文档
100 9
|
2月前
|
Java 测试技术 API
如何在 Apache JMeter 中集成 Elastic APM
如何在 Apache JMeter 中集成 Elastic APM
44 1
|
5月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
18386 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
4月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
88 0
|
5月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
5月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka集成的深度指南
Spring Boot与Apache Kafka集成的深度指南
|
6月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka集成的深度指南
Spring Boot与Apache Kafka集成的深度指南
|
2月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用

推荐镜像

更多