课时1:5分钟上手 Flink MySQL 连接器

简介: 课时1:5分钟上手 Flink MySQL 连接器

Flink-Learning训练营:课时1:5分钟上手 Flink MySQL 连接器

课程地址:https://developer.aliyun.com/trainingcamp/0bcc1ab57cf841a2af632d6252fecbab

实时数据接入:5分钟上手Fling MySQL连接器

 

内容简介

一、实验准备

二、实验内容

三、实验收获

 

一、实验准备

1)开通阿里云十计算的fling版的免费试用

图片1.png

在这个实验开始之前呢,需要去开通阿里云十计算的fling版的免费试用,然后因为我们需要去连接的是一个云数Mexico实例,所以我们也需要去开通阿里云数据库的rds免费试用。

本实验会flink是计算平台为基础使用flink自带的Mexico connect连接器,去连接rds云数据库的实例,并且一个实时商品销售数据统计的一个例子,尝试去上手connect的数据补货数据写入,,我们可以通过阿里云试用中心:
http://free.aliyun.com/?product=956456o&crowd=enterpise ,去申请实际计算flink跟云数据库IDS的适用,大家可以直接扫码或者点击链接就可以去使用了
图片2.png

实验之前我们需要先在Rds云数据库上面去进行一些简单配置,去建立一些数据库跟数据表,在完成我以上准备骤,之后我们就可以正式的开始我们的实验了

 

二、实验内容

(1).捕获原表数据变化

首先,我们之前已经建立了一张MySQL数据表,如下图

图片3.png

其中包含了实时更新的数据。

图片4.png 

为了将其转化为flink可以处理的流失数据,我们首先进入flink实时计算平台,创建一个flink作业,然后我们可以使用cdc连接器,连接到原数据表,它可以通过分析日志的方式捕获变更数据,并发送到下游系统。之后我们就可以使用table API编写数据处理的逻辑例如我们可以进入到实时计算平台,点击左侧的作业开发页面,点击新建按钮,我们可以新建一个job。之后我们可以创建一张source table的表,这个表跟我们之前创建的MySQL数据表的schema 致,之后我们在这个with里面提供一些连接所需要的参数,例如配置使用MySQL cdc连接器,host name是之前我们建立IDS实例的时候提供的ual, usaname 和pasorde是之前设定的实例登陆用户名和密码,为了验证连接是否成功,我们可以直接在这下面编写类似于cgo的语句。Select from sort table,获取source table中实时更新的所有数据。例如在这里我们看到了实时更新的数据,都来自于我们之前创建的源表,这说明我们的连接器生效了,它能够顺利的捕获变更的数据可以直接在这里添加新的数据库里逻辑,例如这段SQL语句实际上就是将原始输入的数据,以每15秒为一个时间窗口进行,并且按照商品ID以及商品在15秒内的销售量进行统计并展示结果可以看到它正确的执行了计算,并且会随着数据的到来实时进行更新。

(2)接入维度表,打宽数据

我们可以看到在原表中我们使用了good ID
图片5.png来表明每件商品的销售记录,但包括商品名称,商品价格之类的信息则保存在另一张单独的表中。我们同样可以使用类似的语句,建立连接维度表的临时表,并且使用left join语法将它之前计算的数据流结果进行连接。我们可以执行一下,看看结果,我们可以看到这里正常的进行了连接,并且计算出了每件商品的利润,

(3)数据写回会表

我们可以使用跟之前类似的方式建立连接汇表的临时表,并且使用insert into cycle语句数据流写回MySQL数据库。在这里,我们使用think table这张临时表来连接我们的数据库里的汇表,并且加入了一个scan auto commit为TRUE这个参数,表明我们希望这个数据库在发生更动时自动commit到数据库。然后我们只需要在上面加入一行insert into Table语句
图片6.png

表示我们希望把下面这行语句的执行结果插入到会表之中,我们来验证一下执行结果是否正确。可以看到,执行结果正确。因为在阿里云实际计算平台中使用测试,调试执行时数据不会写入下一个会表,所以为了验证这一功能是否正常,我们还需要点击上线功能,将作业部署到flink集群上面进行测试。点击确认然后我们可以进入运维页面,可以看到这个任务,现在是已停止的状态,

图片7.png

我们点击启动使用默认配置启动这个作业。在作业成功启动之后,我们可以进入数据库查看汇表中是否有新增的数据。可以看到这里有新增的数据代表我们的数据顺利写入了会表之中。同时,我们可以在作业总览中看到数据的数据流图,

图片8.png

例如1处是我们的原表获取数据,2处是我们进行窗口聚合的操作,3处是我们获取维度表中的数据,4是我们将原表处理后的数据跟维度表中的数据进行了一个聚合,最后我们将得到的结果写入会表5之中。

 

在我们不需要运行作业之后,我们可以点击停止按钮,停止这个作业

 

三、实验收获

学完该章节可以帮助您去掌握这些技术,包括去使用flink 10计算平台创建并且提交作业编写基于flink table APISQL语句并且使用MySQL Connector对数据库进行读写这方面的能力

这就是五分钟上手fling bicycle连接器的实验内容

 

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
关系型数据库 MySQL Java
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
|
8月前
|
SQL Java 关系型数据库
Java连接MySQL数据库环境设置指南
请注意,在实际部署时应该避免将敏感信息(如用户名和密码)硬编码在源码文件里面;应该使用配置文件或者环境变量等更为安全可靠地方式管理这些信息。此外,在处理大量数据时考虑使用PreparedStatement而不是Statement可以提高性能并防止SQL注入攻击;同时也要注意正确处理异常情况,并且确保所有打开过得资源都被正确关闭释放掉以防止内存泄漏等问题发生。
371 13
|
8月前
|
SQL 关系型数据库 MySQL
MySQL数据库连接过多(Too many connections)错误处理策略
综上所述,“Too many connections”错误处理策略涉及从具体参数配置到代码层面再到系统与架构设计全方位考量与改进。每项措施都需根据具体环境进行定制化调整,并且在执行任何变更前建议先行测试评估可能带来影响。
1723 11
|
8月前
|
SQL 监控 关系型数据库
查寻MySQL或SQL Server的连接数,并配置超时时间和最大连接量
以上步骤提供了直观、实用且易于理解且执行的指导方针来监管和优化数据库服务器配置。务必记得,在做任何重要变更前备份相关配置文件,并确保理解每个参数对系统性能可能产生影响后再做出调节。
781 11
|
8月前
|
SQL 关系型数据库 MySQL
排除通过IP访问MySQL时出现的连接错误问题
以上步骤涵盖了大多数遇到远程连接 MySQL 数据库时出现故障情形下所需采取措施,在执行每个步骤后都应该重新尝试建立链接以验证是否已经解决问题,在多数情形下按照以上顺序执行将能够有效地排除并修复大多数基本链接相关故障。
516 3
|
9月前
|
存储 关系型数据库 MySQL
修复.net Framework4.x连接MYSQL时遇到utf8mb3字符集不支持错误方案。
通过上述步骤大多数情况下能够解决由于UTF-encoding相关错误所带来影响,在实施过程当中要注意备份重要信息以防止意外发生造成无法挽回损失,并且逐一排查确认具体原因以采取针对性措施解除障碍。
602 12
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1390 0
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
1068 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
8月前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
516 158
|
8月前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多