开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink如何读取hudi在changelog下的insert呢

Flink如何读取hudi在changelog下的insert呢

展开
收起
游客6vdkhpqtie2h2 2022-09-01 10:28:19 811 0
15 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    Flink可以通过读取Hudi表中的changelog文件来获取插入、更新和删除操作的信息。

    使用 Flink 流式计算进行 changelog 文件处理时,需要按照以下步骤进行:

    1. 构造 Hudi 表的 Schema 对象。

    读取 Hudi changelog 前,需要构造 Hudi 表的字段信息和Schema,可以使用:org.apache.hudi.keygen.SimpleKeyGenerator。

    例如,对下面的 Stripe 表,构造Schema:

    hudi-hive-sync-demo_hudi_trips_cow:
    {
    trip_id: long,
    ts: long,
    uuid: string,
    rider: string,
    driver: string,
    begin_lat: double,
    begin_lon: double,
    end_lat: double,
    end_lon: double,
    fare: double,
    partitionpath: string
    }
    

    代码示例:

    String tableSchema = "{"
            + "\"trip_id\": \"long\","
            + "\"ts\": \"long\","
            + "\"uuid\": \"string\","
            + "\"rider\": \"string\","
            + "\"driver\": \"string\","
            + "\"begin_lat\": \"double\","
            + "\"begin_lon\": \"double\","
            + "\"end_lat\": \"double\","
            + "\"end_lon\": \"double\","
            + "\"fare\": \"double\","
            + "\"partitionpath\": \"string\""
            + "}";
    Schema schema = new Schema.Parser().parse(tableSchema);
    
    1. 从 Hudi 表的 changelog 中获取插入、更新或删除的数据。

    可以像读取普通的 Flink 流数据一样,利用 Flink 的 FileInputFormat 读取 Hudi 表的 changelog,并将数据解析为 POJO。

    代码示例:

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jdbcUrl, selectedTableName);
    String tablePath = metaClient.getBasePath();
    String changeLogFolder = tablePath + "/.hoodie/.temp/" + writeToken + "/changelog";
    // 构造 FileInputFormat
    ChangelogFileInputFormat source = new ChangelogFileInputFormat(
        new Path(changeLogFolder), schema, writeToken, operationTypes);
    //source.setNestedFileEnumeration(true);
    source.setFilesFilter(FilePathFilter.createDefaultFilter());
    

    在上述代码中,变量 writeToken 表示Hudi表的写入令牌、operationTypes 表示需要获取的操作类型,如 INSERT、UPSET、DELETE等。

    1. 在 Flink 中对数据进行处理。

    通过读取 Hudi 表的 changelog 文件,可以获取到插入、更新或删除的数据等信息,可以在 Flink 中对这些数据进行处理,例如:

    // 将读取到的每一条数据封装成 POJO
    public static class CdcRecord {
        public String operation;
        public String path;
        public long timestamp;
        public long id;
        public String uuid;
        public String rider;
        public String driver;
        public double beginLat;
        public double beginLon;
        public double endLat;
        public double endLon;
        public double fare;
        public String partitionPath;
        public CdcRecord(ObjectNode json) {
            operation = json.get("meta").get("action").asText();
            path = json.get("meta").get("partition").get("fileId").asText();
            timestamp = json.get("meta").get("timestamp").asLong();
            id = json.get("data").get("trip_id").asLong();
            uuid = json.get("data").get("uuid").asText();
            rider = json.get("data").get("rider").asText();
            driver = json.get("data").get("driver").asText();
            beginLat = json.get("data").get("begin_lat").asDouble();
            beginLon = json.get("data").get("begin_lon").asDouble();
            endLat = json.get("data").get("end_lat").asDouble();
            endLon = json.get("data").get("end_lon").asDouble();
            fare = json.get("data").get("fare").asDouble();
            partitionPath = json.get("partitionpath").asText();
        }
    }
    

    在 Hudi changelog 的消息中,可以获取到更多的信息,如 hudifile、commitTime、recordKey等。

    以上是使用 Flink 读取 Hudi changelog 的过程,最后可以将数据写入到 MySQL 或其他数据存储中。

    2023-05-06 11:55:17
    赞同 展开评论 打赏
  • Flink可以通过读取Hudi的changelog文件来捕获插入操作。Hudi的changelog文件包含每个操作类型(例如insert、update、delete等)的元数据,可以提供更可靠的数据同步。具体来说,您可以通过使用Hudi提供的HoodieChangelogStreamGenerator类生成changelog流,然后使用Flink提供的DataStream API读取和处理该流。以下是一个示例代码片段,展示了如何使用Hudi和Flink捕获changelog插入操作:

    val hudiTable = HoodieFlinkTable'file:/path/to/hudi/table',
            HoodieRealtimeTableFunction.createTypeInformation(),
            HoodieRealtimeTableFunction.createTypeInformation(),
            HoodieRealtimeTableFunction.createTypeInformation())
    
    val changelogStream = SparkRuntimeClientChangelogUtils.createSource(sparkSession, hudiTable)
    
    val insertStream = changelogStream
            .filter(changelog => changelog.getEventType == "insert")
            .map(insert => insert.getData)
    
    insertStream.print()
    

    该代码片段中,HoodieFlinkTable表示Hudi表,SparkRuntimeClientChangelogUtils用于生成changelog流,filter用于过滤出插入操作,map用于提取插入操作的数据,最后的print用于打印流中的记录。

    2023-05-05 17:33:05
    赞同 展开评论 打赏
  • 在 Flink 中读取 Hudi 的 Change Log 数据时,需要先将 Hudi 表以 hudi 格式注册到 Flink 的 Catalog 中,然后通过 SQL 查询的方式读取 Change Log,获取新插入的数据。

    具体操作步骤如下:

    1、将 Hudi 表以 hudi 格式注册到 Flink Catalog 中:

    -- 注册 Hudi 表到 Flink 的 Catalog 中
    CREATE TABLE hudi_table (
        __hoodie_commit_time STRING,
        __hoodie_commit_seqno STRING,
        __hoodie_record_key STRING,
        __hoodie_partition_path STRING,
        col1 STRING,
        col2 INT,
        ...
    ) PARTITIONED BY (__hoodie_partition_path)
    WITH (
        'connector' = 'hudi',
        'path' = '/path/to/hudi',
        'read.streaming.enabled' = 'true',
        'read.streaming.check-interval' = '1s',
        'tables.type' = 'COPY_ON_WRITE',
        'tables.root.partition.path' = '/',
        'tables.partition.fields' = '__hoodie_partition_path',
        'tables.partition.keys' = '__hoodie_record_key',
        'tables.metrics.reporters' = 'console'
    );
    

    在上述代码中,将 connector 设置为 hudi,并且使用 tables.partition.keys 指定了主键。同时,使用 read.streaming.enabled 和 read.streaming.check-interval 开启了流式查询,并设置了查询的时间间隔为 1s,以实时读取 Hudi Change Log 数据。

    2、使用 SQL 查询 Hudi Change Log 插入的新数据:

    -- 查询 Hudi Change Log 中新增的数据
    SELECT 
        __hoodie_commit_time,
        __hoodie_commit_seqno,
        __hoodie_record_key,
        __hoodie_partition_path,
        col1,
        col2,
        ...
    FROM hudi_table
    WHERE __hoodie_is_deleted = false 
    AND __hoodie_commit_time >= '2021-08-01 00:00:00' 
    AND __hoodie_commit_time < '2021-09-01 00:00:00'
    

    在上述代码中,使用 __hoodie_is_deleted 判断新增的数据,同时使用 __hoodie_commit_time 过滤查询时间范围内的数据。

    需要注意的是,Hudi Change Log 中的数据格式和 Flink 的数据格式不完全相同,因此在读取 Hudi Change Log 数据时,需要对数据进行转换和格式化,使其符合 Flink 的数据格式。

    2023-05-03 10:27:11
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    要读取Hudi在changelog下的insert操作,可以使用Flink的HudiInputFormat。HudiInputFormat是Flink提供的一种用于读取Hudi数据的输入格式,它可以读取Hudi 的数据文件和changelog文件,并将其转换为Flink的DataStream。 image.png

    2023-04-29 22:26:09
    赞同 展开评论 打赏
  • 在Hudi中,Changelog是用来记录数据变更的日志,它写入到对应的Hudi表的指定路径下的.hoodie目录中,Changelog文件名以_changelog结尾。Changelog中记录了Hudi表的所有变更操作,包括insert、update、delete等。

    如果需要在Flink中读取Hudi表Changelog中的insert操作,可以使用Flink的HoodieStreamTableSource类来实现。这个类可以将Hudi表作为输入流,可以读取Hudi表的Changelog并将其解析为Flink的数据流。

    以下是一个示例代码:

    String tablePath = "/path/to/hudi/table";
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // 创建 HoodieStreamTableSource
    HoodieStreamTableSource hoodieTableSource = HoodieStreamTableSource.builder()
        .path(tablePath)
        .recordKeyField("id")
        .build();
    
    // 读取 Changelog 中的 insert 操作
    DataStream&lt;Row&gt; stream = env.fromTableSource(hoodieTableSource)
        .filter(row -&gt; row.getField(2).equals("_I"));
    
    // 对于 insert 操作的处理逻辑
    // ...
    
    env.execute("Read Hudi Changelog insert operations");
    

    在这个代码中,首先使用HoodieStreamTableSource类创建一个Hudi表的输入源,设置表路径和记录键字段名称。然后使用fromTableSource方法将输入源转换为数据流,并使用filter方法筛选出Changelog中的insert操作。最后,可以在数据流中编写逻辑来处理insert操作。

    需要注意的是,HoodieStreamTableSource目前只能以流方式处理Changelog,如果需要读取历史版本或快照数据,可以参考其他方法,例如使用HoodieTableSource类或Flink的JDBCInputFormat

    2023-04-28 21:50:25
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    Flink 通过 Hudi 提供的 HoodieDeltaStreamer 工具可以读取 Hudi 在 changelog 下的 insert,并将其转化为流式数据处理。以下是步骤:

    在 Flink 程序中引入 Hudi 相关依赖,例如:
    

    org.apache.hudi hudi-flink_2.11 0.7.0

    在 Flink 程序中使用 HudiSourceFunction 创建一个 source,例如:
    

    HudiSourceConfig hudiSourceConfig = new HudiSourceConfig.Builder() .withTableName("

    ") .withChangelogEnabled(true) .withBootstrap(false) .build(); HudiSourceFunction source = new HudiSourceFunction(hudiSourceConfig);

    其中,

    是要读取的 Hudi 表名称。

    需要注意的是,withChangelogEnabled(true) 参数启用了 changelog 模式,用于读取插入和更新操作,withBootstrap(false) 设置为不使用 Bootstrap 模式。

    使用 Flink 的 DataStream API 处理 Hudi changelog 中的 insert 操作,例如:
    

    DataStream insertStream = env.addSource(source) .filter(new FilterFunction() { @Override public boolean filter(BaseRow value) throws Exception { // 过滤出 insert 操作 return value.getRowKind() == RowKind.INSERT; } }) .map(new MapFunction<BaseRow, String>() { @Override public String map(BaseRow value) throws Exception { // 将 BaseRow 转化为 String return value.toString(); } });

    使用 insertStream 进行下游处理,例如:
    

    insertStream.print();

    2023-04-27 12:51:20
    赞同 展开评论 打赏
  • 要读取Hudi在Changelog下的Insert,可以使用Flink对Hudi表进行查询操作。具体而言,可以通过Flink的Table API或SQL API来实现。

    在使用Table API时,需要完成以下步骤:

    创建一个Hudi表,并注册为Flink表

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // 创建Hudi表
    String tableName = "hudi_table";
    String path = "/path/to/hudi/table";
    String primaryKey = "id";
    String partitionColumn = "partition";
    String[] fieldNames = {"id", "name", "age"};
    TypeInformation[] fieldTypes = {Types.STRING(), Types.STRING(), Types.INT()};
    HoodieTableSource hoodieTableSource = HoodieTableSource.builder()
            .path(path)
            .tableName(tableName)
            .primaryKey(primaryKey)
            .partitionCols(Arrays.asList(partitionColumn))
            .fields(fieldNames, fieldTypes)
            .build();
    
    // 注册为Flink表
    tableEnv.registerTableSource(tableName, hoodieTableSource);
    

    在上述示例中,我们使用HoodieTableSource创建了一个Hudi表,并将其注册为Flink表供后续操作使用。

    使用Table API查询Hudi表中的Insert数据

               .filter("operation = 'INSERT'")  // 过滤出Insert操作
               .select($("id"), $("name"), $("age"));  // 查询特定字段
    
    // 输出结果
    DataStream<Row> dataStream = tableEnv.toAppendStream(result, Row.class);
    dataStream.print();
    

    在上述代码中,我们使用scan方法查询Hudi表中的数据,然后使用filter方法过滤出Insert操作的数据,并使用select方法查询特定字段。最后可以通过toAppendStream将结果转换为DataStream进行输出。

    在使用SQL API时,需要完成以下步骤:

    创建Hudi表,并注册为Flink表

    String path = "/path/to/hudi/table";
    String primaryKey = "id";
    String partitionColumn = "partition";
    String[] fieldNames = {"id", "name", "age"};
    TypeInformation[] fieldTypes = {Types.STRING(), Types.STRING(), Types.INT()};
    HoodieTableSource hoodieTableSource = HoodieTableSource.builder()
            .path(path)
            .tableName(tableName)
            .primaryKey(primaryKey)
            .partitionCols(Arrays.asList(partitionColumn))
            .fields(fieldNames, fieldTypes)
            .build();
    
    // 注册为Flink表
    tableEnv.registerTableSource(tableName, hoodieTableSource);
    使用SQL查询Hudi表中的Insert数据
    // 使用SQL查询Insert操作的数据
    String sqlQuery = "SELECT id, name, age FROM hudi_table WHERE operation='INSERT'";
    Table result = tableEnv.sqlQuery(sqlQuery);
    
    // 输出结果
    DataStream<Row> dataStream = tableEnv.toAppendStream(result, Row.class);
    dataStream.print();
    

    在上述代码中,我们使用SQL语句查询Hudi表中Insert操作的数据,并使用toAppendStream将结果转换为DataStream进行输出。

    需要注意的是,在查询Hudi表的Insert数据时,需要根据实际情况选择合适的字段和条件,以确保查询结果正确和高效。

    2023-04-26 11:00:10
    赞同 展开评论 打赏
  • 在 Flink 中读取 Hudi 的 Changelog 数据,需要使用 Flink 的 Hudi Connector。具体步骤如下:

    1. 在 Flink 中添加 Hudi Connector 的依赖,可以通过 Maven 或 Gradle 进行添加。

    2. 在 Flink 的程序中,使用 HudiSourceHudiInputFormat 类来读取 Hudi 的 Changelog 数据。这两个类的使用方式类似,下面以 HudiSource 为例进行说明。

    3. 在使用 HudiSource 时,需要指定 Hudi 的配置信息。可以通过 HudiConf 类来构建配置信息,例如:

    HudiConf conf = new HudiConf()
        .withPath("/path/to/hudi/table")
        .withSchema("hudi schema")
        .withPartitionFields("partition field")
        .withRecordKeyFields("record key field")
        .withKeyGeneratorClass(HoodieAvroKeyGenerator.class)
        .withReadSchema("avro schema")
        .withBasePath("/path/to/hudi/base")
        .withChangelogEnabled(true)
        .withChangelogMode(ChangelogMode.INCREMENTAL);
    

    其中,withPath 指定 Hudi 表的路径,withSchema 指定 Hudi 表的 Schema,withPartitionFields 指定 Hudi 表的分区字段,withRecordKeyFields 指定 Hudi 表的记录键字段,withKeyGeneratorClass 指定 Hudi 表的键生成器类,withReadSchema 指定 Hudi 表的读取 Schema,withBasePath 指定 Hudi 表的基本路径,withChangelogEnabled 指定是否启用 Changelog 模式,withChangelogMode 指定 Changelog 模式。

    1. 创建 HudiSource 对象,并将其作为数据源添加到 Flink 的 DataStream 中,例如:
    DataStream<Row> hudiStream = env.addSource(new HudiSource(conf));
    

    这样就可以读取 Hudi 的 Changelog 数据,并将其转换为 Flink 的 DataStream。 需要注意的是,在读取 Hudi 的 Changelog 数据时,需要保证 Hudi 表启用了 Changelog 模式,并且 Changelog 数据已经被写入到对应的目录中。如果 Hudi 表没有启用 Changelog 模式,或者 Changelog 数据还没有被写入到目录中,那么在读取时就无法获取到数据。

    2023-04-25 13:08:37
    赞同 展开评论 打赏
  • Flink可以通过Hudi提供的HoodieIncrementalInputFormat读取Hudi表的changelog,其中包括insert、update和delete等操作。

    2023-04-25 10:44:52
    赞同 展开评论 打赏
  • 十分耕耘,一定会有一分收获!

    楼主你好,其实Hudi集成Flink的读取方式分为:流读、增量读取、限流等三种方式,Flink读取Hudi下的insert,可通过Hudi的工具类HoodieFlinkStreamer来读取数据。

    2023-04-24 21:43:32
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    在 Flink 中读取 Hudi 的 Changelog 数据,可以使用 Hudi 提供的 HoodieChangelogStreamer 工具将 Changelog 数据导出到 Kafka 或 HBase 等支持 Flink 连接的数据源中,然后使用 Flink 的 FlinkKafkaConsumer 或 FlinkHBaseReader 等工具读取数据源中的数据。

    下面以使用 Kafka 作为数据源为例说明如何读取 Hudi 的 Changelog 数据:

    使用 HoodieChangelogStreamer 工具将 Changelog 数据导出到 Kafka 中:

    Copy code

    java -cp hudi-utilities-bundle.jar org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --props /path/to/config/file.properties \
     --op UPSERT \
     --table-type MERGE_ON_READ \
     --source-class org.apache.hudi.DataSource \
     --source-ordering-field name \
     --target-base-path /path/to/hudi-table \
     --target-table hudi_table \
     --target-topic hudi_changelog_topic \
     --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
     --continuous
    
    

    其中 --target-topic 指定导出的 Changelog 数据要写入的 Kafka 主题。

    在 Flink 中使用 FlinkKafkaConsumer 读取 Kafka 中的数据:

    Copy code

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("group.id", "flink_consumer");
    
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("hudi_changelog_topic", new SimpleStringSchema(), kafkaProps);
    
    DataStream<String> changelogStream = env.addSource(consumer);
    
    // 对 Changelog 数据进行处理
    changelogStream.map(new MapFunction<String, MyData>() {
        @Override
        public MyData map(String value) throws Exception {
            // 解析 Changelog 数据并转换为 MyData 对象
            // ...
            return myData;
        }
    });
    
    

    在 FlinkKafkaConsumer 的构造函数中指定了要读取的 Kafka 主题名称,并使用 SimpleStringSchema 解析 Kafka 中的消息。然后可以对读取到的数据进行处理,例如解析 Changelog 数据并转换为自定义的数据类型。

    需要注意的是,由于 Hudi 的 Changelog 数据中包含了更新和删除操作,因此需要根据操作类型进行相应的处理,例如更新操作需要在 Flink 中执行相应的更新操作。

    2023-04-24 07:54:37
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    Flink可以通过Hudi提供的HoodieIncrementalInputFormat读取Hudi表的changelog,其中包括insert、update和delete等操作。

    具体步骤如下:

    在Flink中定义HoodieIncrementalInputFormat,并设置需要读取的Hudi表的相关配置信息,例如: HoodieIncrementalInputFormat inputFormat = new HoodieIncrementalInputFormat<>( new HoodieTableMetaClient.Builder().setBasePath("path/to/hudi/table").build(), TypeInformation.of(RowData.class), "commitTime", ConfigUtils.getDefaultHadoopConf() ); java 使用Flink的DataSource将HoodieIncrementalInputFormat转换为DataStream,例如: DataStream dataStream = env.createInput(inputFormat); java 对DataStream进行处理,例如: dataStream .filter(row -> row.getRowKind() == RowKind.INSERT) .map(row -> { // 处理insert操作的数据 return row; }); java 通过以上步骤,就可以读取Hudi表的changelog,并对其中的insert操作进行处理。

    2023-04-23 19:04:17
    赞同 展开评论 打赏
  • 热爱开发

    要读取 Hudi 在 changelog 模式下的 insert,需要使用 Flink 的 Hudi source 进行读取,Hudi source 可以根据 Hudi 数据集的类型自动选择对应的读取器。

    具体而言,可以按照以下步骤配置 Hudi source:

    导入相关依赖:在 Flink 项目中添加 Hudi 相关依赖,例如 flink-connector-hudi; 创建 Hudi source:使用 HoodieSource 类创建 Hudi source。该类需要传入三个参数:数据集类型(例如 COPY_ON_WRITE、MERGE_ON_READ)、数据集路径和表 schema; 配置 Hudi source:使用 HoodieSourceBuilder 类配置 Hudi source,设置必要的参数,例如 checkpoint 目录、读取起始位置等; 读取数据:使用 Flink 的 DataStream API 读取 Hudi 数据源。 示例代码如下:

    // 导入必要的依赖 import org.apache.hudi.connectors.flink.HoodieSource; import org.apache.hudi.connectors.flink.HoodieSourceBuilder; import org.apache.hudi.connectors.flink.util.SerializableConfiguration;

    // 创建 HoodieSource String basePath = "/path/to/hudi/dataset"; // Hudi 数据集路径 String tableName = "hudi_table"; // Hudi 表名 String datasetType = "MERGE_ON_READ"; // Hudi 数据集类型 SerializableConfiguration conf = new SerializableConfiguration(hadoopConf); // Hadoop 配置 String[] readCols = {"col1", "col2"}; // 读取列名 HoodieSource hoodieSource = new HoodieSourceBuilder() .basePath(basePath) .tableName(tableName) .datasetType(datasetType) .conf(conf.get()) .readCols(readCols) .build();

    // 配置 HoodieSource // 设置 checkpoint 目录和读取起始位置等参数 DataStream<HoodieRecord<? extends Serializable>> dataStream = env.addSource(hoodieSource);

    // 读取数据,进行下一步处理 需要注意的是,在读取 Hudi 数据时,Flink 会自动跟踪相关的元数据信息,并根据 Hudi 数据集类型选择相应的读取器。在读取过程中,Flink 还可以自动进行 Schema 的解析和匹配,并将 Hudi 数据转换为 Flink 内部的数据结构。

    2023-04-23 18:18:37
    赞同 展开评论 打赏
  • Flink 可以通过 Hudi 提供的工具类 HoodieFlinkStreamer 来读取 Hudi 中的数据,包括 Changelog、Snapshot 等。

    在 Flink 中使用 HoodieFlinkStreamer 读取 Changelog 的 Insert 事件可以参考以下代码:

    // 引入依赖
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    
    // 创建 HoodieFlinkStreamer 实例
    HoodieFlinkStreamer streamer = new HoodieFlinkStreamer();
    
    // 设置 Hoodie 数据集配置项
    Properties props = new Properties();
    props.put("hoodie.deltastreamer.source.dfs.root", "/path/to/source/data");
    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", "/path/to/source/schema/file");
    props.put("hoodie.deltastreamer.checkpoint.dir", "/path/to/checkpoint/dir");
    streamer.setProps(props);
    
    // 配置 Flink 数据集
    DataStream<HoodieRecord> hoodieStream = streamer
        .withEmbeddedTimelineServer()
        .fromFolder("/path/to/hudi/table")
        .withFileSystemViewConfig(HoodieFlinkFileSystemViewConfig.newBuilder().build())
        .pollIntervalMillis(1000)
        .buildInputFormat();
        
    // 过滤 Insert 事件
    Stream<HoodieRecord> insertStream = hoodieStream
        .filter(new FilterFunction<HoodieRecord>() {
            @Override
            public boolean filter(HoodieRecord hoodieRecord) throws Exception {
                return hoodieRecord.getCurrentLocation().getInstantTime().isAfter("20210621155603");
            }
        })
        .filter(new SerializableFunction<HoodieRecord, Boolean>() {
            @Override
            public Boolean apply(HoodieRecord hoodieRecord) {
                return hoodieRecord.getOperation().equals(HoodieOperation.INSERT);
            }
        });
    
    

    其中,HoodieRecord 是 Hudi 中记录的数据类型,HoodieOperation.INSERT 表示 Insert 操作。通过 filterSerializableFunction 可以过滤出指定时间范围内的 Insert 事件。

    2023-04-23 17:31:31
    赞同 展开评论 打赏
  • 存在即是合理

    Flink 读取 HBase 中的 insert 操作可以通过 TableEnvironment 和 Table 类来实现。

    首先,需要在 Flink 项目中添加 HBase 相关的依赖,例如 Apache HBase 和 Flink 的 TableEnvironment。

    然后,可以使用 TableEnvironment 类来创建一个 Flink 表环境,并使用 Table 类来读取 HBase 中的 insert 操作。

    2023-04-23 15:51:33
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载