Version notes:
- flink-1.12.1
Step 1: Add the dependencies and the connector jar
Maven dependencies (here ${flink.version} is 1.12.1, per the version note above):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
Move the flink-connector-jdbc_2.11-1.12.1.jar package into the /opt/modules/flink/lib directory.
Download address for flink-connector-jdbc_2.11-1.12.1.jar:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.11/1.12.1
Step 2: Create the table in MySQL
create table person(
    user_id varchar(20),
    user_name varchar(20),
    age int
);

mysql> desc person;
+-----------+-------------+------+-----+---------+-------+
| Field     | Type        | Null | Key | Default | Extra |
+-----------+-------------+------+-----+---------+-------+
| user_id   | varchar(20) | YES  |     | NULL    |       |
| user_name | varchar(20) | YES  |     | NULL    |       |
| age       | int(11)     | YES  |     | NULL    |       |
+-----------+-------------+------+-----+---------+-------+
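Before wiring up Flink, it can be worth confirming that MySQL is reachable with the same URL and credentials the job will use. Below is a minimal standalone sketch, assuming the host bigdata-pro-m07, database flink, and the root/199911 credentials used later in this article, with mysql-connector-java on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MySqlPing {
    public static void main(String[] args) throws Exception {
        // Same JDBC URL and credentials as the Flink DDL in Step 3
        String url = "jdbc:mysql://bigdata-pro-m07:3306/flink";
        try (Connection conn = DriverManager.getConnection(url, "root", "199911");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM person")) {
            while (rs.next()) {
                System.out.println("person rows: " + rs.getInt(1));
            }
        }
    }
}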
Step 3: Test the Flink SQL and JDBC integration code
package com.aikfk.flink.sql.jdbc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class FlinkKafkaJDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Register the Hive catalog so table definitions are persisted across sessions
        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";

        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, hiveDataBase, hiveConfDir);
        tableEnvironment.registerCatalog(catalogName, hiveCatalog);
        tableEnvironment.useCatalog(catalogName);

        // Register a Kafka source table 'kafka_person' (legacy connector property syntax)
        String kafkaDropSql = "DROP TABLE IF EXISTS kafka_person";
        String kafkaTableSql = "CREATE TABLE kafka_person (\n" +
                "    user_id STRING,\n" +
                "    user_name STRING,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = 'universal',\n" +
                "    'connector.topic' = 'kfk',\n" +
                "    'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "    'format.type' = 'csv',\n" +
                "    'update-mode' = 'append'\n" +
                ")";

        tableEnvironment.executeSql(kafkaDropSql);
        tableEnvironment.executeSql(kafkaTableSql);

        // Register a MySQL sink table 'mysql_person' in Flink SQL via the JDBC connector
        String mysqlDropSql = "DROP TABLE IF EXISTS mysql_person";
        String mysqlTableSql = "CREATE TABLE mysql_person (\n" +
                "    user_id STRING,\n" +
                "    user_name STRING,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://bigdata-pro-m07:3306/flink',\n" +
                "    'table-name' = 'person',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '199911'\n" +
                ")";

        tableEnvironment.executeSql(mysqlDropSql);
        tableEnvironment.executeSql(mysqlTableSql);

        // Write data into the JDBC table from the Kafka table. executeSql() submits
        // the INSERT job itself, so no env.execute() call is needed here; adding one
        // would fail with "No operators defined in streaming topology".
        tableEnvironment.executeSql("INSERT INTO mysql_person\n" +
                "SELECT user_id, user_name, age FROM kafka_person");
    }
}
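To spot-check what has landed in the sink, you can also scan the JDBC table back from a small standalone program. Below is a minimal sketch, assuming the same DDL and credentials as above; in Flink 1.12, TableResult#collect() returns a CloseableIterator<Row>:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class MysqlPersonReader {
    public static void main(String[] args) throws Exception {
        // Batch mode is enough for a one-off bounded scan of the JDBC table
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Same JDBC DDL as in the main job above
        tEnv.executeSql("CREATE TABLE mysql_person (\n" +
                "    user_id STRING,\n" +
                "    user_name STRING,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://bigdata-pro-m07:3306/flink',\n" +
                "    'table-name' = 'person',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '199911'\n" +
                ")");

        TableResult result = tEnv.executeSql("SELECT user_id, user_name, age FROM mysql_person");
        try (CloseableIterator<Row> rows = result.collect()) {
            while (rows.hasNext()) {
                System.out.println(rows.next());
            }
        }
    }
}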
View the kafka_person and mysql_person tables through the Flink SQL client:
Flink SQL> show tables;
kafka_person
mysql_person
person

Flink SQL> desc kafka_person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set
Step 4: Test the Kafka source and writing data into MySQL
Create a producer:
bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk
Test data:
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
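Instead of typing rows into the console producer, the same test data can be sent programmatically. Below is a minimal sketch, assuming the kafka-clients dependency is on the classpath and using the broker and topic from this article:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "bigdata-pro-m07:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 6; i++) {
                // CSV rows matching the kafka_person schema: user_id,user_name,age
                producer.send(new ProducerRecord<>("kfk", "100,alex,10"));
            }
        }
    }
}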
After running, check whether the data has been written into MySQL:
mysql> select * from person;
+---------+-----------+------+
| user_id | user_name | age  |
+---------+-----------+------+
| 100     | alex      | 10   |
| 100     | alex      | 10   |
| 100     | alex      | 10   |
| 100     | alex      | 10   |
| 100     | alex      | 10   |
+---------+-----------+------+
5 rows in set (0.00 sec)
View the results through the Flink SQL Client:
bin/sql-client.sh embedded
select * from kafka_person;
select * from mysql_person;