Flink SQL与JDBC的集成

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

版本说明:

  • flink-1.12.1

第一步:加载依赖与添加jar包

Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

将flink-connector-jdbc_2.11-1.12.1.jar包移到/opt/modules/flink/lib目录下

flink-connector-jdbc_2.11-1.12.1.jar下载地址:


https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.11/1.12.1

第二步:在mysql中创建表

create table person(user_id  varchar(20), user_name  varchar(20), age int);
mysql> desc person;
+-----------+-------------+------+-----+---------+-------+
| Field     | Type        | Null | Key | Default | Extra |
+-----------+-------------+------+-----+---------+-------+
| user_id   | varchar(20) | YES  |     | NULL    |       |
| user_name | varchar(20) | YES  |     | NULL    |       |
| age       | int(11)     | YES  |     | NULL    |       |
+-----------+-------------+------+-----+---------+-------+

第三步:测试Flink SQL与JDBC集成代码

package com.aikfk.flink.sql.jdbc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaJDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
        HiveCatalog hiveCatalog =
                new HiveCatalog(catalogName,hiveDataBase,hiveConfDir);
        tableEnvironment.registerCatalog(catalogName , hiveCatalog);
        tableEnvironment.useCatalog(catalogName);
        String kafkaTable = "kafka_person";
        String kafkaDropsql = "DROP TABLE IF EXISTS kafka_person";
        String kafakTable_sql
                = "CREATE TABLE kafka_person (\n" +
                "    user_id String,\n" +
                "    user_name String,\n" +
                "    age Int\n" +
                ") WITH (\n" +
                "   'connector.type' = 'kafka',\n" +
                "   'connector.version' = 'universal',\n" +
                "   'connector.topic' = 'kfk',\n" +
                "   'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "   'format.type' = 'csv',\n" +
                "   'update-mode' = 'append'\n" +
                ")";
        tableEnvironment.executeSql(kafkaDropsql);
        tableEnvironment.executeSql(kafakTable_sql);
        // register a MySQL table 'person' in Flink SQL
        String mysqlTable_sql =
                "CREATE TABLE mysql_person (\n" +
                        "  user_id String,\n" +
                        "  user_name String,\n" +
                        "  age INT\n" +
                        ") WITH (\n" +
                        "   'connector' = 'jdbc',\n" +
                        "   'url' = 'jdbc:mysql://bigdata-pro-m07:3306/flink',\n" +
                        "   'table-name' = 'person',\n" +
                        "   'username' = 'root',\n" +
                        "   'password' = '199911'\n" +
                        ")";
        String mysqlDropsql = "DROP TABLE IF EXISTS mysql_person";
        tableEnvironment.executeSql(mysqlDropsql);
        tableEnvironment.executeSql(mysqlTable_sql);
        // write data into the JDBC table from the other table "kafka_person"
        tableEnvironment.executeSql("INSERT INTO mysql_person\n" +
                "SELECT user_id, user_name, age FROM kafka_person");
        env.execute("kafka");
    }
}

通过flink sql client查看kafka_person、mysql_person表:

Flink SQL> show tables;
kafka_person
mysql_person
person
Flink SQL> desc kafka_person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set

第四步:测试kafka数据源与mysql写入数据

创建生产者:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

测试数据:

>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10

运行结果查看mysql中是否写入数据

mysql> select * from person;
+---------+-----------+------+
| user_id | user_name | age  |
+---------+-----------+------+
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
+---------+-----------+------+
5 rows in set (0.00 sec)

通过Flink SQL Client查看结果:

bin/sql-client.sh embedded
select * from kafka_person;
select * from mysql_person;

14.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
213 15
|
16天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
90 14
|
3月前
|
SQL 机器学习/深度学习 数据库
SQL与Python集成:数据库操作无缝衔接
在开始之前,确保你已经安装了必要的Python库,如`sqlite3`(用于SQLite数据库)或`psycopg2`(用于PostgreSQL数据库)。这些库提供了Python与SQL数据库之间的接口。
|
3月前
|
SQL 数据库连接 数据库
管理系统中的Visual Studio与SQL集成技巧与方法
在现代软件开发和管理系统中,Visual Studio(VS)作为强大的集成开发环境(IDE),与SQL数据库的紧密集成是构建高效、可靠应用程序的关键
|
3月前
|
SQL 机器学习/深度学习 数据采集
SQL与Python集成:数据库操作无缝衔接2a.bijius.com
Python与SQL的集成是现代数据科学和工程实践的核心。通过有效的数据查询、管理与自动化,可以显著提升数据分析和决策过程的效率与准确性。随着技术的不断发展,这种集成的应用场景将更加广泛,为数据驱动的创新提供更强大的支持。
|
3月前
|
SQL 机器学习/深度学习 数据库
SQL与Python集成:数据库操作无缝衔接
1. Python与SQL集成的关键步骤 在开始之前,确保你已经安装了必要的Python库,如`sqlite3`(用于SQLite数据库)或`psycopg2`(用于PostgreSQL数据库)。这些库提供了Python与SQL数据库之间的接口。
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
62 0
|
3月前
|
SQL 监控 数据库
管理系统VS SQL:高效集成的关键技巧与方法
在现代企业信息化建设中,管理系统(如ERP、CRM等)与SQL数据库之间的紧密集成是确保数据流动顺畅、业务逻辑高效执行的关键
|
3月前
|
SQL 云安全 监控
通过 Python 和 SQL 集成加强云环境
通过 Python 和 SQL 集成加强云环境
38 0
|
3月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用