5 分钟上手 Flink MySQL 连接器实验手册|Flink-Learning 实战营

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 加入 Flink-Learning 实战营,动手体验真实有趣的实战场景。只需 2 小时,让您变身 Flink 实战派。实战营采取了 Flink 专家在线授课,专属社群答疑,小松鼠助教全程陪伴的学习模式。

作者|于喜千(千浪)

实验简介

MySQL Connector 可以将本地或远程的 MySQL 数据库连接到 Flink 中,并方便地使用 Flink Table API 与之交互、捕获数据变更、并将处理结果写回数据库。

本场景主要介绍如何在阿里云实时计算平台上使用 Flink MySQL 连接器的相关功能,并使用 Table API 编写一个简单的例子,尝试 MySQL 作为源表、维表、汇表的不同功能。

实验资源

实验所开通的云产品因数据连通性要求,需使用同一 Region 可用区,建议都选取北京 Region 的同一可用区。涉及的云产品包括阿里云实时计算 Flink 版、阿里云数据库 RDS。

本场景使用到的实验资源和配置如下:

  • 资源一:阿里云实时计算 Flink 版
配置项 规格
Task Manger 个数 4 个
Task Manager CPU 2 核心
Task Manager Memory 8 GiB
Job Manager CPU 1 核
Job Manager Memory 2 GiB
  • 资源二:阿里云数据库 RDS
配置项 规格
CPU 2 核心
内存 4 GiB
最大连接数 1200
最大 IOPS 2000

体验目标

本场景将以 阿里云实时计算Flink版为基础,使用 Flink 自带的 MySQL Connector 连接 RDS 云数据库实例,并以一个实时商品销售数据统计的例子尝试上手 Connector 的数据捕获、数据写入等功能。

按步骤完成本次实验后,您将掌握的知识有:

  • 使用 Flink 实时计算平台创建并提交作业的方法;
  • 编写基于 Flink Table API SQL 语句的能力;
  • 使用 MySQL Connector 对数据库进行读写的方法。

背景知识

本场景主要涉及以下云产品和服务:

阿里云实时计算 Flink 版是一种全托管 Serverless 的 Flink 云服务,开箱即用,计费灵活。具备一站式开发运维管理平台,支持作业开发、数据调试、运行与监控、自动调优、智能诊断等全生命周期能力。100% 兼容 Apache Flink,支持开源 Flink 平滑迁移上云,核心企业级增强 Flink 引擎较开源 Flink 有约两倍性能的提升。拥有 Flink CDC、企业级复杂事件处理(CEP)等企业级增值功能,并内置丰富上下游连接器,助力企业构建高效、稳定和强大的实时数据应用。

云数据库 RDS(Relational Database Service,简称 RDS)是一种稳定可靠、可弹性伸缩的在线数据库服务。基于阿里云分布式文件系统和 SSD 盘高性能存储,RDS 支持 MySQL、SQL Server、PostgreSQL、PPAS 和 MariaDB 引擎,提供了容灾、备份、恢复、监控、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。

前置知识

  • 了解 MySQL 数据库相关的基础知识,能够阅读编写简单的 SQL 语句。

步骤一:创建资源

开始实验之前,您需要先创建相关实验资源:阿里云实时计算 Flink 版和云数据库 RDS。

步骤二:创建数据库表

在这个例子中,我们将创建三张数据表,分别作为源表、维表、汇表,演示 MySQL Connector 的不同功能。

  1. 进入云数据库 RDS 后台,并登录刚刚创建资源的后台页面。
  2. 点击左侧边栏的 + 加号按钮,创建一个测试用数据库,然后在右侧命令区输入以下建表指令并执行:
-- Source Table;
CREATE TABLE `source_table` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `good_id` int DEFAULT NULL,
  `amount` int DEFAULT NULL,
  `record_time` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
);

-- Dimension Table;
CREATE TABLE `dimension_table` (
  `good_id` int unsigned NOT NULL,
  `good_name` varchar(256) DEFAULT NULL,
  `good_price` int DEFAULT NULL,
  PRIMARY KEY (`good_id`)
);

-- Sink Table;
CREATE TABLE `sink_table` (
  `record_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `good_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `sell_amount` int DEFAULT NULL,
  PRIMARY KEY (`record_timestamp`)
);

步骤三:创建 Flink 作业

1.进入实时计算 Flink 平台,点击左侧边栏中的「应用」—「作业开发」菜单,并点击顶部工具栏的「新建」按钮新建一个作业。作业名字任意,类型选择「流作业 / SQL」,其余设置保持默认。如下所示:

1

2.成功创建作业后,右侧编辑窗格应该显示新作业的内容:

2

3.接下来,我们在右侧编辑窗格中输入以下语句来创建一张临时表,并使用 MySQL CDC 连接器实时捕获 source_table的变化:

