Apache Hudi + AWS S3 + Athena实战

简介: Apache Hudi在阿里巴巴集团、EMIS Health,LinkNovate,Tathastu.AI,腾讯,Uber内使用,并且由Amazon AWS EMR和Google云平台支持,最近Amazon Athena支持了在Amazon S3上查询Apache Hudi数据集的能力,本博客将测试Athena查询S3上Hudi格式数据集。

1. 准备-Spark环境,S3 Bucket


需要使用Spark写入Hudi数据,登陆Amazon EMR并启动spark-shell:

$ export SCALA_VERSION=2.12
$ export SPARK_VERSION=2.4.4
$ spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_${SCALA_VERSION}:0.5.3,org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

接着使用如下scala代码设置表名,基础路径以及数据生成器来生成数据。这里设置basepaths3://hudi_athena_test/hudi_trips,以便后面进行查询

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips"
val basePath = "s3://hudi_athena_test/hudi_trips"
val dataGen = new DataGenerator


2. 插入数据


生成新的行程数据,导入DataFrame,并将其写入Hudi表

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)


3. 创建Athena数据库/表


Hudi内置表分区支持,所以在创建表后需要添加分区,安装athenareader工具,其提供Athena多个查询和其他有用的特性。

go get -u github.com/uber/athenadriver/athenareader

接着创建hudi_athena_test.sql文件,内容如下

DROP DATABASE IF EXISTS hudi_athena_test CASCADE;
create database hudi_athena_test;
CREATE EXTERNAL TABLE `trips`(
  `begin_lat` double,
  `begin_lon` double,
  `driver` string,
  `end_lat` double,
  `end_lon` double,
  `fare` double,
  `rider` string,
  `ts` double,
  `uuid` string
) PARTITIONED BY (`partitionpath` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION 's3://hudi_athena_test/hudi_trips'
ALTER TABLE trips ADD
  PARTITION (partitionpath = 'americas/united_states/san_francisco') LOCATION 's3://hudi_athena_test/hudi_trips/americas/united_states/san_francisco'
  PARTITION (partitionpath = 'americas/brazil/sao_paulo') LOCATION 's3://hudi_athena_test/hudi_trips/americas/brazil/sao_paulo'
  PARTITION (partitionpath = 'asia/india/chennai') LOCATION 's3://hudi_athena_test/hudi_trips/asia/india/chennai'

使用如下命令运行SQL语句

$ athenareader -q hudi_athena_test.sql


4. 使用Athena查询Hudi


如果没有错误,那么说明库和表在Athena中都已创建好,因此可以在Athena中查询Hudi数据集,使用athenareader查询结果如下

athenareader -q "select * from trips" -o markdown

60.png

也可以带条件进行查询

athenareader -q "select fare,rider from trips where fare>20" -o markdown

61.png


5. 更新Hudi表再次查询


Hudi支持S3中的数据,回到spark-shell并使用如下命令更新部分数据

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

运行完成后,使用athenareader再次查询

athenareader -q "select * from trips" -o markdown

可以看到数据已经更新了

62.png


6. 限制


Athena不支持查询快照或增量查询,Hive/SparkSQL支持,为进行验证,通过spark-shell创建一个快照

spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")

使用如下代码查询

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)

使用Athena查询将会失败,因为没有物化

$ athenareader -q "select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime"
SYNTAX_ERROR: line 1:57: Table awsdatacatalog.hudi_athena_test.hudi_trips_snapshot does not exist

根据官方文档,Athena支持查询Hudi数据集的Read-Optimized视图,同时,我们可以通过Athena来创建视图并进行查询,使用Athena在Hudi表上创建一个视图

$ athenareader -q "create view fare_greater_than_40 as select * from trips where fare>40" -a

查询视图

$ athenareader -q "select fare,rider from fare_greater_than_40"
 FARE                RIDER     
 43.4923811219014    rider-213 
 63.72504913279929   rider-284 
 90.25710109008239   rider-284 
 93.56018115236618   rider-213 
 49.527694252432056  rider-284 
 90.9053809533154    rider-284 
 98.3428192817987    rider-284
目录
相关文章
|
8月前
|
运维 Linux Apache
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
118 2
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
105 5
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
60 3
|
5月前
|
关系型数据库 Linux 网络安全
"Linux系统实战:从零开始部署Apache+PHP Web项目,轻松搭建您的在线应用"
【8月更文挑战第9天】Linux作为服务器操作系统,凭借其稳定性和安全性成为部署Web项目的优选平台。本文以Apache Web服务器和PHP项目为例,介绍部署流程。首先,通过包管理器安装Apache与PHP;接着创建项目目录,并上传项目文件至该目录;根据需要配置Apache虚拟主机;最后重启Apache服务并测试项目。确保防火墙允许HTTP流量,正确配置数据库连接,并定期更新系统以维持安全。随着项目复杂度提升,进一步学习高级配置将变得必要。
417 0
|
6月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
7月前
|
存储 Apache 文件存储
在Apache环境下为Web网站增设访问控制:实战指南
在Apache服务器上保护网站资源涉及启用访问控制模块(`mod_authz_core`和`mod_auth_basic`),在`.htaccess`或`httpd.conf`中设定权限,如限制对特定目录的访问。创建`.htpasswd`文件存储用户名和密码,并使用`htpasswd`工具管理用户。完成配置后重启Apache服务,访问受限目录时需提供有效的用户名和密码。对于高安全性需求,可考虑更复杂的认证方法。【6月更文挑战第20天】
449 4
|
7月前
|
弹性计算 应用服务中间件 Linux
双剑合璧:在同一ECS服务器上共存Apache与Nginx的实战攻略
在ECS服务器上同时部署Apache和Nginx的实战:安装更新系统,Ubuntu用`sudo apt install apache2 nginx`,CentOS用`sudo yum install httpd nginx`。配置Nginx作为反向代理,处理静态内容及转发动态请求到Apache(监听8080端口)。调整Apache的`ports.conf`监听8080。重启服务测试,实现两者高效协同,提升Web服务性能。记得根据流量和需求优化配置。【6月更文挑战第21天】
631 1
|
6月前
|
分布式计算 Apache Spark
|
7月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
135 0
|
8月前
|
运维 Linux Apache
LAMP架构调优(九)——Apache Rewrite功能实战
LAMP架构调优(九)——Apache Rewrite功能实战
69 1

推荐镜像

更多