pom 文件配置（Maven 依赖：Iceberg Flink 1.14 连接器与 Avro）
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.14</artifactId>
<version>0.13.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>
示例代码：使用 Flink Table API 对 Iceberg 表进行建表、写入与查询
/**
 * Creates an Iceberg table in a Hadoop catalog via the Flink Table API,
 * inserts a few sample rows, then reads the table back and prints it.
 *
 * Fix over the original: `CREATE TABLE` now uses `IF NOT EXISTS`, so the
 * job is re-runnable — previously a second run failed because the table
 * already existed in the Hadoop catalog.
 */
object IcebergRead {
  def main(args: Array[String]): Unit = {
    // Impersonate "root" for HDFS access; required when the local OS user
    // does not own the warehouse path. NOTE(review): avoid in production.
    System.setProperty("HADOOP_USER_NAME", "root")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Iceberg commits written data files when a Flink checkpoint completes,
    // so checkpointing MUST be enabled or the INSERT never becomes visible.
    // 1000 ms is aggressive but fine for a demo.
    env.enableCheckpointing(1000)
    val tblEnv = StreamTableEnvironment.create(env)
    //tblEnv.useDatabase("iceberg_db1")

    // DDL: Iceberg table in a Hadoop catalog, partitioned by `loc`.
    // `if not exists` makes the statement idempotent across runs.
    tblEnv.executeSql(
      """
        | create table if not exists flink_iceberg_tbl2(
        | id int,
        | name string,
        | age int,
        | loc string)
        | partitioned by (loc)
        | WITH (
        | 'connector'='iceberg',
        | 'catalog-name'='hadoop_zmd',
        | 'catalog-database'='iceberg_db1',
        | 'catalog-type'='hadoop',
        | 'warehouse'='hdfs://cm1:8020/testzmd'
        |)
        |""".stripMargin)

    // executeSql submits the INSERT as its own Flink job; no env.execute()
    // call is needed for Table API statements.
    tblEnv.executeSql(
      """
        | insert into flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')
        |""".stripMargin)

    // Snapshot (batch-style) read of the table; print() collects the result
    // rows back to the client and writes them to stdout.
    tblEnv.executeSql(
      """
        | select * from flink_iceberg_tbl2
        |""".stripMargin).print()
    // env.execute() intentionally omitted — SQL statements above each
    // trigger their own job submission.
  }
}