课时1:5分钟上手 Flink MySQL 连接器

简介: 课时1:5分钟上手 Flink MySQL 连接器

Flink-Learning训练营:课时1:5分钟上手 Flink MySQL 连接器

课程地址:https://developer.aliyun.com/trainingcamp/0bcc1ab57cf841a2af632d6252fecbab

实时数据接入:5分钟上手Fling MySQL连接器

 

内容简介

一、实验准备

二、实验内容

三、实验收获

 

一、实验准备

1)开通阿里云十计算的fling版的免费试用

图片1.png

在这个实验开始之前呢,需要去开通阿里云十计算的fling版的免费试用,然后因为我们需要去连接的是一个云数Mexico实例,所以我们也需要去开通阿里云数据库的rds免费试用。

本实验会flink是计算平台为基础使用flink自带的Mexico connect连接器,去连接rds云数据库的实例,并且一个实时商品销售数据统计的一个例子,尝试去上手connect的数据补货数据写入,,我们可以通过阿里云试用中心:
http://free.aliyun.com/?product=956456o&crowd=enterpise ,去申请实际计算flink跟云数据库IDS的适用,大家可以直接扫码或者点击链接就可以去使用了
图片2.png

实验之前我们需要先在Rds云数据库上面去进行一些简单配置,去建立一些数据库跟数据表,在完成我以上准备骤,之后我们就可以正式的开始我们的实验了

 

二、实验内容

(1).捕获原表数据变化

首先,我们之前已经建立了一张MySQL数据表,如下图

图片3.png

其中包含了实时更新的数据。

图片4.png 

为了将其转化为flink可以处理的流失数据,我们首先进入flink实时计算平台,创建一个flink作业,然后我们可以使用cdc连接器,连接到原数据表,它可以通过分析日志的方式捕获变更数据,并发送到下游系统。之后我们就可以使用table API编写数据处理的逻辑例如我们可以进入到实时计算平台,点击左侧的作业开发页面,点击新建按钮,我们可以新建一个job。之后我们可以创建一张source table的表,这个表跟我们之前创建的MySQL数据表的schema 致,之后我们在这个with里面提供一些连接所需要的参数,例如配置使用MySQL cdc连接器,host name是之前我们建立IDS实例的时候提供的ual, usaname 和pasorde是之前设定的实例登陆用户名和密码,为了验证连接是否成功,我们可以直接在这下面编写类似于cgo的语句。Select from sort table,获取source table中实时更新的所有数据。例如在这里我们看到了实时更新的数据,都来自于我们之前创建的源表,这说明我们的连接器生效了,它能够顺利的捕获变更的数据可以直接在这里添加新的数据库里逻辑,例如这段SQL语句实际上就是将原始输入的数据,以每15秒为一个时间窗口进行,并且按照商品ID以及商品在15秒内的销售量进行统计并展示结果可以看到它正确的执行了计算,并且会随着数据的到来实时进行更新。

(2)接入维度表,打宽数据

我们可以看到在原表中我们使用了good ID
图片5.png来表明每件商品的销售记录,但包括商品名称,商品价格之类的信息则保存在另一张单独的表中。我们同样可以使用类似的语句,建立连接维度表的临时表,并且使用left join语法将它之前计算的数据流结果进行连接。我们可以执行一下,看看结果,我们可以看到这里正常的进行了连接,并且计算出了每件商品的利润,

(3)数据写回会表

我们可以使用跟之前类似的方式建立连接汇表的临时表,并且使用insert into cycle语句数据流写回MySQL数据库。在这里,我们使用think table这张临时表来连接我们的数据库里的汇表,并且加入了一个scan auto commit为TRUE这个参数,表明我们希望这个数据库在发生更动时自动commit到数据库。然后我们只需要在上面加入一行insert into Table语句
图片6.png

表示我们希望把下面这行语句的执行结果插入到会表之中,我们来验证一下执行结果是否正确。可以看到,执行结果正确。因为在阿里云实际计算平台中使用测试,调试执行时数据不会写入下一个会表,所以为了验证这一功能是否正常,我们还需要点击上线功能,将作业部署到flink集群上面进行测试。点击确认然后我们可以进入运维页面,可以看到这个任务,现在是已停止的状态,

图片7.png

我们点击启动使用默认配置启动这个作业。在作业成功启动之后,我们可以进入数据库查看汇表中是否有新增的数据。可以看到这里有新增的数据代表我们的数据顺利写入了会表之中。同时,我们可以在作业总览中看到数据的数据流图,

图片8.png

例如1处是我们的原表获取数据,2处是我们进行窗口聚合的操作,3处是我们获取维度表中的数据,4是我们将原表处理后的数据跟维度表中的数据进行了一个聚合,最后我们将得到的结果写入会表5之中。

 

在我们不需要运行作业之后,我们可以点击停止按钮,停止这个作业

 

三、实验收获

学完该章节可以帮助您去掌握这些技术,包括去使用flink 10计算平台创建并且提交作业编写基于flink table APISQL语句并且使用MySQL Connector对数据库进行读写这方面的能力

这就是五分钟上手fling bicycle连接器的实验内容

 

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1317 0
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
2761 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
3306 45
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
1031 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
597 17
|
SQL Oracle 关系型数据库
Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖
本篇教程将展示如何使用 Flink CDC 构建实时数据湖,并处理分库分表合并同步的场景。
Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版操作报错之同步MySQL分库分表500张表报连接超时,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL 消息中间件 关系型数据库
技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入
本文主要介绍了 Flink CDC 分库分表怎么实时同步,以及其结合 Apache Doris Flink Connector 最新版本整合的 Flink 2PC 和 Doris Stream Load 2PC 的机制及整合原理、使用方法等。
技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入
|
8月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
749 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
    下一篇
    开通oss服务