Integrating Flink SQL with Kafka


Versions used:

  • flink-1.12.1
  • kafka_2.12-2.4.1


(1)Integrating the Flink DataStream API with Kafka


Add the Maven dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.1</version>
</dependency>
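
The example below also uses the DataStream and Table APIs, so the pom typically needs a few more Flink modules. A sketch of the extra dependencies, assuming the same Scala 2.11 build and Flink 1.12.1 as above:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.12.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.12.1</version>
</dependency>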

Test the Kafka consumer. Flink's Kafka consumer is called FlinkKafkaConsumer.

package com.aikfk.flink.sql.kafka;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
/**
 * @author caizhengjie
 * @date 2021/4/10 12:53 PM
 */
public class FlinkConnectKafka {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create the TableEnvironment (Blink planner); it is not actually used in this DataStream example
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        // Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","bigdata-pro-m07:9092");
        // Consumer group: consumers that share the same group.id form one group
        // and jointly consume the data of a Kafka topic
        properties.setProperty("group.id","kfk");
        // Kafka source: consume the kfk topic as plain strings
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>
                ("kfk", new SimpleStringSchema(), properties));
        dataStream.map(new MapFunction<String, Tuple2<String,String>>() {
            @Override
            public Tuple2<String,String> map(String line) throws Exception {
                String[] words = line.split(",");
                return new Tuple2<>(words[0],words[1]);
            }
        }).print();
        env.execute("kafka");
    }
}

Create the topic:

bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --create --topic kfk --partitions 1 --replication-factor 1
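
You can optionally confirm the topic exists with the standard describe command:

bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --describe --topic kfk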

Start a console producer:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

Test data: type a line such as hibve,dsd into the producer; the job prints the parsed tuple:

1> (hibve,dsd)

For more on integrating the DataStream API with Kafka, see the official documentation:


https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html
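
Those docs also cover the producer side. As a minimal sketch building on the consumer example above (the output topic kfk-out is hypothetical, and it needs one more import, org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer):

// Sink sketch: write each record back to Kafka, reusing the consumer's properties
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "kfk-out",                  // hypothetical output topic
        new SimpleStringSchema(),   // serialize records as plain strings
        properties);
dataStream.addSink(producer);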


(2)Integrating Flink SQL with Kafka


Add the following jar to the /opt/modules/flink/lib directory:

flink-sql-connector-kafka_2.11-1.12.1.jar


Download:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.12.1
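
If you prefer the command line, the jar can also be pulled directly from Maven Central (assuming the standard repository layout):

wget -P /opt/modules/flink/lib https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.1/flink-sql-connector-kafka_2.11-1.12.1.jar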


(2.1)Step 1: Configure the Flink SQL Client

See this article: https://blog.csdn.net/weixin_45366499/article/details/115576853
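
For reference, registering the Hive catalog in the SQL Client comes down to a catalog entry in conf/sql-client-defaults.yaml. A minimal sketch, assuming the catalog name and conf dir used later in this article:

catalogs:
  - name: flink_hive
    type: hive
    hive-conf-dir: /Users/caizhengjie/Desktop/hive-conf
    default-database: flink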


(2.2)Step 2: Create the hiveConfDir

Place hive-site.xml in the /Users/caizhengjie/Desktop/hive-conf directory.
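
The Hive catalog only needs enough configuration to reach the metastore. A minimal hive-site.xml sketch, assuming a remote metastore on the default port (the thrift URI is an assumption; point it at your own metastore):

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://bigdata-pro-m07:9083</value>
  </property>
</configuration>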


(2.3)Step 3: Test the Flink SQL and Kafka integration code

package com.aikfk.flink.sql.kafka;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
 * @author caizhengjie
 * @date 2021/4/10 12:53 PM
 */
public class FlinkConnectKafkaDDL {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create the TableEnvironment (Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
        // Register the Hive catalog and make it the current catalog
        HiveCatalog hiveCatalog =
                new HiveCatalog(catalogName,hiveDataBase,hiveConfDir);
        tableEnvironment.registerCatalog(catalogName , hiveCatalog);
        tableEnvironment.useCatalog(catalogName);
        // DDL: create a table backed by the Kafka topic
        String kafkaTable = "person";
        String dropsql = "DROP TABLE IF EXISTS "+kafkaTable;
        String sql
                = "CREATE TABLE "+kafkaTable+" (\n" +
                "    user_id String,\n" +
                "    user_name String,\n" +
                "    age Int\n" +
                ") WITH (\n" +
                "   'connector.type' = 'kafka',\n" +
                "   'connector.version' = 'universal',\n" +
                "   'connector.topic' = 'kfk',\n" +
                "   'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "   'format.type' = 'csv',\n" +
                "   'update-mode' = 'append'\n" +
                ")";
        tableEnvironment.executeSql(dropsql);
        tableEnvironment.executeSql(sql);
        Table table = tableEnvironment.sqlQuery("select * from person");
        tableEnvironment.toAppendStream(table , Row.class).print();
        env.execute("kafka");
    }
}
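
The DDL above uses the legacy connector.* option keys, which Flink 1.12 still accepts. The same connector also supports the newer, flatter option style in 1.12; a sketch of the equivalent table (scan.startup.mode is shown with its default, group offsets):

CREATE TABLE person (
    user_id STRING,
    user_name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'kfk',
    'properties.bootstrap.servers' = 'bigdata-pro-m07:9092',
    'properties.group.id' = 'kfk',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'csv'
);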

A problem may appear here; if you don't hit it, skip ahead!

MetaException(message:An exception was thrown while adding/validating class(es) : Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead

The cause is the character set of the Hive metastore database in MySQL; the same character-set problem also breaks CREATE, INSERT, LOAD, and similar operations!

Solution:

mysql> use metastore;
mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | utf8                       |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
mysql> alter database metastore character set latin1;
Query OK, 1 row affected (0.00 sec)
mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | latin1                     |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
9 rows in set (0.01 sec)

Setting the character set to latin1, as shown above, resolves the problem.

If there is no error, the job starts but shows no output yet, simply because no data has been produced to the topic.



At this point both Hive and the Flink SQL Client show the person table.

Flink SQL Client:

Flink SQL> show databases;
default
flink
Flink SQL> use flink;
Flink SQL> show tables;
person
Flink SQL> desc person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set

Hive:

hive (default)> use flink;
OK
Time taken: 1.056 seconds
hive (flink)> show tables;
OK
tab_name
person
Time taken: 0.179 seconds, Fetched: 1 row(s)
hive (flink)> desc person;
OK
col_name  data_type comment
Time taken: 0.117 seconds
hive (flink)> 

(2.4)Step 4: Test with Kafka data

Start a console producer:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

Test data:

>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10

Result: the running job prints each row as it arrives. You can also view the result via the Flink SQL Client:

bin/sql-client.sh embedded
select * from person;


If this error appears when executing the SQL statement, try running it a few more times:

Flink SQL> select * from person;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
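
If the error persists, the ClassNotFoundException usually means the Kafka connector jar was not picked up. A quick check, using the paths from earlier in this article:

ls /opt/modules/flink/lib | grep kafka
# expected: flink-sql-connector-kafka_2.11-1.12.1.jar
# if it is missing, add it and restart the Flink cluster and the SQL Client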