spark-shell操作hudi并使用hbase作为索引

简介: 本文介绍spark-shell操作hudi并使用hbase作为索引

前言

接上一篇文章,上篇文章说到hudi适配hbase 2.2.6,这篇文章在spark-shell中操作hudi,并使用hbase作为索引。要完成以下实验,请先确保你已经按照文章步骤对hudi进行适配。并且得到了hudi-spark3-bundle_2.12-0.9.0.jar

当然,如果你想先做一个实验,那么可以从这里以下链接下载我已经编译好的jar包。

组件版本以及前提要求:

组件版本:

hudi 0.9.0

hbase 2.2.6

spark 3.0.1

hadoop 3.2.0

hive 3.1.2

zookeeper:3.5.9

前提要求:

要完成以下实验,当然首先你需要有一个可以用的hadoop 3.2.0集群、hbase 2.2.6集群、主机环境中已经下载spark 3.0.1二进制包。

环境说明:

本实验环境使用的相关配置如下:

  • hdfs:hdfs://host117:8020
  • zookeeper:host117:2181
  • hbase对应zk_node_path:/hbase-secure
  • 在hbase上建一个名为hudi_hbase_index_test、列族为_s的表用于存放索引信息。命令为
create 'hudi_hbase_index_test', '_s'

拷贝hbase相关包到spark的jars目录下

我们在spark中使用hbase作为hudi的索引时,需要hbase相关jar包,所以我们需要将hbase目录下的以下jar包拷贝到spark的jars目录下:

  • hbase-protocol-shaded-2.2.6.jar
  • hbase-shaded-netty-2.2.1.jar
  • hbase-shaded-miscellaneous-2.2.1.jar

拷贝hudi-spark3-bundle_2.12-0.9.0.jar到spark的jars目录下

cp hudi-spark3-bundle_2.12-0.9.0.jar spark/jars

启动spark-shell执行hudi相关操作

启动spark-shell

./bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

使用DataGenerator类生成随机数据并写入hudi

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieHBaseIndexConfig._
import org.apache.hudi.config.HoodieIndexConfig._

val tableName = "spark_hudi_hbase_index_test"
val basePath =  "hdfs://host117:8020/tmp/spark_hudi_hbase_index_test"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(INDEX_TYPE.key(), "HBASE").
  option(ZKPORT.key(), "2181").
  option(QPS_FRACTION.key(), 0.5).
  option(TABLENAME.key(), "hudi_hbase_index_test").
  option(ZK_NODE_PATH.key(), "/hbase-secure").
  option(ZKQUORUM.key(), "host117").
  option(MAX_QPS_FRACTION.key(), 10000).
  option(MIN_QPS_FRACTION.key(), 1000).
  option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
  option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
  option(GET_BATCH_SIZE.key(), 100).
  option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
  option(UPDATE_PARTITION_PATH_ENABLE.key(), "false").
  option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "false").
  option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

注意事项:在使用hbase作为索引时,官网上关于hbase index 的配置说,某些配置项是可选的,但是实际在操作过程中发现其实那些配置项是必选的,比如QPS_ALLOCATOR_CLASS_NAME.key(),所以如果你在实际操作过程中,如果发现存在空指针错误的报错,那么可以按照报错信息查看是不是某些配置没有配导致的。

查看hbase上hudi表的索引信息

在完成上述数据写入之后,我们查看hbase中关于该表的索引信息:

查看hudi表中的数据

执行如下命令

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*")
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

查询结果

更新hudi表中数据

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(INDEX_TYPE.key(), "HBASE").
  option(ZKPORT.key(), "2181").
  option(TABLENAME.key(), "hudi_hbase_index_test").
  option(ZK_NODE_PATH.key(), "/hbase-secure").
  option(ZKQUORUM.key(), "host117").
  option(MAX_QPS_FRACTION.key(), 10000).
  option(MIN_QPS_FRACTION.key(), 1000).
  option(QPS_FRACTION.key(), 0.5).
  option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
  option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
  option(GET_BATCH_SIZE.key(), 100).
  option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
  option(UPDATE_PARTITION_PATH_ENABLE.key(), "true").
  option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "true").
  option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

增量查询hudi表中数据

spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in

// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

相关结果如下所示:

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
分布式计算 资源调度 Hadoop
HBase表数据的读、写操作与综合操作
HBase表数据的读、写操作与综合操作
62 0
|
3月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
124 0
|
3月前
|
分布式计算 Hadoop Shell
熟悉常用的HBase操作
熟悉常用的HBase操作
66 3
熟悉常用的HBase操作
|
1月前
|
DataWorks 数据管理 大数据
DataWorks操作报错合集之在连接HBase时出现超时问题,该怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
Java 大数据 API
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
87 0
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
|
3月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
62 0
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
54 0
|
3月前
|
消息中间件 分布式计算 Serverless
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
92 2