Versions used:
- flink-1.12.1
- kafka_2.12-2.4.1
(1) Integrating Flink Streaming with Kafka
Add the Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
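The examples below also create a StreamTableEnvironment and use the HiveCatalog, so the project needs the Table API and Hive connector dependencies as well. A minimal sketch of those extra dependencies for Flink 1.12.1 (artifact names assumed from the standard Flink distribution; adjust the Scala suffix to your own setup):

<!-- Assumed additional dependencies for the Table API / HiveCatalog examples below -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>1.12.1</version>
</dependency>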
Test the Kafka consumer. Flink's Kafka consumer is called FlinkKafkaConsumer.
package com.aikfk.flink.sql.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/10 12:53 PM
 */
public class FlinkConnectKafka {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the TableEnvironment (Blink planner).
        // It is not used in this streaming example, but is kept for the SQL example that follows.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata-pro-m07:9092");

        // Consumer group: consumers that share the same group.id belong to one group
        // and jointly consume the data of a Kafka topic.
        properties.setProperty("group.id", "kfk");

        // Kafka source
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer<String>("kfk", new SimpleStringSchema(), properties));

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String line) throws Exception {
                String[] words = line.split(",");
                return new Tuple2<>(words[0], words[1]);
            }
        }).print();

        env.execute("kafka");
    }
}
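By default the consumer starts from the committed group offsets. A small hedged sketch (reusing the same topic, schema, and properties as above) of explicitly choosing where the consumer starts reading:

// Hedged sketch: choose where the consumer starts reading the topic.
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("kfk", new SimpleStringSchema(), properties);
// Read the topic from the very beginning (useful when testing against data that is already there);
// setStartFromLatest() or setStartFromGroupOffsets() are the other common choices.
consumer.setStartFromEarliest();
DataStream<String> dataStream = env.addSource(consumer);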
Create the topic:
bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --create --topic kfk --partitions 1 --replication-factor 1
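To confirm the topic was created, it can be described with the same tool (an optional check, using the same broker address as above):

bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --describe --topic kfk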
Start a console producer:
bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk
Test data: typing a line such as hibve,dsd into the producer makes the Flink job print:
1> (hibve,dsd)
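If nothing shows up in the Flink job, it can help to verify that the messages actually reach the topic with a console consumer (standard Kafka tooling, independent of Flink):

bin/kafka-console-consumer.sh --bootstrap-server bigdata-pro-m07:9092 --topic kfk --from-beginning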
For more details on integrating Flink Streaming with Kafka, see the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html
(2) Integrating Flink SQL with Kafka
Add the following jar to the /opt/modules/flink/lib directory:
flink-sql-connector-kafka_2.11-1.12.1.jar
Download:
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.12.1
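One way to fetch the jar and drop it into the lib directory (the download URL is assumed from Maven Central's standard layout; any download method works, and the Flink cluster and SQL Client should be restarted after adding the jar):

cd /opt/modules/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.1/flink-sql-connector-kafka_2.11-1.12.1.jar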
(2.1) Step 1: Configure the Flink SQL Client
See this article: https://blog.csdn.net/weixin_45366499/article/details/115576853
(2.2) Step 2: Create the hiveConfDir
Place hive-site.xml in the /Users/caizhengjie/Desktop/hive-conf directory, as sketched below.
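A minimal sketch of what that hive-site.xml needs at the very least, assuming a Hive metastore service running on bigdata-pro-m07 with the default Thrift port 9083 (adjust to your actual metastore address):

<?xml version="1.0"?>
<configuration>
    <!-- Assumed metastore address; the HiveCatalog only needs to reach the metastore service -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata-pro-m07:9083</value>
    </property>
</configuration>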
(2.3) Step 3: Test the Flink SQL and Kafka integration code
package com.aikfk.flink.sql.kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/10 12:53 PM
 */
public class FlinkConnectKafkaDDL {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the TableEnvironment (Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";

        // Register and use the Hive catalog
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, hiveDataBase, hiveConfDir);
        tableEnvironment.registerCatalog(catalogName, hiveCatalog);
        tableEnvironment.useCatalog(catalogName);

        // DDL: create a table backed by the Kafka topic
        String kafkaTable = "person";
        String dropsql = "DROP TABLE IF EXISTS " + kafkaTable;
        String sql = "CREATE TABLE " + kafkaTable + " (\n" +
                "  user_id String,\n" +
                "  user_name String,\n" +
                "  age Int\n" +
                ") WITH (\n" +
                "  'connector.type' = 'kafka',\n" +
                "  'connector.version' = 'universal',\n" +
                "  'connector.topic' = 'kfk',\n" +
                "  'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "  'format.type' = 'csv',\n" +
                "  'update-mode' = 'append'\n" +
                ")";

        tableEnvironment.executeSql(dropsql);
        tableEnvironment.executeSql(sql);

        Table table = tableEnvironment.sqlQuery("select * from person");
        tableEnvironment.toAppendStream(table, Row.class).print();

        env.execute("kafka");
    }
}
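The WITH clause above uses the older connector property keys. The Flink 1.12 Kafka SQL connector also accepts the newer option style; a hedged alternative DDL for the same table (same topic and broker, option names as documented for the 1.12 Kafka SQL connector):

CREATE TABLE person (
  user_id STRING,
  user_name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'kfk',
  'properties.bootstrap.servers' = 'bigdata-pro-m07:9092',
  'properties.group.id' = 'kfk',
  'format' = 'csv',
  'scan.startup.mode' = 'latest-offset'
);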
When running this program, a problem may show up; if it does not happen to you, skip this part.
MetaException(message:An exception was thrown while adding/validating class(es) : Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
The cause is the character set of the Hive metastore database in MySQL. The same character-set problem can also break create, insert, load, and similar operations.
Solution:
use metastore;

mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | utf8                       |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+

mysql> alter database metastore character set latin1;
Query OK, 1 row affected (0.00 sec)

mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | latin1                     |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
9 rows in set (0.01 sec)
Setting the metastore database's character set to latin1 resolves the problem above.
If no error is reported, the job starts but prints nothing yet, simply because no data has been produced so far.
At this point, both Hive and the Flink SQL Client show the person table (in Hive it appears only as a metadata entry, so desc does not list its columns).
Flink SQL Client:
Flink SQL> show databases;
default
flink

Flink SQL> use flink;

Flink SQL> show tables;
person

Flink SQL> desc person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set
Hive:
hive (default)> use flink;
OK
Time taken: 1.056 seconds
hive (flink)> show tables;
OK
tab_name
person
Time taken: 0.179 seconds, Fetched: 1 row(s)
hive (flink)> desc person;
OK
col_name	data_type	comment
Time taken: 0.117 seconds
hive (flink)>
(2.4) Step 4: Test with Kafka data
Start a console producer:
bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk
Test data:
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
Result:
View the result through the Flink SQL Client:
bin/sql-client.sh embedded
select * from person;
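Besides the plain select, any other query over the same table should also work in the SQL Client; a small hypothetical example that aggregates the incoming rows:

SELECT user_name, COUNT(*) AS cnt, AVG(age) AS avg_age
FROM person
GROUP BY user_name;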
If executing the SQL statement produces the error below, try it a few more times, and make sure the flink-sql-connector-kafka jar from step (2) is actually in Flink's lib directory, restarting the cluster and the SQL Client after adding it.
Flink SQL> select * from person;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
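If the error persists, a quick way to double-check that the connector jar really is on the classpath (a simple shell check; restart the Flink cluster and reopen the SQL Client afterwards):

ls /opt/modules/flink/lib | grep kafka
# expected to list: flink-sql-connector-kafka_2.11-1.12.1.jar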