Flink如何读取hudi在changelog下的insert呢
Flink可以通过读取Hudi表中的changelog文件来获取插入、更新和删除操作的信息。
使用 Flink 流式计算进行 changelog 文件处理时,需要按照以下步骤进行:
读取 Hudi changelog 前,需要构造 Hudi 表的字段信息和Schema,可以使用:org.apache.hudi.keygen.SimpleKeyGenerator。
例如,对下面的 Stripe 表,构造Schema:
hudi-hive-sync-demo_hudi_trips_cow:
{
trip_id: long,
ts: long,
uuid: string,
rider: string,
driver: string,
begin_lat: double,
begin_lon: double,
end_lat: double,
end_lon: double,
fare: double,
partitionpath: string
}
代码示例:
String tableSchema = "{"
+ "\"trip_id\": \"long\","
+ "\"ts\": \"long\","
+ "\"uuid\": \"string\","
+ "\"rider\": \"string\","
+ "\"driver\": \"string\","
+ "\"begin_lat\": \"double\","
+ "\"begin_lon\": \"double\","
+ "\"end_lat\": \"double\","
+ "\"end_lon\": \"double\","
+ "\"fare\": \"double\","
+ "\"partitionpath\": \"string\""
+ "}";
Schema schema = new Schema.Parser().parse(tableSchema);
可以像读取普通的 Flink 流数据一样,利用 Flink 的 FileInputFormat 读取 Hudi 表的 changelog,并将数据解析为 POJO。
代码示例:
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jdbcUrl, selectedTableName);
String tablePath = metaClient.getBasePath();
String changeLogFolder = tablePath + "/.hoodie/.temp/" + writeToken + "/changelog";
// 构造 FileInputFormat
ChangelogFileInputFormat source = new ChangelogFileInputFormat(
new Path(changeLogFolder), schema, writeToken, operationTypes);
//source.setNestedFileEnumeration(true);
source.setFilesFilter(FilePathFilter.createDefaultFilter());
在上述代码中,变量 writeToken 表示Hudi表的写入令牌、operationTypes 表示需要获取的操作类型,如 INSERT、UPSET、DELETE等。
通过读取 Hudi 表的 changelog 文件,可以获取到插入、更新或删除的数据等信息,可以在 Flink 中对这些数据进行处理,例如:
// 将读取到的每一条数据封装成 POJO
public static class CdcRecord {
public String operation;
public String path;
public long timestamp;
public long id;
public String uuid;
public String rider;
public String driver;
public double beginLat;
public double beginLon;
public double endLat;
public double endLon;
public double fare;
public String partitionPath;
public CdcRecord(ObjectNode json) {
operation = json.get("meta").get("action").asText();
path = json.get("meta").get("partition").get("fileId").asText();
timestamp = json.get("meta").get("timestamp").asLong();
id = json.get("data").get("trip_id").asLong();
uuid = json.get("data").get("uuid").asText();
rider = json.get("data").get("rider").asText();
driver = json.get("data").get("driver").asText();
beginLat = json.get("data").get("begin_lat").asDouble();
beginLon = json.get("data").get("begin_lon").asDouble();
endLat = json.get("data").get("end_lat").asDouble();
endLon = json.get("data").get("end_lon").asDouble();
fare = json.get("data").get("fare").asDouble();
partitionPath = json.get("partitionpath").asText();
}
}
在 Hudi changelog 的消息中,可以获取到更多的信息,如 hudifile、commitTime、recordKey等。
以上是使用 Flink 读取 Hudi changelog 的过程,最后可以将数据写入到 MySQL 或其他数据存储中。
Flink可以通过读取Hudi的changelog文件来捕获插入操作。Hudi的changelog文件包含每个操作类型(例如insert、update、delete等)的元数据,可以提供更可靠的数据同步。具体来说,您可以通过使用Hudi提供的HoodieChangelogStreamGenerator类生成changelog流,然后使用Flink提供的DataStream API读取和处理该流。以下是一个示例代码片段,展示了如何使用Hudi和Flink捕获changelog插入操作:
val hudiTable = HoodieFlinkTable'file:/path/to/hudi/table',
HoodieRealtimeTableFunction.createTypeInformation(),
HoodieRealtimeTableFunction.createTypeInformation(),
HoodieRealtimeTableFunction.createTypeInformation())
val changelogStream = SparkRuntimeClientChangelogUtils.createSource(sparkSession, hudiTable)
val insertStream = changelogStream
.filter(changelog => changelog.getEventType == "insert")
.map(insert => insert.getData)
insertStream.print()
该代码片段中,HoodieFlinkTable
表示Hudi表,SparkRuntimeClientChangelogUtils
用于生成changelog流,filter
用于过滤出插入操作,map
用于提取插入操作的数据,最后的print
用于打印流中的记录。
在 Flink 中读取 Hudi 的 Change Log 数据时,需要先将 Hudi 表以 hudi 格式注册到 Flink 的 Catalog 中,然后通过 SQL 查询的方式读取 Change Log,获取新插入的数据。
具体操作步骤如下:
1、将 Hudi 表以 hudi 格式注册到 Flink Catalog 中:
-- 注册 Hudi 表到 Flink 的 Catalog 中
CREATE TABLE hudi_table (
__hoodie_commit_time STRING,
__hoodie_commit_seqno STRING,
__hoodie_record_key STRING,
__hoodie_partition_path STRING,
col1 STRING,
col2 INT,
...
) PARTITIONED BY (__hoodie_partition_path)
WITH (
'connector' = 'hudi',
'path' = '/path/to/hudi',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1s',
'tables.type' = 'COPY_ON_WRITE',
'tables.root.partition.path' = '/',
'tables.partition.fields' = '__hoodie_partition_path',
'tables.partition.keys' = '__hoodie_record_key',
'tables.metrics.reporters' = 'console'
);
在上述代码中,将 connector 设置为 hudi,并且使用 tables.partition.keys 指定了主键。同时,使用 read.streaming.enabled 和 read.streaming.check-interval 开启了流式查询,并设置了查询的时间间隔为 1s,以实时读取 Hudi Change Log 数据。
2、使用 SQL 查询 Hudi Change Log 插入的新数据:
-- 查询 Hudi Change Log 中新增的数据
SELECT
__hoodie_commit_time,
__hoodie_commit_seqno,
__hoodie_record_key,
__hoodie_partition_path,
col1,
col2,
...
FROM hudi_table
WHERE __hoodie_is_deleted = false
AND __hoodie_commit_time >= '2021-08-01 00:00:00'
AND __hoodie_commit_time < '2021-09-01 00:00:00'
在上述代码中,使用 __hoodie_is_deleted 判断新增的数据,同时使用 __hoodie_commit_time 过滤查询时间范围内的数据。
需要注意的是,Hudi Change Log 中的数据格式和 Flink 的数据格式不完全相同,因此在读取 Hudi Change Log 数据时,需要对数据进行转换和格式化,使其符合 Flink 的数据格式。
要读取Hudi在changelog下的insert操作,可以使用Flink的HudiInputFormat。HudiInputFormat是Flink提供的一种用于读取Hudi数据的输入格式,它可以读取Hudi 的数据文件和changelog文件,并将其转换为Flink的DataStream。
在Hudi中,Changelog是用来记录数据变更的日志,它写入到对应的Hudi表的指定路径下的.hoodie
目录中,Changelog文件名以_changelog
结尾。Changelog中记录了Hudi表的所有变更操作,包括insert、update、delete等。
如果需要在Flink中读取Hudi表Changelog中的insert操作,可以使用Flink的HoodieStreamTableSource
类来实现。这个类可以将Hudi表作为输入流,可以读取Hudi表的Changelog并将其解析为Flink的数据流。
以下是一个示例代码:
String tablePath = "/path/to/hudi/table";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建 HoodieStreamTableSource
HoodieStreamTableSource hoodieTableSource = HoodieStreamTableSource.builder()
.path(tablePath)
.recordKeyField("id")
.build();
// 读取 Changelog 中的 insert 操作
DataStream<Row> stream = env.fromTableSource(hoodieTableSource)
.filter(row -> row.getField(2).equals("_I"));
// 对于 insert 操作的处理逻辑
// ...
env.execute("Read Hudi Changelog insert operations");
在这个代码中,首先使用HoodieStreamTableSource
类创建一个Hudi表的输入源,设置表路径和记录键字段名称。然后使用fromTableSource
方法将输入源转换为数据流,并使用filter
方法筛选出Changelog中的insert操作。最后,可以在数据流中编写逻辑来处理insert操作。
需要注意的是,HoodieStreamTableSource
目前只能以流方式处理Changelog,如果需要读取历史版本或快照数据,可以参考其他方法,例如使用HoodieTableSource
类或Flink的JDBCInputFormat
。
Flink 通过 Hudi 提供的 HoodieDeltaStreamer 工具可以读取 Hudi 在 changelog 下的 insert,并将其转化为流式数据处理。以下是步骤:
在 Flink 程序中引入 Hudi 相关依赖,例如:
org.apache.hudi hudi-flink_2.11 0.7.0
在 Flink 程序中使用 HudiSourceFunction 创建一个 source,例如:
HudiSourceConfig hudiSourceConfig = new HudiSourceConfig.Builder() .withTableName("
其中,
需要注意的是,withChangelogEnabled(true) 参数启用了 changelog 模式,用于读取插入和更新操作,withBootstrap(false) 设置为不使用 Bootstrap 模式。
使用 Flink 的 DataStream API 处理 Hudi changelog 中的 insert 操作,例如:
DataStream insertStream = env.addSource(source) .filter(new FilterFunction() { @Override public boolean filter(BaseRow value) throws Exception { // 过滤出 insert 操作 return value.getRowKind() == RowKind.INSERT; } }) .map(new MapFunction<BaseRow, String>() { @Override public String map(BaseRow value) throws Exception { // 将 BaseRow 转化为 String return value.toString(); } });
使用 insertStream 进行下游处理,例如:
insertStream.print();
要读取Hudi在Changelog下的Insert,可以使用Flink对Hudi表进行查询操作。具体而言,可以通过Flink的Table API或SQL API来实现。
在使用Table API时,需要完成以下步骤:
创建一个Hudi表,并注册为Flink表
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建Hudi表
String tableName = "hudi_table";
String path = "/path/to/hudi/table";
String primaryKey = "id";
String partitionColumn = "partition";
String[] fieldNames = {"id", "name", "age"};
TypeInformation[] fieldTypes = {Types.STRING(), Types.STRING(), Types.INT()};
HoodieTableSource hoodieTableSource = HoodieTableSource.builder()
.path(path)
.tableName(tableName)
.primaryKey(primaryKey)
.partitionCols(Arrays.asList(partitionColumn))
.fields(fieldNames, fieldTypes)
.build();
// 注册为Flink表
tableEnv.registerTableSource(tableName, hoodieTableSource);
在上述示例中,我们使用HoodieTableSource创建了一个Hudi表,并将其注册为Flink表供后续操作使用。
使用Table API查询Hudi表中的Insert数据
.filter("operation = 'INSERT'") // 过滤出Insert操作
.select($("id"), $("name"), $("age")); // 查询特定字段
// 输出结果
DataStream<Row> dataStream = tableEnv.toAppendStream(result, Row.class);
dataStream.print();
在上述代码中,我们使用scan方法查询Hudi表中的数据,然后使用filter方法过滤出Insert操作的数据,并使用select方法查询特定字段。最后可以通过toAppendStream将结果转换为DataStream进行输出。
在使用SQL API时,需要完成以下步骤:
创建Hudi表,并注册为Flink表
String path = "/path/to/hudi/table";
String primaryKey = "id";
String partitionColumn = "partition";
String[] fieldNames = {"id", "name", "age"};
TypeInformation[] fieldTypes = {Types.STRING(), Types.STRING(), Types.INT()};
HoodieTableSource hoodieTableSource = HoodieTableSource.builder()
.path(path)
.tableName(tableName)
.primaryKey(primaryKey)
.partitionCols(Arrays.asList(partitionColumn))
.fields(fieldNames, fieldTypes)
.build();
// 注册为Flink表
tableEnv.registerTableSource(tableName, hoodieTableSource);
使用SQL查询Hudi表中的Insert数据
// 使用SQL查询Insert操作的数据
String sqlQuery = "SELECT id, name, age FROM hudi_table WHERE operation='INSERT'";
Table result = tableEnv.sqlQuery(sqlQuery);
// 输出结果
DataStream<Row> dataStream = tableEnv.toAppendStream(result, Row.class);
dataStream.print();
在上述代码中,我们使用SQL语句查询Hudi表中Insert操作的数据,并使用toAppendStream将结果转换为DataStream进行输出。
需要注意的是,在查询Hudi表的Insert数据时,需要根据实际情况选择合适的字段和条件,以确保查询结果正确和高效。
在 Flink 中读取 Hudi 的 Changelog 数据,需要使用 Flink 的 Hudi Connector。具体步骤如下:
在 Flink 中添加 Hudi Connector 的依赖,可以通过 Maven 或 Gradle 进行添加。
在 Flink 的程序中,使用 HudiSource
或 HudiInputFormat
类来读取 Hudi 的 Changelog 数据。这两个类的使用方式类似,下面以 HudiSource
为例进行说明。
在使用 HudiSource
时,需要指定 Hudi 的配置信息。可以通过 HudiConf
类来构建配置信息,例如:
HudiConf conf = new HudiConf()
.withPath("/path/to/hudi/table")
.withSchema("hudi schema")
.withPartitionFields("partition field")
.withRecordKeyFields("record key field")
.withKeyGeneratorClass(HoodieAvroKeyGenerator.class)
.withReadSchema("avro schema")
.withBasePath("/path/to/hudi/base")
.withChangelogEnabled(true)
.withChangelogMode(ChangelogMode.INCREMENTAL);
其中,withPath
指定 Hudi 表的路径,withSchema
指定 Hudi 表的 Schema,withPartitionFields
指定 Hudi 表的分区字段,withRecordKeyFields
指定 Hudi 表的记录键字段,withKeyGeneratorClass
指定 Hudi 表的键生成器类,withReadSchema
指定 Hudi 表的读取 Schema,withBasePath
指定 Hudi 表的基本路径,withChangelogEnabled
指定是否启用 Changelog 模式,withChangelogMode
指定 Changelog 模式。
HudiSource
对象,并将其作为数据源添加到 Flink 的 DataStream 中,例如:DataStream<Row> hudiStream = env.addSource(new HudiSource(conf));
这样就可以读取 Hudi 的 Changelog 数据,并将其转换为 Flink 的 DataStream。 需要注意的是,在读取 Hudi 的 Changelog 数据时,需要保证 Hudi 表启用了 Changelog 模式,并且 Changelog 数据已经被写入到对应的目录中。如果 Hudi 表没有启用 Changelog 模式,或者 Changelog 数据还没有被写入到目录中,那么在读取时就无法获取到数据。
Flink可以通过Hudi提供的HoodieIncrementalInputFormat读取Hudi表的changelog,其中包括insert、update和delete等操作。
楼主你好,其实Hudi集成Flink的读取方式分为:流读、增量读取、限流等三种方式,Flink读取Hudi下的insert,可通过Hudi的工具类HoodieFlinkStreamer来读取数据。
在 Flink 中读取 Hudi 的 Changelog 数据,可以使用 Hudi 提供的 HoodieChangelogStreamer 工具将 Changelog 数据导出到 Kafka 或 HBase 等支持 Flink 连接的数据源中,然后使用 Flink 的 FlinkKafkaConsumer 或 FlinkHBaseReader 等工具读取数据源中的数据。
下面以使用 Kafka 作为数据源为例说明如何读取 Hudi 的 Changelog 数据:
使用 HoodieChangelogStreamer 工具将 Changelog 数据导出到 Kafka 中:
Copy code
java -cp hudi-utilities-bundle.jar org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--props /path/to/config/file.properties \
--op UPSERT \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.DataSource \
--source-ordering-field name \
--target-base-path /path/to/hudi-table \
--target-table hudi_table \
--target-topic hudi_changelog_topic \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--continuous
其中 --target-topic 指定导出的 Changelog 数据要写入的 Kafka 主题。
在 Flink 中使用 FlinkKafkaConsumer 读取 Kafka 中的数据:
Copy code
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "flink_consumer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("hudi_changelog_topic", new SimpleStringSchema(), kafkaProps);
DataStream<String> changelogStream = env.addSource(consumer);
// 对 Changelog 数据进行处理
changelogStream.map(new MapFunction<String, MyData>() {
@Override
public MyData map(String value) throws Exception {
// 解析 Changelog 数据并转换为 MyData 对象
// ...
return myData;
}
});
在 FlinkKafkaConsumer 的构造函数中指定了要读取的 Kafka 主题名称,并使用 SimpleStringSchema 解析 Kafka 中的消息。然后可以对读取到的数据进行处理,例如解析 Changelog 数据并转换为自定义的数据类型。
需要注意的是,由于 Hudi 的 Changelog 数据中包含了更新和删除操作,因此需要根据操作类型进行相应的处理,例如更新操作需要在 Flink 中执行相应的更新操作。
Flink可以通过Hudi提供的HoodieIncrementalInputFormat读取Hudi表的changelog,其中包括insert、update和delete等操作。
具体步骤如下:
在Flink中定义HoodieIncrementalInputFormat,并设置需要读取的Hudi表的相关配置信息,例如: HoodieIncrementalInputFormat inputFormat = new HoodieIncrementalInputFormat<>( new HoodieTableMetaClient.Builder().setBasePath("path/to/hudi/table").build(), TypeInformation.of(RowData.class), "commitTime", ConfigUtils.getDefaultHadoopConf() ); java 使用Flink的DataSource将HoodieIncrementalInputFormat转换为DataStream,例如: DataStream dataStream = env.createInput(inputFormat); java 对DataStream进行处理,例如: dataStream .filter(row -> row.getRowKind() == RowKind.INSERT) .map(row -> { // 处理insert操作的数据 return row; }); java 通过以上步骤,就可以读取Hudi表的changelog,并对其中的insert操作进行处理。
要读取 Hudi 在 changelog 模式下的 insert,需要使用 Flink 的 Hudi source 进行读取,Hudi source 可以根据 Hudi 数据集的类型自动选择对应的读取器。
具体而言,可以按照以下步骤配置 Hudi source:
导入相关依赖:在 Flink 项目中添加 Hudi 相关依赖,例如 flink-connector-hudi; 创建 Hudi source:使用 HoodieSource 类创建 Hudi source。该类需要传入三个参数:数据集类型(例如 COPY_ON_WRITE、MERGE_ON_READ)、数据集路径和表 schema; 配置 Hudi source:使用 HoodieSourceBuilder 类配置 Hudi source,设置必要的参数,例如 checkpoint 目录、读取起始位置等; 读取数据:使用 Flink 的 DataStream API 读取 Hudi 数据源。 示例代码如下:
// 导入必要的依赖 import org.apache.hudi.connectors.flink.HoodieSource; import org.apache.hudi.connectors.flink.HoodieSourceBuilder; import org.apache.hudi.connectors.flink.util.SerializableConfiguration;
// 创建 HoodieSource String basePath = "/path/to/hudi/dataset"; // Hudi 数据集路径 String tableName = "hudi_table"; // Hudi 表名 String datasetType = "MERGE_ON_READ"; // Hudi 数据集类型 SerializableConfiguration conf = new SerializableConfiguration(hadoopConf); // Hadoop 配置 String[] readCols = {"col1", "col2"}; // 读取列名 HoodieSource hoodieSource = new HoodieSourceBuilder() .basePath(basePath) .tableName(tableName) .datasetType(datasetType) .conf(conf.get()) .readCols(readCols) .build();
// 配置 HoodieSource // 设置 checkpoint 目录和读取起始位置等参数 DataStream<HoodieRecord<? extends Serializable>> dataStream = env.addSource(hoodieSource);
// 读取数据,进行下一步处理 需要注意的是,在读取 Hudi 数据时,Flink 会自动跟踪相关的元数据信息,并根据 Hudi 数据集类型选择相应的读取器。在读取过程中,Flink 还可以自动进行 Schema 的解析和匹配,并将 Hudi 数据转换为 Flink 内部的数据结构。
Flink 可以通过 Hudi 提供的工具类 HoodieFlinkStreamer
来读取 Hudi 中的数据,包括 Changelog、Snapshot 等。
在 Flink 中使用 HoodieFlinkStreamer
读取 Changelog 的 Insert 事件可以参考以下代码:
// 引入依赖
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink</artifactId>
<version>${hudi.version}</version>
</dependency>
// 创建 HoodieFlinkStreamer 实例
HoodieFlinkStreamer streamer = new HoodieFlinkStreamer();
// 设置 Hoodie 数据集配置项
Properties props = new Properties();
props.put("hoodie.deltastreamer.source.dfs.root", "/path/to/source/data");
props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", "/path/to/source/schema/file");
props.put("hoodie.deltastreamer.checkpoint.dir", "/path/to/checkpoint/dir");
streamer.setProps(props);
// 配置 Flink 数据集
DataStream<HoodieRecord> hoodieStream = streamer
.withEmbeddedTimelineServer()
.fromFolder("/path/to/hudi/table")
.withFileSystemViewConfig(HoodieFlinkFileSystemViewConfig.newBuilder().build())
.pollIntervalMillis(1000)
.buildInputFormat();
// 过滤 Insert 事件
Stream<HoodieRecord> insertStream = hoodieStream
.filter(new FilterFunction<HoodieRecord>() {
@Override
public boolean filter(HoodieRecord hoodieRecord) throws Exception {
return hoodieRecord.getCurrentLocation().getInstantTime().isAfter("20210621155603");
}
})
.filter(new SerializableFunction<HoodieRecord, Boolean>() {
@Override
public Boolean apply(HoodieRecord hoodieRecord) {
return hoodieRecord.getOperation().equals(HoodieOperation.INSERT);
}
});
其中,HoodieRecord
是 Hudi 中记录的数据类型,HoodieOperation.INSERT
表示 Insert 操作。通过 filter
和 SerializableFunction
可以过滤出指定时间范围内的 Insert 事件。
Flink 读取 HBase 中的 insert 操作可以通过 TableEnvironment 和 Table 类来实现。
首先,需要在 Flink 项目中添加 HBase 相关的依赖,例如 Apache HBase 和 Flink 的 TableEnvironment。
然后,可以使用 TableEnvironment 类来创建一个 Flink 表环境,并使用 Table 类来读取 HBase 中的 insert 操作。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。