spark-shell操作hudi并使用hbase作为索引

简介: 本文介绍spark-shell操作hudi并使用hbase作为索引

前言

接上一篇文章,上篇文章说到hudi适配hbase 2.2.6,这篇文章在spark-shell中操作hudi,并使用hbase作为索引。要完成以下实验,请先确保你已经按照文章步骤对hudi进行适配。并且得到了hudi-spark3-bundle_2.12-0.9.0.jar

当然,如果你想先做一个实验,那么可以从这里以下链接下载我已经编译好的jar包。

组件版本以及前提要求:

组件版本:

hudi 0.9.0

hbase 2.2.6

spark 3.0.1

hadoop 3.2.0

hive 3.1.2

zookeeper:3.5.9

前提要求:

要完成以下实验,当然首先你需要有一个可以用的hadoop 3.2.0集群、hbase 2.2.6集群、主机环境中已经下载spark 3.0.1二进制包。

环境说明:

本实验环境使用的相关配置如下:

  • hdfs:hdfs://host117:8020
  • zookeeper:host117:2181
  • hbase对应zk_node_path:/hbase-secure
  • 在hbase上建一个名为hudi_hbase_index_test、列族为_s的表用于存放索引信息。命令为
create 'hudi_hbase_index_test', '_s'
AI 代码解读

拷贝hbase相关包到spark的jars目录下

我们在spark中使用hbase作为hudi的索引时,需要hbase相关jar包,所以我们需要将hbase目录下的以下jar包拷贝到spark的jars目录下:

  • hbase-protocol-shaded-2.2.6.jar
  • hbase-shaded-netty-2.2.1.jar
  • hbase-shaded-miscellaneous-2.2.1.jar

拷贝hudi-spark3-bundle_2.12-0.9.0.jar到spark的jars目录下

cp hudi-spark3-bundle_2.12-0.9.0.jar spark/jars

启动spark-shell执行hudi相关操作

启动spark-shell

./bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
AI 代码解读

使用DataGenerator类生成随机数据并写入hudi

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieHBaseIndexConfig._
import org.apache.hudi.config.HoodieIndexConfig._

val tableName = "spark_hudi_hbase_index_test"
val basePath =  "hdfs://host117:8020/tmp/spark_hudi_hbase_index_test"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(INDEX_TYPE.key(), "HBASE").
  option(ZKPORT.key(), "2181").
  option(QPS_FRACTION.key(), 0.5).
  option(TABLENAME.key(), "hudi_hbase_index_test").
  option(ZK_NODE_PATH.key(), "/hbase-secure").
  option(ZKQUORUM.key(), "host117").
  option(MAX_QPS_FRACTION.key(), 10000).
  option(MIN_QPS_FRACTION.key(), 1000).
  option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
  option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
  option(GET_BATCH_SIZE.key(), 100).
  option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
  option(UPDATE_PARTITION_PATH_ENABLE.key(), "false").
  option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "false").
  option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
AI 代码解读

注意事项:在使用hbase作为索引时,官网上关于hbase index 的配置说,某些配置项是可选的,但是实际在操作过程中发现其实那些配置项是必选的,比如QPS_ALLOCATOR_CLASS_NAME.key(),所以如果你在实际操作过程中,如果发现存在空指针错误的报错,那么可以按照报错信息查看是不是某些配置没有配导致的。

查看hbase上hudi表的索引信息

在完成上述数据写入之后,我们查看hbase中关于该表的索引信息:

查看hudi表中的数据

执行如下命令

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*")
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
AI 代码解读

查询结果

更新hudi表中数据

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(INDEX_TYPE.key(), "HBASE").
  option(ZKPORT.key(), "2181").
  option(TABLENAME.key(), "hudi_hbase_index_test").
  option(ZK_NODE_PATH.key(), "/hbase-secure").
  option(ZKQUORUM.key(), "host117").
  option(MAX_QPS_FRACTION.key(), 10000).
  option(MIN_QPS_FRACTION.key(), 1000).
  option(QPS_FRACTION.key(), 0.5).
  option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
  option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
  option(GET_BATCH_SIZE.key(), 100).
  option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
  option(UPDATE_PARTITION_PATH_ENABLE.key(), "true").
  option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "true").
  option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
AI 代码解读

增量查询hudi表中数据

spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in

// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
AI 代码解读

相关结果如下所示:

相关实践学习
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
0
0
0
16
分享
相关文章
【shell】shell数组的操作(定义、索引、长度、获取、删除、修改、拼接)
【shell】shell数组的操作(定义、索引、长度、获取、删除、修改、拼接)
MySQL 备份 Shell 脚本:支持远程同步与阿里云 OSS 备份
一款自动化 MySQL 备份 Shell 脚本,支持本地存储、远程服务器同步(SSH+rsync)、阿里云 OSS 备份,并自动清理过期备份。适用于数据库管理员和开发者,帮助确保数据安全。
|
2月前
|
确定Shell脚本在操作系统中的具体位置方法。
这对于掌握Linux的文件系统组织结构和路径方面的理解很有帮助,是我们日常工作和学习中都可能使用到的知识。以上讲解详细清晰,应用简便,是每一个想要精通操作系统的计算机爱好者必备的实用技能。
60 17
Centos或Linux编写一键式Shell脚本创建用户、组、目录分配权限指导手册
Centos或Linux编写一键式Shell脚本创建用户、组、目录分配权限指导手册
145 3
|
2月前
|
Centos或Linux编写一键式Shell脚本删除用户、组指导手册
Centos或Linux编写一键式Shell脚本删除用户、组指导手册
58 4
|
3月前
|
在Linux、CentOS7中设置shell脚本开机自启动服务
以上就是在CentOS 7中设置shell脚本开机自启动服务的全部步骤。希望这个指南能帮助你更好地管理你的Linux系统。
161 25
|
9月前
|
一个用于添加/删除定时任务的shell脚本
一个用于添加/删除定时任务的shell脚本
236 1
|
5月前
|
【linux】Shell脚本中basename和dirname的详细用法教程
本文详细介绍了Linux Shell脚本中 `basename`和 `dirname`命令的用法,包括去除路径信息、去除后缀、批量处理文件名和路径等。同时,通过文件备份和日志文件分离的实践应用,展示了这两个命令在实际脚本中的应用场景。希望本文能帮助您更好地理解和应用 `basename`和 `dirname`命令,提高Shell脚本编写的效率和灵活性。
356 32
定期备份数据库:基于 Shell 脚本的自动化方案
本篇文章分享一个简单的 Shell 脚本,用于定期备份 MySQL 数据库,并自动将备份传输到远程服务器,帮助防止数据丢失。
多种脚本批量下载 Docker 镜像:Shell、PowerShell、Node.js 和 C#
本项目提供多种脚本(Shell、PowerShell、Node.js 和 C#)用于批量下载 Docker 镜像。配置文件 `docker-images.txt` 列出需要下载的镜像及其标签。各脚本首先检查 Docker 是否安装,接着读取配置文件并逐行处理,跳过空行和注释行,提取镜像名称和标签,调用 `docker pull` 命令下载镜像,并输出下载结果。使用时需创建配置文件并运行相应脚本。C# 版本需安装 .NET 8 runtime。
252 2
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问