Integrating Flink SQL with Kafka

Summary: notes.

Versions used:

  • flink-1.12.1
  • kafka_2.12-2.4.1


(1) Integrating Flink Streaming with Kafka


Add the Maven dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.1</version>
</dependency>

Testing the Kafka consumer: Flink's Kafka consumer is called FlinkKafkaConsumer.

package com.aikfk.flink.sql.kafka;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
/**
 * @author :caizhengjie
 * @description: Flink DataStream integration with Kafka
 * @date :2021/4/10 12:53 PM
 */
public class FlinkConnectKafka {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create the TableEnvironment (Blink planner); not used in this example,
        //    but kept for symmetry with the SQL example in section (2)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
        // Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","bigdata-pro-m07:9092");
        // Consumer group: several consumers can be organized into one group to jointly
        // consume a topic's data. Each consumer must specify a group id; consumers
        // with the same group id belong to the same group.
        properties.setProperty("group.id","kfk");
        // Kafka source: read the "kfk" topic as a stream of strings
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>
                ("kfk", new SimpleStringSchema(), properties));
        // Split each comma-separated line into a (field0, field1) pair and print it
        dataStream.map(new MapFunction<String, Tuple2<String,String>>() {
            @Override
            public Tuple2<String,String> map(String line) throws Exception {
                String[] words = line.split(",");
                return new Tuple2<>(words[0],words[1]);
            }
        }).print();
        env.execute("kafka");
    }
}
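
By default, FlinkKafkaConsumer starts reading from the committed offsets of the consumer group. It also lets you pick the start position explicitly; a minimal sketch, reusing the topic and properties from the example above:

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("kfk", new SimpleStringSchema(), properties);
// Read the topic from the beginning instead of the committed group offsets
consumer.setStartFromEarliest();
// Alternatives: setStartFromLatest(), setStartFromGroupOffsets() (the default)
DataStream<String> streamFromEarliest = env.addSource(consumer);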

Create the topic:

bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --create --topic kfk --partitions 1 --replication-factor 1

Start a console producer:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

Test data: typing a line such as hibve,dsd into the producer yields this output from the Flink job:

1> (hibve,dsd)

For more on integrating Flink streaming with Kafka, see the official documentation:


https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html
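
The same dependency also provides a sink, FlinkKafkaProducer, for writing a stream back to Kafka. A minimal sketch (the output topic kfk-out is made up for illustration):

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Reuses dataStream and properties from the consumer example above;
// each record is serialized as a plain string into the (hypothetical) kfk-out topic
dataStream.addSink(new FlinkKafkaProducer<String>(
        "kfk-out", new SimpleStringSchema(), properties));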


(2) Integrating Flink SQL with Kafka


Add the following JAR to the /opt/modules/flink/lib directory:

flink-sql-connector-kafka_2.11-1.12.1.jar


Download:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.12.1


(2.1) Step 1: Configure the Flink SQL Client

See this article: https://blog.csdn.net/weixin_45366499/article/details/115576853
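
In Flink 1.12 the SQL Client can register the Hive catalog through conf/sql-client-defaults.yaml. A minimal sketch matching the catalog used in the code below (the name, database, and path are this article's; adjust them to your environment):

catalogs:
  - name: flink_hive
    type: hive
    hive-conf-dir: /Users/caizhengjie/Desktop/hive-conf
    default-database: flink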


(2.2) Step 2: Create the hiveConfDir

Place hive-site.xml in the /Users/caizhengjie/Desktop/hive-conf directory.
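
The HiveCatalog only requires that this directory contain a hive-site.xml telling it how to reach the Hive metastore. A minimal sketch, assuming a metastore service at thrift://bigdata-pro-m07:9083 (the port and service setup are assumptions, not from this article):

<configuration>
  <property>
    <!-- Address of the Hive metastore service (host/port assumed) -->
    <name>hive.metastore.uris</name>
    <value>thrift://bigdata-pro-m07:9083</value>
  </property>
</configuration>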


(2.3) Step 3: Test the Flink SQL and Kafka integration code

package com.aikfk.flink.sql.kafka;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
 * @author :caizhengjie
 * @description: Flink SQL integration with Kafka via DDL
 * @date :2021/4/10 12:53 PM
 */
public class FlinkConnectKafkaDDL {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create the TableEnvironment (Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
        // Register and use the Hive catalog, so table metadata is persisted in the Hive metastore
        HiveCatalog hiveCatalog =
                new HiveCatalog(catalogName, hiveDataBase, hiveConfDir);
        tableEnvironment.registerCatalog(catalogName, hiveCatalog);
        tableEnvironment.useCatalog(catalogName);
        // DDL: create a table backed by the Kafka topic (legacy connector property style)
        String kafkaTable = "person";
        String dropsql = "DROP TABLE IF EXISTS "+kafkaTable;
        String sql
                = "CREATE TABLE "+kafkaTable+" (\n" +
                "    user_id String,\n" +
                "    user_name String,\n" +
                "    age Int\n" +
                ") WITH (\n" +
                "   'connector.type' = 'kafka',\n" +
                "   'connector.version' = 'universal',\n" +
                "   'connector.topic' = 'kfk',\n" +
                "   'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "   'format.type' = 'csv',\n" +
                "   'update-mode' = 'append'\n" +
                ")";
        tableEnvironment.executeSql(dropsql);
        tableEnvironment.executeSql(sql);
        // Query the Kafka-backed table and print rows as they arrive
        Table table = tableEnvironment.sqlQuery("select * from person");
        tableEnvironment.toAppendStream(table, Row.class).print();
        env.execute("kafka");
    }
}
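
The WITH clause above uses the legacy connector.* property style. Flink 1.12 also supports the newer option style for the Kafka connector; a sketch of an equivalent DDL (same topic and broker as above; the start position here is an assumption):

CREATE TABLE person (
    user_id STRING,
    user_name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'kfk',
    'properties.bootstrap.servers' = 'bigdata-pro-m07:9092',
    'properties.group.id' = 'kfk',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);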

A problem may appear here; if you do not hit it, skip ahead:

MetaException(message:An exception was thrown while adding/validating class(es) : Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead

The cause is the character set of the Hive metastore database in MySQL; the same character-set problem also breaks operations such as CREATE, INSERT, and LOAD.

Solution:

mysql> use metastore;
mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | utf8                       |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
mysql> alter database metastore character set latin1;
Query OK, 1 row affected (0.00 sec)
mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | latin1                     |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
9 rows in set (0.01 sec)

Changing the metastore database's character set to latin1, as shown above, resolves the problem.

If there is no error, you will see the job running with no output yet, because no data has been produced:

(screenshot 10.png: the job runs but prints nothing yet)


At this point, checking Hive or the Flink SQL Client shows the person table.

Flink SQL Client:

Flink SQL> show databases;
default
flink
Flink SQL> use flink;
Flink SQL> show tables;
person
Flink SQL> desc person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set

Hive:

hive (default)> use flink;
OK
Time taken: 1.056 seconds
hive (flink)> show tables;
OK
tab_name
person
Time taken: 0.179 seconds, Fetched: 1 row(s)
hive (flink)> desc person;
OK
col_name  data_type comment
Time taken: 0.117 seconds
hive (flink)> 

Note that Hive lists person but desc shows no columns: the schema of a generic Flink table is stored in the metastore as table properties, so the table is visible in Hive but cannot be queried from Hive itself.

(2.4) Step 4: Send test data through Kafka

Start a console producer:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

Test data:

>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10

Results:

(screenshot 11.png: job output)

View the result through the Flink SQL Client:

bin/sql-client.sh embedded
select * from person;

(screenshot 13.png: query result in the SQL Client)

If the following error appears when executing the SQL statement, check that flink-sql-connector-kafka_2.11-1.12.1.jar is in Flink's lib directory and retry a few times:

Flink SQL> select * from person;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer