Flink SQL与JDBC的集成

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

版本说明:

  • flink-1.12.1

第一步:加载依赖与添加jar包

Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

将flink-connector-jdbc_2.11-1.12.1.jar包移到/opt/modules/flink/lib目录下

flink-connector-jdbc_2.11-1.12.1.jar下载地址:


https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.11/1.12.1

第二步:在mysql中创建表

create table person(user_id  varchar(20), user_name  varchar(20), age int);
mysql> desc person;
+-----------+-------------+------+-----+---------+-------+
| Field     | Type        | Null | Key | Default | Extra |
+-----------+-------------+------+-----+---------+-------+
| user_id   | varchar(20) | YES  |     | NULL    |       |
| user_name | varchar(20) | YES  |     | NULL    |       |
| age       | int(11)     | YES  |     | NULL    |       |
+-----------+-------------+------+-----+---------+-------+

第三步:测试Flink SQL与JDBC集成代码

package com.aikfk.flink.sql.jdbc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaJDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
        HiveCatalog hiveCatalog =
                new HiveCatalog(catalogName,hiveDataBase,hiveConfDir);
        tableEnvironment.registerCatalog(catalogName , hiveCatalog);
        tableEnvironment.useCatalog(catalogName);
        String kafkaTable = "kafka_person";
        String kafkaDropsql = "DROP TABLE IF EXISTS kafka_person";
        String kafakTable_sql
                = "CREATE TABLE kafka_person (\n" +
                "    user_id String,\n" +
                "    user_name String,\n" +
                "    age Int\n" +
                ") WITH (\n" +
                "   'connector.type' = 'kafka',\n" +
                "   'connector.version' = 'universal',\n" +
                "   'connector.topic' = 'kfk',\n" +
                "   'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "   'format.type' = 'csv',\n" +
                "   'update-mode' = 'append'\n" +
                ")";
        tableEnvironment.executeSql(kafkaDropsql);
        tableEnvironment.executeSql(kafakTable_sql);
        // register a MySQL table 'person' in Flink SQL
        String mysqlTable_sql =
                "CREATE TABLE mysql_person (\n" +
                        "  user_id String,\n" +
                        "  user_name String,\n" +
                        "  age INT\n" +
                        ") WITH (\n" +
                        "   'connector' = 'jdbc',\n" +
                        "   'url' = 'jdbc:mysql://bigdata-pro-m07:3306/flink',\n" +
                        "   'table-name' = 'person',\n" +
                        "   'username' = 'root',\n" +
                        "   'password' = '199911'\n" +
                        ")";
        String mysqlDropsql = "DROP TABLE IF EXISTS mysql_person";
        tableEnvironment.executeSql(mysqlDropsql);
        tableEnvironment.executeSql(mysqlTable_sql);
        // write data into the JDBC table from the other table "kafka_person"
        tableEnvironment.executeSql("INSERT INTO mysql_person\n" +
                "SELECT user_id, user_name, age FROM kafka_person");
        env.execute("kafka");
    }
}

通过flink sql client查看kafka_person、mysql_person表:

Flink SQL> show tables;
kafka_person
mysql_person
person
Flink SQL> desc kafka_person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set

第四步:测试kafka数据源与mysql写入数据

创建生产者:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

测试数据:

>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10

运行结果查看mysql中是否写入数据

mysql> select * from person;
+---------+-----------+------+
| user_id | user_name | age  |
+---------+-----------+------+
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
| 100     | alex      |   10 |
+---------+-----------+------+
5 rows in set (0.00 sec)

通过Flink SQL Client查看结果:

bin/sql-client.sh embedded
select * from kafka_person;
select * from mysql_person;

14.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
193 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
29天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
117 14
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
84 1
|
4月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
755 2
Flink CDC:新一代实时数据集成框架
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
67 0
|
3月前
|
Java 关系型数据库 MySQL
mysql5.7 jdbc驱动
遵循上述步骤,即可在Java项目中高效地集成MySQL 5.7 JDBC驱动,实现数据库的访问与管理。
676 1
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
146 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
62 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
80 0
|
5月前
|
SQL druid Java
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
72 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)

热门文章

最新文章