Flink SQL与JDBC的集成

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 笔记

版本说明:

  • flink-1.12.1

第一步:加载依赖与添加jar包

Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

将flink-connector-jdbc_2.11-1.12.1.jar包移到/opt/modules/flink/lib目录下

flink-connector-jdbc_2.11-1.12.1.jar下载地址:


https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.11/1.12.1

第二步:在mysql中创建表

create table person(user_id  varchar(20), user_name  varchar(20), age int);
mysql> desc person;
+-----------+-------------+------+-----+---------+-------+
| Field     | Type        | Null | Key | Default | Extra |
+-----------+-------------+------+-----+---------+-------+
| user_id   | varchar(20) | YES  |     | NULL    |       |
| user_name | varchar(20) | YES  |     | NULL    |       |
| age       | int(11)     | YES  |     | NULL    |       |
+-----------+-------------+------+-----+---------+-------+

第三步:测试Flink SQL与JDBC集成代码

package com.aikfk.flink.sql.jdbc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaJDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
        HiveCatalog hiveCatalog =
                new HiveCatalog(catalogName,hiveDataBase,hiveConfDir);
        tableEnvironment.registerCatalog(catalogName , hiveCatalog);
        tableEnvironment.useCatalog(catalogName);
        String kafkaTable = "kafka_person";
        String kafkaDropsql = "DROP TABLE IF EXISTS kafka_person";
        String kafakTable_sql
                = "CREATE TABLE kafka_person (\n" +
                "    user_id String,\n" +
                "    user_name String,\n" +
                "    age Int\n" +
                ") WITH (\n" +
                "   'connector.type' = 'kafka',\n" +
                "   'connector.version' = 'universal',\n" +
                "   'connector.topic' = 'kfk',\n" +
                "   'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "   'format.type' = 'csv',\n" +
                "   'update-mode' = 'append'\n" +
                ")";
        tableEnvironment.executeSql(kafkaDropsql);
        tableEnvironment.executeSql(kafakTable_sql);
        // register a MySQL table 'person' in Flink SQL
        String mysqlTable_sql =
                "CREATE TABLE mysql_person (\n" +
                        "  user_id String,\n" +
                        "  user_name String,\n" +
                        "  age INT\n" +
                        ") WITH (\n" +
                        "   'connector' = 'jdbc',\n" +
                        "   'url' = 'jdbc:mysql://bigdata-pro-m07:3306/flink',\n" +
                        "   'table-name' = 'person',\n" +
                        "   'username' = 'root',\n" +
                        "   'password' = '199911'\n" +
                        ")";
        String mysqlDropsql = "DROP TABLE IF EXISTS mysql_person";
        tableEnvironment.executeSql(mysqlDropsql);
        tableEnvironment.executeSql(mysqlTable_sql);
        // write data into the JDBC table from the other table "kafka_person"
        tableEnvironment.executeSql("INSERT INTO mysql_person\n" +
                "SELECT user_id, user_name, age FROM kafka_person");
        env.execute("kafka");
    }
}

通过flink sql client查看kafka_person、mysql_person表:

Flink SQL> show tables;
kafka_person
mysql_person
person
Flink SQL> desc kafka_person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set

第四步:测试kafka数据源与mysql写入数据

创建生产者:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

测试数据:

>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10

运行结果查看mysql中是否写入数据

mysql> select * from person;
+---------+-----------+------+
| user_id | user_name | age  |
+---------+-----------+------+
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
+---------+-----------+------+
5 rows in set (0.00 sec)

通过Flink SQL Client查看结果:

bin/sql-client.sh embedded
select * from kafka_person;
select * from mysql_person;

14.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
Java 数据库连接 数据库
Flink Connector JDBC已经被移到了一个独立的仓库
【2月更文挑战第23天】Flink Connector JDBC已经被移到了一个独立的仓库
14 1
|
2月前
|
SQL 关系型数据库 MySQL
Flink 提供了一种名为 Flink SQL 的查询语言,它支持多种数据库之间的 DDL 语句转换
【2月更文挑战第18天】Flink 提供了一种名为 Flink SQL 的查询语言,它支持多种数据库之间的 DDL 语句转换
174 2
|
2月前
|
SQL 存储 Apache
在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
【2月更文挑战第16天】在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
211 2
|
2月前
|
SQL 分布式计算 HIVE
基于 Kyuubi 实现分布式 Flink SQL 网关
本文整理自网易互娱资深开发工程师、Apache Kyuubi Committer 林小铂的《基于 Kyuubi 实现分布式 Flink SQL 网关》分享。
104471 64
基于 Kyuubi 实现分布式 Flink SQL 网关
|
3月前
|
SQL 数据采集 JSON
弱结构化日志 Flink SQL 怎么写?SLS SPL 来帮忙
弱结构化日志 Flink SQL 怎么写?SLS SPL 来帮忙
125206 136
|
3月前
|
SQL 监控 API
Flink SQL支持写判断语句
【2月更文挑战第8天】Flink SQL支持写判断语句
239 12
|
13天前
|
前端开发 Java 应用服务中间件
从零手写实现 tomcat-08-tomcat 如何与 springboot 集成?
该文是一系列关于从零开始手写实现 Apache Tomcat 的教程概述。作者希望通过亲自动手实践理解 Tomcat 的核心机制。文章讨论了 Spring Boot 如何实现直接通过 `main` 方法启动,Spring 与 Tomcat 容器的集成方式,以及两者生命周期的同步原理。文中还提出了实现 Tomcat 的启发,强调在设计启动流程时确保资源的正确加载和初始化。最后提到了一个名为 mini-cat(嗅虎)的简易 Tomcat 实现项目,开源于 [GitHub](https://github.com/houbb/minicat)。
|
1月前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
1月前
|
NoSQL Java Redis
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
373 0