Flink DataStream API: reading Hudi data
1: POM dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.15-bundle</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <!-- Flink 1.15 dropped Scala 2.11 support: the java bridge has no Scala suffix and the planner uses _2.12 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
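The `${flink.version}` and `${hudi.version}` properties are referenced but not defined above. A minimal sketch of the missing <properties> block, assuming Flink 1.15.x (to match the hudi-flink1.15-bundle artifact) and a Hudi release that ships that bundle; adjust the exact versions to your cluster:

<properties>
    <!-- assumed versions, not taken from the original post -->
    <flink.version>1.15.2</flink.version>
    <hudi.version>0.13.0</hudi.version>
</properties>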
2: DataStream API demo for reading from Hudi
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class FlinkDataStreamReadFromHudiTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Build the Hudi source
        String studentHudiTable = "ods_student_table";
        String studentHudiTablePath = "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/" + studentHudiTable;

        Map<String, String> studentOptions = new HashMap<>();
        studentOptions.put(FlinkOptions.PATH.key(), studentHudiTablePath);
        studentOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        studentOptions.put(FlinkOptions.READ_AS_STREAMING.key(), "true");             // enable streaming read
        studentOptions.put(FlinkOptions.READ_START_COMMIT.key(), "16811748000000");   // start commit instant time
        studentOptions.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "4");    // interval (seconds) for checking new commits
        studentOptions.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");             // emit changelog (+I/-U/+U/-D) records

        HoodiePipeline.Builder studentBuilder = HoodiePipeline.builder(studentHudiTable)
                .column("id BIGINT")
                .column("name STRING")
                .column("age BIGINT")
                .column("hobby STRING")
                .pk("id")
                .options(studentOptions);
        DataStream<RowData> studentRowDataDS = studentBuilder.source(env);

        // 3. Transform and print
        DataStream<Student> studentDS = studentRowDataDS.map(new MapFunction<RowData, Student>() {
            @Override
            public Student map(RowData value) throws Exception {
                try {
                    String rowKind = value.getRowKind().name();
                    Long id = value.getLong(0);
                    String name = value.getString(1).toString();
                    Long age = value.getLong(2);
                    String hobby = value.getString(3).toString();
                    return new Student(id, name, age, hobby, rowKind);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
        studentDS.print();

        env.execute("FlinkDataStreamReadFromHudiTest");
    }

    public static class Student {
        public Long id;
        public String name;
        public Long age;
        public String hobby;
        public String rowKind;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby, String rowKind) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
            this.rowKind = rowKind;
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Long getAge() { return age; }
        public void setAge(Long age) { this.age = age; }
        public String getHobby() { return hobby; }
        public void setHobby(String hobby) { this.hobby = hobby; }
        public String getRowKind() { return rowKind; }
        public void setRowKind(String rowKind) { this.rowKind = rowKind; }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    ", rowKind='" + rowKind + '\'' +
                    '}';
        }
    }
}
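The demo pins read.start-commit to one specific instant. If you simply want to consume everything the table still retains, the Hudi Flink connector also accepts the literal value "earliest" for this option; a minimal sketch replacing that single line:

// Consume from the earliest retained commit instead of a fixed instant (sketch)
studentOptions.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");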
3: DataStream API demo for writing to Hudi
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class FlinkDataStreamWrite2HudiTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Checkpointing must be enabled. By default, data files only appear under the table path
        //    after about 5 checkpoints; before that only the .hoodie directory exists.
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // Retain checkpoints when the job is cancelled or fails
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);              // milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);     // milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);  // minimum pause between two consecutive checkpoints
        conf.setCheckpointStorage(checkPointPath);

        // 3. Prepare some data
        DataStreamSource<Student> studentDS = env.fromElements(
                new Student(101L, "Johnson", 17L, "swimming"),
                new Student(102L, "Lin", 15L, "shopping"),
                new Student(103L, "Tom", 5L, "play"));

        // 4. Build the Hudi pipeline
        // 4.1 Hudi table name and path
        String studentHudiTable = "ods_student_table";
        String studentHudiTablePath = "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/" + studentHudiTable;

        Map<String, String> studentOptions = new HashMap<>();
        studentOptions.put(FlinkOptions.PATH.key(), studentHudiTablePath);
        studentOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

        HoodiePipeline.Builder studentBuilder = HoodiePipeline.builder(studentHudiTable)
                .column("id BIGINT")
                .column("name STRING")
                .column("age BIGINT")
                .column("hobby STRING")
                .pk("id")
                // .pk("id,age") // a composite primary key can be declared with comma-separated columns
                .options(studentOptions);

        // 5. Convert to a RowData stream
        DataStream<RowData> studentRowDataDS = studentDS.map(new MapFunction<Student, RowData>() {
            @Override
            public RowData map(Student value) throws Exception {
                try {
                    GenericRowData row = new GenericRowData(4);
                    row.setField(0, value.id);
                    row.setField(1, StringData.fromString(value.name));
                    row.setField(2, value.age);
                    row.setField(3, StringData.fromString(value.hobby));
                    return row;
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });

        // Second argument: whether the input stream is bounded
        studentBuilder.sink(studentRowDataDS, false);

        env.execute("FlinkDataStreamWrite2HudiTest");
    }

    public static class Student {
        public Long id;
        public String name;
        public Long age;
        public String hobby;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Long getAge() { return age; }
        public void setAge(Long age) { this.age = age; }
        public String getHobby() { return hobby; }
        public void setHobby(String hobby) { this.hobby = hobby; }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    '}';
        }
    }
}
Flink Table API: reading Hudi data
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlReadFromHudiTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2. Declare the Hudi table and read from it
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "  id    BIGINT COMMENT 'student id',\n" +
                "  name  STRING COMMENT 'name',\n" +
                "  age   BIGINT COMMENT 'age',\n" +
                "  hobby STRING COMMENT 'hobby',\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'compaction.async.enabled' = 'true',\n" +
                "  'compaction.tasks' = '1',\n" +
                "  'compaction.trigger.strategy' = 'num_commits',\n" +
                "  'compaction.delta_commits' = '3',\n" +
                "  'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',\n" +
                "  'hoodie.cleaner.commits.retained' = '30',\n" +
                "  'hoodie.keep.min.commits' = '35',\n" +
                "  'hoodie.keep.max.commits' = '40'\n" +
                ")");

        tabEnv.executeSql("select id,name,age,hobby from out_ods_student_table").print();

        // executeSql() runs the query by itself; calling env.execute() here would fail with
        // "No operators defined in streaming topology" because the job contains no DataStream operators.
        // env.execute("FlinkSqlReadFromHudiTest");
    }
}
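The DDL above performs a one-shot snapshot read. The same connector options the DataStream demo sets programmatically can be placed in the WITH clause to turn this into a streaming (incremental) read. A minimal sketch; the check interval and start commit values are only examples:

// Sketch: declare the table for streaming/incremental read instead of a snapshot read
tabEnv.executeSql("" +
        "CREATE TABLE ods_student_table_streaming(\n" +
        "  id    BIGINT,\n" +
        "  name  STRING,\n" +
        "  age   BIGINT,\n" +
        "  hobby STRING,\n" +
        "  PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'hudi',\n" +
        "  'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
        "  'table.type' = 'MERGE_ON_READ',\n" +
        "  'read.streaming.enabled' = 'true',\n" +
        "  'read.streaming.check-interval' = '4',\n" +
        "  'read.start-commit' = 'earliest'\n" +
        ")");
tabEnv.executeSql("select * from ods_student_table_streaming").print();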
Flink Table API: writing Hudi data (sourced from a DataStream)
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkDataStreamSqlWrite2HudiTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2. Checkpointing must be enabled. By default, data files only appear under the table path
        //    after about 5 checkpoints; before that only the .hoodie directory exists.
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // Retain checkpoints when the job is cancelled or fails
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);              // milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);     // milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);  // minimum pause between two consecutive checkpoints
        conf.setCheckpointStorage(checkPointPath);

        // 3. Prepare some data; in a real job this would typically come from Kafka
        DataStreamSource<Student> studentDS = env.fromElements(
                new Student(201L, "zhangsan", 117L, "eat"),
                new Student(202L, "lisi", 115L, "drink"),
                new Student(203L, "wangwu", 105L, "sleep"));
        // Because no DataStream operator follows, env.execute() may throw:
        // Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
        // This does not prevent the data from being written to Hudi.
        // Alternatively, add a DataStream operator such as print:
        // studentDS.print("DataStream: ");

        // 4. Register the DataStream as a table
        // 4.1 First argument: table name; second: the DataStream; optional third: the column list, which can
        //     map stream attributes to column names, e.g. "userId as user_id, name, age, hobby".
        //     registerDataStream is deprecated on newer Flink versions; see the createTemporaryView sketch after this demo.
        tabEnv.registerDataStream("tmp_student_table", studentDS, "id, name, age, hobby");

        // 5. Declare the Hudi table and write into it
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "  id    BIGINT COMMENT 'student id',\n" +
                "  name  STRING COMMENT 'name',\n" +
                "  age   BIGINT COMMENT 'age',\n" +
                "  hobby STRING COMMENT 'hobby',\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'compaction.async.enabled' = 'true',\n" +
                "  'compaction.tasks' = '1',\n" +
                "  'compaction.trigger.strategy' = 'num_commits',\n" +
                "  'compaction.delta_commits' = '3',\n" +
                "  'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',\n" +
                "  'hoodie.cleaner.commits.retained' = '30',\n" +
                "  'hoodie.keep.min.commits' = '35',\n" +
                "  'hoodie.keep.max.commits' = '40'\n" +
                ")");
        tabEnv.executeSql("insert into out_ods_student_table select id,name,age,hobby from tmp_student_table");

        env.execute("FlinkDataStreamSqlWrite2HudiTest");
    }

    public static class Student {
        public Long id;
        public String name;
        public Long age;
        public String hobby;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Long getAge() { return age; }
        public void setAge(Long age) { this.age = age; }
        public String getHobby() { return hobby; }
        public void setHobby(String hobby) { this.hobby = hobby; }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    '}';
        }
    }
}
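registerDataStream is a long-deprecated API; on current Flink versions createTemporaryView is its replacement. A minimal sketch of the equivalent registration, assuming the Expression-based overload provided by the Java bridge:

import static org.apache.flink.table.api.Expressions.$;

// Sketch: the non-deprecated way to expose the DataStream as a table
tabEnv.createTemporaryView("tmp_student_table", studentDS,
        $("id"), $("name"), $("age"), $("hobby"));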
Flink Table API: writing Hudi data (supplied via SQL VALUES)
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkValuesSqlWrite2HudiTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2. Checkpointing must be enabled. By default, data files only appear under the table path
        //    after about 5 checkpoints; before that only the .hoodie directory exists.
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // Retain checkpoints when the job is cancelled or fails
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);              // milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);     // milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);  // minimum pause between two consecutive checkpoints
        conf.setCheckpointStorage(checkPointPath);

        // 3. Declare the Hudi table and insert literal rows into it
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "  id    BIGINT COMMENT 'student id',\n" +
                "  name  STRING COMMENT 'name',\n" +
                "  age   BIGINT COMMENT 'age',\n" +
                "  hobby STRING COMMENT 'hobby',\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'compaction.async.enabled' = 'true',\n" +
                "  'compaction.tasks' = '1',\n" +
                "  'compaction.trigger.strategy' = 'num_commits',\n" +
                "  'compaction.delta_commits' = '3',\n" +
                "  'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',\n" +
                "  'hoodie.cleaner.commits.retained' = '30',\n" +
                "  'hoodie.keep.min.commits' = '35',\n" +
                "  'hoodie.keep.max.commits' = '40'\n" +
                ")");

        // executeSql() submits the INSERT as its own job; await() blocks until it finishes.
        // Calling env.execute() here would fail with "No operators defined in streaming topology"
        // because this job has no DataStream operators.
        tabEnv.executeSql("" +
                "insert into out_ods_student_table values\n" +
                "  (301, 'xiaoming', 201, 'read'),\n" +
                "  (302, 'xiaohong', 202, 'write'),\n" +
                "  (303, 'xiaogang', 203, 'sing')").await();
        // env.execute("FlinkValuesSqlWrite2HudiTest");
    }
}
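To check that the literal rows actually landed in Hudi, the same tabEnv can read the table back right after the insert, just like the read demo; a short sketch appended to the same main method:

// Read back what was just written (the table is already declared above in this job)
tabEnv.executeSql("select id, name, age, hobby from out_ods_student_table").print();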