课时1:5分钟上手 Flink MySQL 连接器

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 课时1:5分钟上手 Flink MySQL 连接器

Flink-Learning训练营:课时1:5分钟上手 Flink MySQL 连接器

课程地址:https://developer.aliyun.com/trainingcamp/0bcc1ab57cf841a2af632d6252fecbab

实时数据接入:5分钟上手Fling MySQL连接器

 

内容简介

一、实验准备

二、实验内容

三、实验收获

 

一、实验准备

1)开通阿里云十计算的fling版的免费试用

图片1.png

在这个实验开始之前呢,需要去开通阿里云十计算的fling版的免费试用,然后因为我们需要去连接的是一个云数Mexico实例,所以我们也需要去开通阿里云数据库的rds免费试用。

本实验会flink是计算平台为基础使用flink自带的Mexico connect连接器,去连接rds云数据库的实例,并且一个实时商品销售数据统计的一个例子,尝试去上手connect的数据补货数据写入,,我们可以通过阿里云试用中心:
http://free.aliyun.com/?product=956456o&crowd=enterpise ,去申请实际计算flink跟云数据库IDS的适用,大家可以直接扫码或者点击链接就可以去使用了
图片2.png

实验之前我们需要先在Rds云数据库上面去进行一些简单配置,去建立一些数据库跟数据表,在完成我以上准备骤,之后我们就可以正式的开始我们的实验了

 

二、实验内容

(1).捕获原表数据变化

首先,我们之前已经建立了一张MySQL数据表,如下图

图片3.png

其中包含了实时更新的数据。

图片4.png 

为了将其转化为flink可以处理的流失数据,我们首先进入flink实时计算平台,创建一个flink作业,然后我们可以使用cdc连接器,连接到原数据表,它可以通过分析日志的方式捕获变更数据,并发送到下游系统。之后我们就可以使用table API编写数据处理的逻辑例如我们可以进入到实时计算平台,点击左侧的作业开发页面,点击新建按钮,我们可以新建一个job。之后我们可以创建一张source table的表,这个表跟我们之前创建的MySQL数据表的schema 致,之后我们在这个with里面提供一些连接所需要的参数,例如配置使用MySQL cdc连接器,host name是之前我们建立IDS实例的时候提供的ual, usaname 和pasorde是之前设定的实例登陆用户名和密码,为了验证连接是否成功,我们可以直接在这下面编写类似于cgo的语句。Select from sort table,获取source table中实时更新的所有数据。例如在这里我们看到了实时更新的数据,都来自于我们之前创建的源表,这说明我们的连接器生效了,它能够顺利的捕获变更的数据可以直接在这里添加新的数据库里逻辑,例如这段SQL语句实际上就是将原始输入的数据,以每15秒为一个时间窗口进行,并且按照商品ID以及商品在15秒内的销售量进行统计并展示结果可以看到它正确的执行了计算,并且会随着数据的到来实时进行更新。

(2)接入维度表,打宽数据

我们可以看到在原表中我们使用了good ID
图片5.png来表明每件商品的销售记录,但包括商品名称,商品价格之类的信息则保存在另一张单独的表中。我们同样可以使用类似的语句,建立连接维度表的临时表,并且使用left join语法将它之前计算的数据流结果进行连接。我们可以执行一下,看看结果,我们可以看到这里正常的进行了连接,并且计算出了每件商品的利润,

(3)数据写回会表

我们可以使用跟之前类似的方式建立连接汇表的临时表,并且使用insert into cycle语句数据流写回MySQL数据库。在这里,我们使用think table这张临时表来连接我们的数据库里的汇表,并且加入了一个scan auto commit为TRUE这个参数,表明我们希望这个数据库在发生更动时自动commit到数据库。然后我们只需要在上面加入一行insert into Table语句
图片6.png

表示我们希望把下面这行语句的执行结果插入到会表之中,我们来验证一下执行结果是否正确。可以看到,执行结果正确。因为在阿里云实际计算平台中使用测试,调试执行时数据不会写入下一个会表,所以为了验证这一功能是否正常,我们还需要点击上线功能,将作业部署到flink集群上面进行测试。点击确认然后我们可以进入运维页面,可以看到这个任务,现在是已停止的状态,

图片7.png

我们点击启动使用默认配置启动这个作业。在作业成功启动之后,我们可以进入数据库查看汇表中是否有新增的数据。可以看到这里有新增的数据代表我们的数据顺利写入了会表之中。同时,我们可以在作业总览中看到数据的数据流图,

图片8.png

例如1处是我们的原表获取数据,2处是我们进行窗口聚合的操作,3处是我们获取维度表中的数据,4是我们将原表处理后的数据跟维度表中的数据进行了一个聚合,最后我们将得到的结果写入会表5之中。

 

在我们不需要运行作业之后,我们可以点击停止按钮,停止这个作业

 

三、实验收获

学完该章节可以帮助您去掌握这些技术,包括去使用flink 10计算平台创建并且提交作业编写基于flink table APISQL语句并且使用MySQL Connector对数据库进行读写这方面的能力

这就是五分钟上手fling bicycle连接器的实验内容

 

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
22天前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
93 0
|
5天前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
8天前
|
SQL JavaScript 关系型数据库
node博客小项目:接口开发、连接mysql数据库
【10月更文挑战第14天】node博客小项目:接口开发、连接mysql数据库
|
14天前
|
Java 关系型数据库 MySQL
【编程基础知识】Eclipse连接MySQL 8.0时的JDK版本和驱动问题全解析
本文详细解析了在使用Eclipse连接MySQL 8.0时常见的JDK版本不兼容、驱动类错误和时区设置问题,并提供了清晰的解决方案。通过正确配置JDK版本、选择合适的驱动类和设置时区,确保Java应用能够顺利连接MySQL 8.0。
78 1
|
18天前
|
Java 关系型数据库 MySQL
springboot学习五:springboot整合Mybatis 连接 mysql数据库
这篇文章是关于如何使用Spring Boot整合MyBatis来连接MySQL数据库,并进行基本的增删改查操作的教程。
29 0
springboot学习五:springboot整合Mybatis 连接 mysql数据库
|
17天前
|
SQL JavaScript 关系型数据库
Node.js 连接 MySQL
10月更文挑战第9天
13 0
|
20天前
|
关系型数据库 MySQL Linux
Navicat 连接 Windows、Linux系统下的MySQL 各种错误,修改密码。
使用Navicat连接Windows和Linux系统下的MySQL时可能遇到的四种错误及其解决方法,包括错误代码2003、1045和2013,以及如何修改MySQL密码。
136 0
|
2月前
|
SQL JavaScript 关系型数据库
Node服务连接Mysql数据库
本文介绍了如何在Node服务中连接MySQL数据库,并实现心跳包连接机制。
36 0
Node服务连接Mysql数据库
|
22天前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
28 0
|
2月前
|
关系型数据库 MySQL 数据库
docker启动mysql多实例连接报错Can’t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock’
docker启动mysql多实例连接报错Can’t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock’
152 0

相关产品

  • 实时计算 Flink版