CREATE TEMPORARY TABLE source_table (
    id INT NOT NULL PRIMARY KEY NOT ENFORCED,
    record_time TIMESTAMP_LTZ(3),
    good_id INT,
    amount INT,
    WATERMARK FOR record_time AS record_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'source_table'
);

需要将 hostname参数替换为早些时候创建资源的域名、将 usernamepassword参数替换为数据库登录用户名及密码、将 database-name参数替换为之前在 RDS 后台中创建的数据库名称。

其中,'connector' = 'mysql-cdc'指定了使用 MySQL CDC 连接器来捕获变化数据。您需要使用准备步骤中申请的 RDS MySQL URL、用户名、密码,以及之前创建的数据库名替换对应部分。

任何时候您都可以点击顶部工具栏中的「验证」按钮,来确认作业 Flink SQL 语句中是否存在语法错误。

4.为了测试是否成功地捕获了源表数据,紧接着在下面写一行 SELECT * FROM source_table;语句,并点击工具栏中的「执行」按钮。接着,向 source_table表中插入一些数据。如果控制台中打印了相应的数据行,则说明捕获成功,如下图所示:

3

5.接下来,我们希望对原始数据按照时间窗口进行分组计算。我们使用 TUMBLE相关窗口函数结合 GROUP BY,将长度 15 秒内的订单数据按照商品 ID 进行归类,并使用 SUM计算其销售总额。我们在 Flink 作业编辑窗格中输入如下代码:

SELECT 
  good_id, 
  tumble_start(
    record_time, interval '15' seconds
  ) AS record_timestamp, 
  sum(amount) AS total_amount 
FROM 
  source_table 
GROUP BY 
  tumble (
    record_time, interval '15' seconds
  ), 
  good_id;

为了测试这一效果,需要向数据库中插入多条数据。你可以在 RDS 中执行附件中的「示例数据.sql」来插入数据,或者使用「示例数据生成.py」脚本实时地插入数据。

在保证源表中有数据的情况下,再次执行 Flink 作业,观察控制台的输出结果:

4

6.在这个业务场景中,购买商品信息使用 good_id记录,而商品 ID 到可读商品名字的映射表、每件商品的价格等信息则存储在另一张维度表(Dimension Table)中。我们同样可以使用 Flink SQL 连接维度表,只需在 Flink 作业中编写下面的语句:

CREATE TEMPORARY TABLE dimension_table (
    good_id INT NOT NULL PRIMARY KEY NOT ENFORCED,
    good_name VARCHAR(256),
    good_price INT
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'dimension_table'
);

这里,我们希望根据上一步中统计出的「每 15 秒商品销售量」信息,计算出每件商品的销售额。由于商品名称及商品价格数据存储在另一张维度表 dimension_table中,我们需要将结果视图和 dimension_table进行 JOIN 操作,并将「商品销售量」、「商品价格」相乘计算出「商品销售额」,并提取结果中的商品可读名称信息作为结果表。

需要确保 dimension_table中存在对应商品 ID 的条目。你可以在 RDS 中执行附件中的「示例数据.sql」来插入数据。

作业代码如下:

SELECT 
  record_timestamp, 
  good_name, 
  total_amount * good_price AS revenue 
FROM 
  (
    SELECT 
      good_id, 
      tumble_start(
        record_time, interval '15' seconds
      ) AS record_timestamp, 
      sum(amount) AS total_amount 
    FROM 
      source_table 
    GROUP BY 
      tumble (
        record_time, interval '15' seconds
      ), 
      good_id
  ) AS tumbled_table 
  LEFT JOIN dimension_table ON tumbled_table.good_id = dimension_table.good_id;

其中第 7 到 20 行和上一步骤的 SQL 语句一致。

执行上面的语句,并观察控制台中的统计数据:

5

7.最后,我们将这些实时的统计数据写回数据库,Flink SQL 也可以简单地实现这一点。首先我们需要创建一张用于连接汇表的 Flink 临时表,如下所示:

CREATE TEMPORARY TABLE sink_table (
    record_timestamp TIMESTAMP(3) NOT NULL PRIMARY KEY NOT ENFORCED,
    good_name VARCHAR(128),
    sell_amount INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://******************.mysql.rds.aliyuncs.com:3306/***********',
  'table-name' = 'sink_table',
  'username' = '***********',
  'password' = '***********',
  'scan.auto-commit' = 'true'
);

然后,只需要将上面的 SELECT 语句的输出结果 INSERT 到该表就可以了:

INSERT INTO sink_table 
SELECT 
  record_timestamp, 
  -- ... 和上面的语句一样

现在,点击控制台上的「上线」按钮,即可将我们编写的 Flink SQL 作业部署上线执行。您可以使用数据库客户端等软件观察汇表中是否写入了正确的数据。

