实战 | Apache Hudi回调功能简介及使用示例

简介: 实战 | Apache Hudi回调功能简介及使用示例

1. 功能简介

从0.6.0版本开始,Hudi开始支持 commit 回调功能,即每当Hudi成功提交一次 commit, 其内部的回调服务就会向外部系统发出一条回调信息,用户可以根据该回调信息查询Hudi表的增量数据,并根据具体需求进行相应的业务处理。

1.1 支持的回调方式

当前 HoodieDeltaStreamer 可通过 HTTP(默认) 和 Kafka 两种方式向外部发送回调信息,而 SparkDataSource 暂只支持 HTTP 一种。两种数据摄入方式在使用回调功能上没有区别(除了回调方式支持不同外),均通过参数配置实现。

配置相应参数后,启动的任务会在每次成功提交后像外部系统发送 Json 格式的回调信息,信息样例:

{"commitTime":"20201225152956","tableName":"callback_test","basePath":"file:///tmp/hudi_callback"}

1.2 参数配置

1.2.1 HTTP

HTTP 回调是默认方式,可通过配置下列参数启用.

必配参数:

## 是否开启回调功能,默认falsehoodie.write.commit.callback.on=true## 回调地址(必填)hoodie.write.commit.callback.http.url=http://ip:端口/callback

可选参数:

## 超时时间,默认三秒hoodie.write.commit.callback.http.timeout.seconds=xxx## api key 默认:hudi_write_commit_http_callbackhoodie.write.commit.callback.http.api.key=fake_api_key

Note: 回调接收服务需使用 callbackMsg 字段接收信息。

1.2.2 Kafka

Kafka回调目前只支持 HoodieDeltaStreamer,使用方式与 HTTP类似.

必配参数:

## 是否开启回调功能,默认falsehoodie.write.commit.callback.on=true## 回调方式,使用Kafka实现类hoodie.write.commit.callback.class=org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback## Kafka serverhoodie.write.commit.callback.kafka.bootstrap.servers=xxx:9092## 回调 kafka 主题hoodie.write.commit.callback.kafka.topic=xxx_topic

可选参数:

## 输出写出的分区,默认 0hoodie.write.commit.callback.kafka.partition=分区数## ack 级别,默认 allhoodie.write.commit.callback.kafka.acks=all## 失败重试次数,默认 3hoodie.write.commit.callback.kafka.retries=重试次数

2. HTTP方式使用示例

由于使用方式相同,这里方便起见我们使用HTTP 方式示例。

数据写入代码:

@Testdef insert(): Unit = {  val inserts = convertToStringList(dataGen.generateInserts(100))  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))  df.write.format("hudi").    options(getQuickstartWriteConfigs).    option(PRECOMBINE_FIELD_OPT_KEY, "ts").    option(RECORDKEY_FIELD_OPT_KEY, "uuid").    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").    option(HoodieWriteCommitCallbackConfig.CALLBACK_ON, "true").    option(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP, "http://localhost:8080/callback").    option(TABLE_NAME, tableName).    mode(Overwrite).    save(basePath)}

Note:

tableName = "callback_test"basePath = "file:///tmp/hudi_callback"

接收回调信息的服务:

@RestControllerpublic class HoodieCallbackController {  @RequestMapping("/callback")  public void callback(@RequestBody String callbackMsg) {    System.out.println(callbackMsg);  }}

启动insert方法,控制台打印出回调信息:

