Add the dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
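Both tables below use the json format, which is provided by flink-json. That jar ships in the lib directory of the Flink binary distribution, but when running the job from the IDE you may also need to declare it yourself; a hedged addition, reusing the same ${flink.version} property:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>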
The code
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object MysqlTokafka {
  def main(args: Array[String]): Unit = {
    // Run as root so checkpoint state can be written to HDFS
    System.setProperty("HADOOP_USER_NAME", "root")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 5 s; the Iceberg sink only commits data files on checkpoints
    env.enableCheckpointing(5000)
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // Changelog table backed by upsert-kafka: messages are keyed by the primary
    // key, so later updates/deletes for the same id overwrite earlier ones
    tableEnv.executeSql(
      """
        |create table mc_member_address_to_kafka(
        |  id bigint,
        |  user_id string,
        |  province string,
        |  city string,
        |  area string,
        |  address string,
        |  lon string,
        |  lat string,
        |  phone_number string,
        |  consignee_name string,
        |  gmt_create bigint,
        |  gmt_modified bigint,
        |  primary key (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'mc_member_address_to_kafka',
        |  'properties.bootstrap.servers' = 'cm2:9092,cm3:9092',
        |  'key.format' = 'json',
        |  'value.format' = 'json')
        |""".stripMargin)
    // Append-only reader over the same topic using the plain kafka connector.
    // Unlike upsert-kafka, this connector does not allow a PRIMARY KEY
    // constraint, so none is declared here.
    tableEnv.executeSql(
      """
        |create table mc_member_address_to_kafka_source(
        |  id bigint,
        |  user_id string,
        |  province string,
        |  city string,
        |  area string,
        |  address string,
        |  lon string,
        |  lat string,
        |  phone_number string,
        |  consignee_name string,
        |  gmt_create bigint,
        |  gmt_modified bigint
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'mc_member_address_to_kafka',
        |  'properties.bootstrap.servers' = 'cm2:9092,cm3:9092',
        |  'properties.group.id' = 'testGroup',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json')
        |""".stripMargin)
    // kafka -> iceberg: write the changelog read from the upsert-kafka table
    // into the Iceberg ODS table. The catalog lake_house_catalog must already
    // be registered (see the sketch below).
    tableEnv.executeSql(
      """
        |insert into lake_house_catalog.lakehouse.ods_iceberg_mc_member_address
        |select * from mc_member_address_to_kafka
        |""".stripMargin)
  }
}
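The final insert assumes that the Iceberg catalog lake_house_catalog and the target table ods_iceberg_mc_member_address were created in an earlier step, with the iceberg-flink-runtime jar on the classpath. If you need to set them up in this job, a minimal sketch, assuming a Hadoop-type catalog and a hypothetical warehouse path hdfs://cm1:8020/lakehouse:

// A minimal sketch, assuming a Hadoop-type Iceberg catalog; a Hive catalog
// would use 'catalog-type' = 'hive' plus a 'uri' option instead.
tableEnv.executeSql(
  """
    |create catalog lake_house_catalog with (
    |  'type' = 'iceberg',
    |  'catalog-type' = 'hadoop',
    |  'warehouse' = 'hdfs://cm1:8020/lakehouse')
    |""".stripMargin)
tableEnv.executeSql("create database if not exists lake_house_catalog.lakehouse")
tableEnv.executeSql(
  """
    |create table if not exists lake_house_catalog.lakehouse.ods_iceberg_mc_member_address(
    |  id bigint,
    |  user_id string,
    |  province string,
    |  city string,
    |  area string,
    |  address string,
    |  lon string,
    |  lat string,
    |  phone_number string,
    |  consignee_name string,
    |  gmt_create bigint,
    |  gmt_modified bigint,
    |  primary key (id) NOT ENFORCED
    |) WITH (
    |  'format-version' = '2')
    |""".stripMargin)

Format version 2 is set here because, as far as I know, row-level updates and deletes (what the upsert-kafka source emits) need Iceberg's v2 format; a v1 table would only accept an append-only stream.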