阿里云实时计算控制台在使用「执行」功能调试时,不会写入任何数据到下游中。因此为了测试使用 SQL Connector 写入汇表,您必须使用「上线」功能。

6

您也可以进入 Flink UI 控制台观察流数据处理图。在这个简单的示例中,首先进行的是源表数据的捕获与窗口聚合;接着和维度表进行 JOIN 操作得到运算结果;最后将处理数据存入汇表。

实验附件

以上就是本实验的全部步骤。完整的 Flink SQL 语句如下:

示例 Flink 作业

为了向云数据库 RDS 中填充示例数据,您可以在数据库后台执行下面的 SQL 语句:

示例数据

或者,您也可以执行下面的 Python 脚本实时向数据库填充数据(需要安装 mysql-connector依赖):

示例数据生成


实战营涉及的其他产品简介

可观测监控 Prometheus 版作为兼容可观测事实标准 – Prometheus 开源项
目的全托管服务。默认集成 Grafana 看板与智能告警功能。一键观测主流云
服务、自建组件/集群,覆盖业务监控/应用层监控/中间件监控/系统层监
控。全面优化探针性能与系统可用性,用户无需关注系统可用性与 Exporter
自研集成。帮助企业快速搭建一站式指标可观测体系。

负载均衡SLB是云原生时代应用高可用的基本要素,是阿里云官方云原生网关。SLB支持对4层、7层业务流量转发处理,通过将流量分发到不同的后端服务来扩展应用系统的服务吞吐能力,通过健康检查和故障自动隔离机制来消除单点故障并提升应用系统的可用性。SLB提供全托管式在线负载均衡服务,具有即开即用、超大容量、稳定可靠、弹性伸缩、按需付费等特点,适合大规模、高并发、高可用场景。


加入 Flink-Learning 实战营,动手体验真实有趣的实战场景。只需 2 小时,让您变身 Flink 实战派。实战营采取了 Flink 专家在线授课,专属社群答疑,助教全程陪伴的学习模式。

入营立享权益


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
1月前
|
SQL Java 关系型数据库
Java连接MySQL数据库环境设置指南
请注意,在实际部署时应该避免将敏感信息(如用户名和密码)硬编码在源码文件里面;应该使用配置文件或者环境变量等更为安全可靠地方式管理这些信息。此外,在处理大量数据时考虑使用PreparedStatement而不是Statement可以提高性能并防止SQL注入攻击;同时也要注意正确处理异常情况,并且确保所有打开过得资源都被正确关闭释放掉以防止内存泄漏等问题发生。
76 13
|
1月前
|
SQL 关系型数据库 MySQL
MySQL数据库连接过多(Too many connections)错误处理策略
综上所述,“Too many connections”错误处理策略涉及从具体参数配置到代码层面再到系统与架构设计全方位考量与改进。每项措施都需根据具体环境进行定制化调整,并且在执行任何变更前建议先行测试评估可能带来影响。
515 11
|
28天前
|
SQL 关系型数据库 MySQL
排除通过IP访问MySQL时出现的连接错误问题
以上步骤涵盖了大多数遇到远程连接 MySQL 数据库时出现故障情形下所需采取措施,在执行每个步骤后都应该重新尝试建立链接以验证是否已经解决问题,在多数情形下按照以上顺序执行将能够有效地排除并修复大多数基本链接相关故障。
174 3
|
1月前
|
SQL 监控 关系型数据库
查寻MySQL或SQL Server的连接数,并配置超时时间和最大连接量
以上步骤提供了直观、实用且易于理解且执行的指导方针来监管和优化数据库服务器配置。务必记得,在做任何重要变更前备份相关配置文件,并确保理解每个参数对系统性能可能产生影响后再做出调节。
228 11
|
2月前
|
存储 关系型数据库 MySQL
修复.net Framework4.x连接MYSQL时遇到utf8mb3字符集不支持错误方案。
通过上述步骤大多数情况下能够解决由于UTF-encoding相关错误所带来影响,在实施过程当中要注意备份重要信息以防止意外发生造成无法挽回损失,并且逐一排查确认具体原因以采取针对性措施解除障碍。
161 12
|
7月前
|
关系型数据库 MySQL Java
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
710 0
|
8月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1911 45
|
8月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
636 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
8月前
|
关系型数据库 MySQL 网络安全
如何排查和解决PHP连接数据库MYSQL失败写锁的问题
通过本文的介绍,您可以系统地了解如何排查和解决PHP连接MySQL数据库失败及写锁问题。通过检查配置、确保服务启动、调整防火墙设置和用户权限,以及识别和解决长时间运行的事务和死锁问题,可以有效地保障应用的稳定运行。
332 25

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多