一文彻底掌握Apache Hudi异步Clustering部署

简介: 在之前的一篇中,我们介绍了Clustering(聚簇)的表服务来重新组织数据来提供更好的查询性能,而不用降低摄取速度,并且我们已经知道如何部署同步Clustering,本篇博客中,我们将讨论近期社区做的一些改进以及如何通过HoodieClusteringJob和DeltaStreamer工具来部署异步Clustering。

1. 摘要


在之前的一篇中,我们介绍了Clustering(聚簇)的表服务来重新组织数据来提供更好的查询性能,而不用降低摄取速度,并且我们已经知道如何部署同步Clustering,本篇博客中,我们将讨论近期社区做的一些改进以及如何通过HoodieClusteringJobDeltaStreamer工具来部署异步Clustering


2. 介绍


通常讲,Clustering根据可配置的策略创建一个计划,根据特定规则对符合条件的文件进行分组,然后执行该计划。Hudi支持并发写入,并在多个表服务之间提供快照隔离,从而允许写入程序在后台运行Clustering时继续摄取。有关Clustering的体系结构的更详细概述请查看上一篇博文。


3. Clustering策略


如前所述Clustering计划和执行取决于可插拔的配置策略。这些策略大致可分为三类:计划策略执行策略更新策略


3.1 计划策略

该策略在创建Clustering计划时发挥作用。它有助于决定应该对哪些文件组进行Clustering。让我们看一下Hudi提供的不同计划策略。请注意,使用此配置可以轻松地插拔这些策略。

  • SparkSizeBasedClusteringPlanStrategy:根据基本文件的小文件限制选择文件切片并创建Clustering组,最大大小为每个组允许的最大文件大小。可以使用此配置指定最大大小。此策略对于将中等大小的文件合并成大文件非常有用,以减少跨冷分区分布的大量文件。
  • SparkRecentDaysClusteringPlanStrategy:根据以前的N天分区创建一个计划,将这些分区中的小文件片进行Clustering,这是默认策略,当工作负载是可预测的并且数据是按时间划分时,它可能很有用。
  • SparkSelectedPartitionsClusteringPlanStrategy:如果只想对某个范围内的特定分区进行Clustering,那么无论这些分区是新分区还是旧分区,此策略都很有用,要使用此策略,还需要在下面设置两个配置(包括开始和结束分区):
hoodie.clustering.plan.strategy.cluster.begin.partition
hoodie.clustering.plan.strategy.cluster.end.partition

注意:所有策略都是分区感知的,后两种策略仍然受到第一种策略的大小限制的约束。


3.2 执行策略

在计划阶段构建Clustering组后,Hudi主要根据排序列和大小为每个组应用执行策略,可以使用此配置指定策略。

SparkSortAndSizeExecutionStrategy是默认策略。使用此配置进行Clustering时,用户可以指定数据排序列。除此之外我们还可以为Clustering产生的Parquet文件设置最大文件大小。该策略使用bulk_insert将数据写入新文件,在这种情况下,Hudi隐式使用一个分区器,该分区器根据指定列进行排序。通过这种策略改变数据布局,不仅提高了查询性能,而且自动平衡了重写开销。

现在该策略可以作为单个Spark作业或多个作业执行,具体取决于在计划阶段创建的Clustering组的数量。默认情况下Hudi将提交多个Spark作业并合并结果。如果要强制Hudi使用单Spark作业,请将执行策略类配置设置为SingleSparkJobExecutionStrategy


3.3 更新策略

目前只能为未接收任何并发更新的表/分区调度Clustering。默认情况下更新策略的配置设置为SparkRejectUpdateStrategy。如果某个文件组在Clustering期间有更新,则它将拒绝更新并引发异常。然而在某些用例中,更新是非常稀疏的,并且不涉及大多数文件组。简单拒绝更新的默认策略似乎不公平。在这种用例中用户可以将配置设置为SparkAllowUpdateStregy

我们讨论了关键策略配置,下面列出了与Clustering相关的所有其他配置。在此列表中一些非常有用的配置包括:

配置项 解释 默认值
hoodie.clustering.async.enabled 启用在表上的异步运行Clustering服务。 false
hoodie.clustering.async.max.commits 通过指定应触发多少次提交来控制异步Clustering的频率。 4
hoodie.clustering.preserve.commit.metadata 重写数据时保留现有的_hoodie_commit_time。这意味着用户可以在Clustering数据上运行增量查询,而不会产生任何副作用。 false


4. 异步Clustering


之前我们已经了解了用户如何设置同步Clustering此外用户可以利用HoodiecClusteringJob设置两步异步Clustering


4.1 HoodieClusteringJob

随着Hudi版本0.9.0的发布,我们可以在同一步骤中调度和执行Clustering。我们只需要指定-mode-m选项。有如下三种模式:

  • schedule(调度):制定一个Clustering计划。这提供了一个可以在执行模式下传递的instant
  • execute(执行):在给定的instant执行Clustering计划,这意味着这里需要instant
  • scheduleAndExecute(调度并执行):首先制定Clustering计划并立即执行该计划。

请注意要在原始写入程序仍在运行时运行作业请启用多写入:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider

使用spark submit命令提交HoodieClusteringJob示例如下:

spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clusteringjob.properties \
--mode scheduleAndExecute \
--base-path /path/to/hudi_table/basePath \
--table-name hudi_table_schedule_clustering \
--spark-memory 1g

clusteringjob.properties配置文件示例如下

hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=column1,column2


4.2 HoodieDeltaStreamer

接着看下如何使用HudiDeltaStreamer。现在我们可以使用DeltaStreamer触发异步Clustering。只需将hoodie.clustering.async.enabledtrue,并在属性文件中指定其他Clustering配置,在启动Deltastreamer时可以将其位置设为-props(与HoodieClusteringJob配置类似)。

使用spark submit命令提交HoodieDeltaStreamer示例如下:

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clustering_kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--table-type COPY_ON_WRITE \
--target-base-path /path/to/hudi_table/basePath \
--target-table impressions_cow_cluster \
--op INSERT \
--hoodie-conf hoodie.clustering.async.enabled=true \
--continuous


4.3 Spark Structured Streaming

我们还可以使用Spark结构化流启用异步Clustering,如下所示。

val commonOpts = Map(
   "hoodie.insert.shuffle.parallelism" -> "4",
   "hoodie.upsert.shuffle.parallelism" -> "4",
   DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
   DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
   DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
   HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
def getAsyncClusteringOpts(isAsyncClustering: String, 
                           clusteringNumCommit: String, 
                           executionStrategy: String):Map[String, String] = {
   commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
           HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
           HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy
   )
}
def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {
   val streamingInput = // define the source of streaming
   Future {
      println("streaming starting")
      streamingInput
              .writeStream
              .format("org.apache.hudi")
              .options(hudiOptions)
              .option("checkpointLocation", basePath + "/checkpoint")
              .mode(Append)
              .start()
              .awaitTermination(10000)
      println("streaming ends")
   }
}
def structuredStreamingWithClustering(): Unit = {
   val df = //generate data frame
   val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
   val f1 = initStreamingWriteFuture(hudiOptions)
   Await.result(f1, Duration.Inf)
}


5. 总结和未来工作

在这篇文章中,我们讨论了不同的Clustering策略以及如何设置异步Clustering。未来的工作包括:

  • Clustering支持更新。
  • 支持Clustering的CLI工具。


目录
相关文章
|
2月前
|
SQL 存储 数据处理
兼顾高性能与低成本,浅析 Apache Doris 异步物化视图原理及典型场景
Apache Doris 物化视图进行了支持。**早期版本中,Doris 支持同步物化视图;从 2.1 版本开始,正式引入异步物化视图,[并在 3.0 版本中完善了这一功能](https://www.selectdb.com/blog/1058)。**
|
3月前
|
数据处理 Apache 数据库
将 Python UDF 部署到 Apache IoTDB 的详细步骤与注意事项
【10月更文挑战第21天】将 Python UDF 部署到 Apache IoTDB 中需要一系列的步骤和注意事项。通过仔细的准备、正确的部署和测试,你可以成功地将自定义的 Python UDF 应用到 Apache IoTDB 中,为数据处理和分析提供更灵活和强大的支持。在实际操作过程中,要根据具体情况进行调整和优化,以确保实现最佳的效果。还可以结合具体的代码示例和实际部署经验,进一步深入了解和掌握这一过程。
35 2
|
5月前
|
关系型数据库 MySQL 应用服务中间件
win7系统搭建PHP+Mysql+Apache环境+部署ecshop项目
这篇文章介绍了如何在Windows 7系统上搭建PHP、MySQL和Apache环境,并部署ECShop项目,包括安装配置步骤、解决常见问题以及使用XAMPP集成环境的替代方案。
65 1
win7系统搭建PHP+Mysql+Apache环境+部署ecshop项目
|
4月前
|
Linux Apache 数据安全/隐私保护
kali向Apache上部署网页
kali向Apache上部署网页
134 5
|
5月前
|
存储 Ubuntu 应用服务中间件
如何在 Ubuntu 14.04 上使用 Passenger 和 Apache 部署 Rails 应用
如何在 Ubuntu 14.04 上使用 Passenger 和 Apache 部署 Rails 应用
32 0
|
5月前
|
关系型数据库 Linux 网络安全
"Linux系统实战:从零开始部署Apache+PHP Web项目,轻松搭建您的在线应用"
【8月更文挑战第9天】Linux作为服务器操作系统,凭借其稳定性和安全性成为部署Web项目的优选平台。本文以Apache Web服务器和PHP项目为例,介绍部署流程。首先,通过包管理器安装Apache与PHP;接着创建项目目录,并上传项目文件至该目录;根据需要配置Apache虚拟主机;最后重启Apache服务并测试项目。确保防火墙允许HTTP流量,正确配置数据库连接,并定期更新系统以维持安全。随着项目复杂度提升,进一步学习高级配置将变得必要。
417 0
|
6月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
7月前
|
Java 应用服务中间件 Apache
安装和配置Apache Tomcat是部署Java Web应用程序的常见任务
安装和配置Apache Tomcat是部署Java Web应用程序的常见任务
188 7
|
7月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
135 0
|
8月前
|
安全 Linux 网络安全
Linux _ apache服务器部署 不同域名—访问不同网站(多网站)
Linux _ apache服务器部署 不同域名—访问不同网站(多网站)
226 1

推荐镜像

更多