Integrating Flink SQL with Kafka


Versions:

  • flink-1.12.1
  • kafka_2.12-2.4.1


(1) Integrating Flink streams (DataStream API) with Kafka


Add the Maven dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.1</version>
</dependency>

Testing the Kafka consumer: Flink's Kafka consumer is called FlinkKafkaConsumer.

package com.aikfk.flink.sql.kafka;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/10 12:53 PM
 */
public class FlinkConnectKafka {
    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create a TableEnvironment (Blink planner); this DataStream-only job does not use it
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","bigdata-pro-m07:9092");
        // Consumer group: consumers that share the same group.id belong to one group
        // and jointly consume the topic's data; every consumer must specify a group
        properties.setProperty("group.id","kfk");
        // Kafka source reading topic "kfk" as plain strings
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>
                ("kfk", new SimpleStringSchema(), properties));
        // Split each "a,b" line into a Tuple2; assumes at least two comma-separated fields
        dataStream.map(new MapFunction<String, Tuple2<String,String>>() {
            @Override
            public Tuple2<String,String> map(String line) throws Exception {
                String[] words = line.split(",");
                return new Tuple2<>(words[0],words[1]);
            }
        }).print();
        env.execute("kafka");
    }
}
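The map function above assumes every record contains at least two comma-separated fields; a line without a comma makes words[1] throw ArrayIndexOutOfBoundsException and fails the job. Below is a minimal stdlib-only sketch of a defensive version of that split. The class name SafeLineParser is hypothetical and not part of the original project.

```java
import java.util.Optional;

/**
 * Hypothetical helper (not part of the original job): splits a line on commas
 * like the MapFunction above, but returns Optional.empty() instead of throwing
 * when the line has fewer than two fields.
 */
public class SafeLineParser {

    /** Returns the first two comma-separated fields, or empty if there are fewer than two. */
    public static Optional<String[]> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // Same split as the original map function
        String[] words = line.split(",");
        if (words.length < 2) {
            return Optional.empty();
        }
        return Optional.of(new String[] {words[0], words[1]});
    }

    public static void main(String[] args) {
        for (String line : new String[] {"hibve,dsd", "no-comma-here"}) {
            // Prints (hibve,dsd) for the first line and silently skips the malformed one
            parse(line).ifPresent(w -> System.out.println("(" + w[0] + "," + w[1] + ")"));
        }
    }
}
```

Inside the Flink job, the same check could live in a FlatMapFunction<String, Tuple2<String,String>> that calls out.collect(...) only when parse returns a value, so malformed lines are dropped instead of failing the job.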

Create the topic:

bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --create --topic kfk --partitions 1 --replication-factor 1

Start a console producer:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

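Test data: after entering a comma-separated line such as hibve,dsd in the producer, the Flink job prints the following (the 1> prefix is the 1-based index of the print sink subtask that emitted the record):

1> (hibve,dsd)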

For more on integrating Flink streams with Kafka, see the official documentation:


https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html


(2) Integrating Flink SQL with Kafka


Add the following jar to the /opt/modules/flink/lib directory:

flink-sql-connector-kafka_2.11-1.12.1.jar


Download:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.12.1


(2.1) Step 1: Configure the Flink SQL Client

See this article: https://blog.csdn.net/weixin_45366499/article/details/115576853


(2.2) Step 2: Create the hiveConfDir

Put hive-site.xml in the /Users/caizhengjie/Desktop/hive-conf directory; this is the hiveConfDir passed to HiveCatalog in the code below.


(2.3) Step 3: Test the Flink SQL and Kafka integration code

package com.aikfk.flink.sql.kafka;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/10 12:53 PM
 */
public class FlinkConnectKafkaDDL {
    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create a TableEnvironment (Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
        // 3. Register the Hive catalog and make it the current catalog
        HiveCatalog hiveCatalog =
                new HiveCatalog(catalogName,hiveDataBase,hiveConfDir);
        tableEnvironment.registerCatalog(catalogName , hiveCatalog);
        tableEnvironment.useCatalog(catalogName);
        // 4. DDL: create a table backed by the kfk topic (legacy 'connector.type'
        //    option keys, deprecated but still accepted in Flink 1.12)
        String kafkaTable = "person";
        String dropsql = "DROP TABLE IF EXISTS "+kafkaTable;
        String sql
                = "CREATE TABLE "+kafkaTable+" (\n" +
                "    user_id String,\n" +
                "    user_name String,\n" +
                "    age Int\n" +
                ") WITH (\n" +
                "   'connector.type' = 'kafka',\n" +
                "   'connector.version' = 'universal',\n" +
                "   'connector.topic' = 'kfk',\n" +
                "   'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "   'format.type' = 'csv',\n" +
                "   'update-mode' = 'append'\n" +
                ")";
        tableEnvironment.executeSql(dropsql);
        tableEnvironment.executeSql(sql);
        Table table = tableEnvironment.sqlQuery("select * from person");
        tableEnvironment.toAppendStream(table , Row.class).print();
        env.execute("kafka");
    }
}
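The DDL above uses the legacy option keys. Flink 1.11 introduced new-style connector options, and the sketch below declares the same table with them. It only builds the DDL string (plain Java, no Flink dependency); the class name PersonDdl, the group id kfk and the startup mode earliest-offset are assumptions chosen for illustration, so check them against the Kafka SQL connector documentation for your Flink version.

```java
/**
 * Hypothetical helper: builds the person table DDL with the new-style option
 * keys ('connector' = 'kafka', 'topic', ...) instead of 'connector.type'.
 * The startup mode is an assumed value for illustration.
 */
public class PersonDdl {

    public static String createTable(String table, String topic, String servers, String groupId) {
        return "CREATE TABLE " + table + " (\n"
                + "    user_id STRING,\n"
                + "    user_name STRING,\n"
                + "    age INT\n"
                + ") WITH (\n"
                + "    'connector' = 'kafka',\n"
                + "    'topic' = '" + topic + "',\n"
                + "    'properties.bootstrap.servers' = '" + servers + "',\n"
                + "    'properties.group.id' = '" + groupId + "',\n"
                // Assumption: read the topic from the beginning
                + "    'scan.startup.mode' = 'earliest-offset',\n"
                + "    'format' = 'csv'\n"
                + ")";
    }

    public static void main(String[] args) {
        // The printed statement could be passed to tableEnvironment.executeSql(...)
        System.out.println(createTable("person", "kfk", "bigdata-pro-m07:9092", "kfk"));
    }
}
```

The returned string can replace sql in FlinkConnectKafkaDDL. With the new options there is no 'update-mode' key, since the plain kafka connector only produces insert (append) rows.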

The following error may occur at this point; if it does not, skip ahead.

MetaException(message:An exception was thrown while adding/validating class(es) : Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead

The cause is the character set of Hive's metastore database in MySQL. Because of this character set, CREATE, INSERT, LOAD and similar operations all fail.
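The 21845 in the error message can be reproduced with simple arithmetic: MySQL caps a row at 65,535 bytes, and a VARCHAR column is sized for the maximum bytes per character of its character set (3 for utf8, 4 for utf8mb4, 1 for latin1). The sketch below is a simplification with a hypothetical class name; in practice the usable length is further reduced by the column's length prefix and by the other columns in the row.

```java
/**
 * Worked arithmetic (simplified) for the "max = 21845" in the MetaException:
 * maximum VARCHAR length in characters = 65535 bytes / max bytes per character.
 */
public class VarcharLimit {

    public static final int MAX_ROW_BYTES = 65535;

    /** Upper bound on VARCHAR length (in characters) for a given bytes-per-character width. */
    public static int maxChars(int maxBytesPerChar) {
        return MAX_ROW_BYTES / maxBytesPerChar; // integer division rounds down
    }

    public static void main(String[] args) {
        System.out.println("utf8 (3 bytes/char):    " + maxChars(3)); // 21845, as in the error
        System.out.println("utf8mb4 (4 bytes/char): " + maxChars(4)); // 16383
        System.out.println("latin1 (1 byte/char):   " + maxChars(1)); // 65535 (upper bound)
    }
}
```

This is why switching the metastore database to latin1, at one byte per character, lets the column be created in this case.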

Solution:

mysql> use metastore;
mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | utf8                       |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
mysql> alter database metastore character set latin1;
Query OK, 1 row affected (0.00 sec)
mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | latin1                     |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
9 rows in set (0.01 sec)

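Setting the metastore database's character set to latin1 resolves the error above. Note that ALTER DATABASE only changes the default character set for tables created afterwards; tables that already exist keep their character set.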

If there is no error, the job starts but prints nothing yet, because no data has been produced:

[Screenshot: 10.png]


At this point, the person table is visible from both Hive and the Flink SQL Client.

Flink SQL Client:

Flink SQL> show databases;
default
flink
Flink SQL> use flink;
Flink SQL> show tables;
person
Flink SQL> desc person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set

Hive:

hive (default)> use flink;
OK
Time taken: 1.056 seconds
hive (flink)> show tables;
OK
tab_name
person
Time taken: 0.179 seconds, Fetched: 1 row(s)
hive (flink)> desc person;
OK
col_name  data_type comment
Time taken: 0.117 seconds
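hive (flink)>

Hive's desc lists no columns for person: HiveCatalog stores the schema of a Flink-only (generic) table such as this Kafka table in the table properties rather than as Hive columns, so Hive itself cannot read it.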

(2.4) Step 4: Test with Kafka data

Start a console producer:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

Test data:

>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10

Result:

[Screenshot: 11.png]

View the result through the Flink SQL Client:

bin/sql-client.sh embedded
select * from person;

[Screenshot: 13.png]

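If the following error appears when running the SQL statement, retrying a few times may help:

Flink SQL> select * from person;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

If retrying does not help, make sure flink-sql-connector-kafka_2.11-1.12.1.jar is in /opt/modules/flink/lib, then restart the Flink cluster and the SQL Client, because jars in lib are only loaded at startup.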
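To check whether the missing class is actually visible on a given classpath, a small hypothetical diagnostic (not part of Flink) can try to load it by name. It only uses Class.forName with initialization disabled.

```java
/**
 * Hypothetical diagnostic: reports whether a class can be loaded from the
 * current classpath, without running its static initializers.
 */
public class ClasspathCheck {

    public static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError also covers NoClassDefFoundError when a superclass is missing
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0]
                : "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

For example, compile it and run java -cp "/opt/modules/flink/lib/*:." ClasspathCheck. If it prints false, the connector jar is not in that directory; if it prints true, the jar is present and the running cluster most likely started before it was added.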