.   ____          _            __ _ _ /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/  ___)| |_)| | | | | || (_| |  ) ) ) )  '  |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot ::                (v2.4.1)2020-12-25 15:29:37.732  INFO 3860 --- [           main] c.h.h.HudiCallbackApplication            : Starting HudiCallbackApplication using Java 1.8.0_211 on Mathieu with PID 3860 (/Users/wangxianghu/github/hudi-callback/target/classes started by wangxianghu in /Users/wangxianghu/github/hudi-callback)2020-12-25 15:29:37.735  INFO 3860 --- [           main] c.h.h.HudiCallbackApplication            : No active profile set, falling back to default profiles: default2020-12-25 15:29:38.386  INFO 3860 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)2020-12-25 15:29:38.394  INFO 3860 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]2020-12-25 15:29:38.394  INFO 3860 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]2020-12-25 15:29:38.443  INFO 3860 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext2020-12-25 15:29:38.443  INFO 3860 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 674 ms2020-12-25 15:29:38.577  INFO 3860 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'2020-12-25 15:29:38.737  INFO 3860 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''2020-12-25 15:29:38.747  INFO 3860 --- [           main] c.h.h.HudiCallbackApplication            : Started HudiCallbackApplication in 1.279 seconds (JVM running for 1.698)2020-12-25 15:29:59.893  INFO 3860 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'2020-12-25 15:29:59.894  INFO 3860 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'2020-12-25 15:29:59.894  INFO 3860 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms{"commitTime":"20201225152956","tableName":"callback_test","basePath":"file:///tmp/hudi_callback"}

3. 总结

本文简要介绍了Hudi支持的回调方式以及各种方式的详细配置,并以 HTTP回调方式为例做了简要示范。

目录
相关文章
|
18天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
64 5
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
40 3
|
3月前
|
消息中间件 传感器 数据处理
"揭秘实时流式计算:低延迟、高吞吐量的数据处理新纪元,Apache Flink示例带你领略实时数据处理的魅力"
【8月更文挑战第10天】实时流式计算即时处理数据流,低延迟捕获、处理并输出数据,适用于金融分析等需即时响应场景。其框架(如Apache Flink)含数据源、处理逻辑及输出目标三部分。例如,Flink可从数据流读取信息,转换后输出。此技术优势包括低延迟、高吞吐量、强容错性及处理逻辑的灵活性。
79 4
|
3月前
|
关系型数据库 Linux 网络安全
"Linux系统实战:从零开始部署Apache+PHP Web项目,轻松搭建您的在线应用"
【8月更文挑战第9天】Linux作为服务器操作系统,凭借其稳定性和安全性成为部署Web项目的优选平台。本文以Apache Web服务器和PHP项目为例,介绍部署流程。首先,通过包管理器安装Apache与PHP;接着创建项目目录,并上传项目文件至该目录;根据需要配置Apache虚拟主机;最后重启Apache服务并测试项目。确保防火墙允许HTTP流量,正确配置数据库连接,并定期更新系统以维持安全。随着项目复杂度提升,进一步学习高级配置将变得必要。
329 0
|
4月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
5月前
|
存储 Apache 文件存储
在Apache环境下为Web网站增设访问控制:实战指南
在Apache服务器上保护网站资源涉及启用访问控制模块(`mod_authz_core`和`mod_auth_basic`),在`.htaccess`或`httpd.conf`中设定权限,如限制对特定目录的访问。创建`.htpasswd`文件存储用户名和密码,并使用`htpasswd`工具管理用户。完成配置后重启Apache服务,访问受限目录时需提供有效的用户名和密码。对于高安全性需求,可考虑更复杂的认证方法。【6月更文挑战第20天】
298 4
|
5月前
|
弹性计算 应用服务中间件 Linux
双剑合璧:在同一ECS服务器上共存Apache与Nginx的实战攻略
在ECS服务器上同时部署Apache和Nginx的实战:安装更新系统,Ubuntu用`sudo apt install apache2 nginx`,CentOS用`sudo yum install httpd nginx`。配置Nginx作为反向代理,处理静态内容及转发动态请求到Apache(监听8080端口)。调整Apache的`ports.conf`监听8080。重启服务测试,实现两者高效协同,提升Web服务性能。记得根据流量和需求优化配置。【6月更文挑战第21天】
513 1
|
4月前
|
分布式计算 Apache Spark
|
5月前
|
存储 SQL 数据管理
基于阿里云数据库 SelectDB 版内核 Apache Doris 全新分区策略 Auto Partition 应用场景与功能详解
自动分区的出现进一步简化了复杂场景下的 DDL 和分区表的维护工作,许多用户已经使用该功能简化了工作流程,并且极大的便利了从其他数据库系统迁移到 Doris 的工作,自动分区已成为处理大规模数据和应对高并发场景的理想选择。
|
5月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
120 0

推荐镜像

更多