1. Feature overview
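Starting with version 0.6.0, Hudi supports a commit callback: each time Hudi successfully completes a commit, its internal callback service sends a callback message to an external system. Users can use that message to query the incremental data of the Hudi table and run whatever business processing they need.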
1.1 Supported callback methods
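Currently, HoodieDeltaStreamer can send callback messages to external systems through either HTTP (the default) or Kafka, while SparkDataSource supports only HTTP for now. Apart from which callback methods are supported, the two ingestion paths use the callback feature identically: both are configured through parameters.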
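Once the relevant parameters are set, the job sends a JSON-formatted callback message to the external system after every successful commit. Sample message: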
{"commitTime":"20201225152956","tableName":"callback_test","basePath":"file:///tmp/hudi_callback"}
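A receiver usually needs the individual fields of this message, for example commitTime as the starting point of an incremental query. The following is a minimal sketch of extracting them with only the JDK. The class name CallbackMessageParser is hypothetical, and the regular expression assumes the flat, string-only layout of the sample above (no nested objects, no escaped quotes); a real service would more likely use a JSON library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reads the flat callback JSON shown above into a map. */
final class CallbackMessageParser {
    // Matches "key":"value" pairs; assumes values contain no escaped quotes.
    private static final Pattern FIELD =
            Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the string fields of the message in the order they appear. */
    static Map<String, String> parse(String json) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(json);
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }

    public static void main(String[] args) {
        String msg = "{\"commitTime\":\"20201225152956\","
                + "\"tableName\":\"callback_test\","
                + "\"basePath\":\"file:///tmp/hudi_callback\"}";
        Map<String, String> fields = parse(msg);
        System.out.println("commitTime = " + fields.get("commitTime"));
        System.out.println("tableName  = " + fields.get("tableName"));
        System.out.println("basePath   = " + fields.get("basePath"));
    }
}
```

The commitTime value can then be handed to whatever incremental-read logic the downstream job uses.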
1.2 Parameter configuration
1.2.1 HTTP
HTTP is the default callback method and is enabled with the following parameters.
Required parameters:
## Whether to enable the callback; default: false
hoodie.write.commit.callback.on=true
## Callback URL (required)
hoodie.write.commit.callback.http.url=http://ip:port/callback
Optional parameters:
## Timeout in seconds; default: 3
hoodie.write.commit.callback.http.timeout.seconds=xxx
## API key; default: hudi_write_commit_http_callback
hoodie.write.commit.callback.http.api.key=fake_api_key
Note: the service that receives the callback must accept the message through the callbackMsg field.
1.2.2 Kafka
Kafka callbacks are currently supported only by HoodieDeltaStreamer and are used in much the same way as HTTP.
Required parameters:
## Whether to enable the callback; default: false
hoodie.write.commit.callback.on=true
## Callback method: use the Kafka implementation class
hoodie.write.commit.callback.class=org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback
## Kafka servers
hoodie.write.commit.callback.kafka.bootstrap.servers=xxx:9092
## Kafka topic for callback messages
hoodie.write.commit.callback.kafka.topic=xxx_topic
Optional parameters:
## Partition to write to; default: 0
hoodie.write.commit.callback.kafka.partition=partition_number
## Ack level; default: all
hoodie.write.commit.callback.kafka.acks=all
## Number of retries on failure; default: 3
hoodie.write.commit.callback.kafka.retries=retry_count
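Because the required parameters differ between the two methods, it can help to check a job's settings before submitting it. The sketch below is one possible pre-flight check and is not part of Hudi: the class CallbackConfigCheck and its method missingKeys are hypothetical, and the only things taken from this article are the property keys and the Kafka implementation class name. It treats any configuration without the Kafka class as an HTTP callback, since HTTP is the default.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the callback parameters listed above. */
final class CallbackConfigCheck {
    static final String ON = "hoodie.write.commit.callback.on";
    static final String CALLBACK_CLASS = "hoodie.write.commit.callback.class";
    static final String HTTP_URL = "hoodie.write.commit.callback.http.url";
    static final String KAFKA_SERVERS = "hoodie.write.commit.callback.kafka.bootstrap.servers";
    static final String KAFKA_TOPIC = "hoodie.write.commit.callback.kafka.topic";
    static final String KAFKA_CLASS =
            "org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback";

    /** Returns the required keys that are unset; an empty list means nothing is missing. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        if (!"true".equals(props.getProperty(ON))) {
            missing.add(ON); // callback is off, so the remaining keys are not checked
            return missing;
        }
        // HTTP is the default; Kafka is selected by setting the implementation class.
        boolean kafka = KAFKA_CLASS.equals(props.getProperty(CALLBACK_CLASS));
        List<String> required = kafka
                ? Arrays.asList(KAFKA_SERVERS, KAFKA_TOPIC)
                : Arrays.asList(HTTP_URL);
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ON, "true");
        props.setProperty(CALLBACK_CLASS, KAFKA_CLASS);
        props.setProperty(KAFKA_SERVERS, "xxx:9092");
        // The topic is deliberately left unset to show what the check reports.
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```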
2. HTTP usage example
Because both methods are used the same way, this example uses HTTP for convenience.
Data-writing code:
@Test
def insert(): Unit = {
  // Generate 100 sample records and load them into a DataFrame
  val inserts = convertToStringList(dataGen.generateInserts(100))
  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    // Enable the commit callback and point it at the receiving service
    option(HoodieWriteCommitCallbackConfig.CALLBACK_ON, "true").
    option(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP, "http://localhost:8080/callback").
    option(TABLE_NAME, tableName).
    mode(Overwrite).
    save(basePath)
}
Note:
tableName = "callback_test"
basePath = "file:///tmp/hudi_callback"
Service that receives the callback message:
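import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HoodieCallbackController {

    // Receives the callback body and prints it to the console
    @RequestMapping("/callback")
    public void callback(@RequestBody String callbackMsg) {
        System.out.println(callbackMsg);
    }
}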
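To try the callback without setting up a Spring project, a rough stand-in can be built on the HTTP server that ships with the JDK. This is only a sketch under stated assumptions: the class SimpleCallbackReceiver, its message queue, and its method names are all hypothetical, and it simply stores whatever request body arrives on /callback without checking the API key or the HTTP method.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical dependency-free receiver: queues every body sent to /callback. */
final class SimpleCallbackReceiver {
    // Received callback bodies, in arrival order.
    final BlockingQueue<String> messages = new LinkedBlockingQueue<>();
    private final HttpServer server;

    SimpleCallbackReceiver(int port) throws IOException {
        // Binds on all local addresses; port 0 lets the OS pick a free port.
        server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/callback", exchange -> {
            try (InputStream in = exchange.getRequestBody()) {
                messages.add(new String(readAll(in), StandardCharsets.UTF_8));
            }
            exchange.sendResponseHeaders(200, -1); // 200 with no response body
            exchange.close();
        });
    }

    // Reads the whole stream into a byte array.
    private static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    void start() { server.start(); }

    void stop() { server.stop(0); }

    int port() { return server.getAddress().getPort(); }

    public static void main(String[] args) throws Exception {
        // Same port as the callback URL used in the write example.
        SimpleCallbackReceiver receiver = new SimpleCallbackReceiver(8080);
        receiver.start();
        while (true) {
            System.out.println(receiver.messages.take()); // blocks until a callback arrives
        }
    }
}
```

Keeping the messages in a queue rather than printing them inside the handler makes it easy to hand them to another thread for processing.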
After the insert method runs, the console of the receiving service prints the callback message:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.1)

2020-12-25 15:29:37.732 INFO 3860 --- [ main] c.h.h.HudiCallbackApplication : Starting HudiCallbackApplication using Java 1.8.0_211 on Mathieu with PID 3860 (/Users/wangxianghu/github/hudi-callback/target/classes started by wangxianghu in /Users/wangxianghu/github/hudi-callback)
2020-12-25 15:29:37.735 INFO 3860 --- [ main] c.h.h.HudiCallbackApplication : No active profile set, falling back to default profiles: default
2020-12-25 15:29:38.386 INFO 3860 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2020-12-25 15:29:38.394 INFO 3860 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-12-25 15:29:38.394 INFO 3860 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2020-12-25 15:29:38.443 INFO 3860 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-12-25 15:29:38.443 INFO 3860 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 674 ms
2020-12-25 15:29:38.577 INFO 3860 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-12-25 15:29:38.737 INFO 3860 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-12-25 15:29:38.747 INFO 3860 --- [ main] c.h.h.HudiCallbackApplication : Started HudiCallbackApplication in 1.279 seconds (JVM running for 1.698)
2020-12-25 15:29:59.893 INFO 3860 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-12-25 15:29:59.894 INFO 3860 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-12-25 15:29:59.894 INFO 3860 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms
{"commitTime":"20201225152956","tableName":"callback_test","basePath":"file:///tmp/hudi_callback"}
3. Summary
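This article briefly introduced the callback methods Hudi supports and the configuration of each, and demonstrated the HTTP callback with a short example.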