课时1:5分钟上手 Flink MySQL 连接器

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 课时1:5分钟上手 Flink MySQL 连接器

Flink-Learning训练营:课时1:5分钟上手 Flink MySQL 连接器

课程地址:https://developer.aliyun.com/trainingcamp/0bcc1ab57cf841a2af632d6252fecbab

实时数据接入:5分钟上手Fling MySQL连接器

 

内容简介

一、实验准备

二、实验内容

三、实验收获

 

一、实验准备

1)开通阿里云十计算的fling版的免费试用

图片1.png

在这个实验开始之前呢,需要去开通阿里云十计算的fling版的免费试用,然后因为我们需要去连接的是一个云数Mexico实例,所以我们也需要去开通阿里云数据库的rds免费试用。

本实验会flink是计算平台为基础使用flink自带的Mexico connect连接器,去连接rds云数据库的实例,并且一个实时商品销售数据统计的一个例子,尝试去上手connect的数据补货数据写入,,我们可以通过阿里云试用中心:
http://free.aliyun.com/?product=956456o&crowd=enterpise ,去申请实际计算flink跟云数据库IDS的适用,大家可以直接扫码或者点击链接就可以去使用了
图片2.png

实验之前我们需要先在Rds云数据库上面去进行一些简单配置,去建立一些数据库跟数据表,在完成我以上准备骤,之后我们就可以正式的开始我们的实验了

 

二、实验内容

(1).捕获原表数据变化

首先,我们之前已经建立了一张MySQL数据表,如下图

图片3.png

其中包含了实时更新的数据。

图片4.png 

为了将其转化为flink可以处理的流失数据,我们首先进入flink实时计算平台,创建一个flink作业,然后我们可以使用cdc连接器,连接到原数据表,它可以通过分析日志的方式捕获变更数据,并发送到下游系统。之后我们就可以使用table API编写数据处理的逻辑例如我们可以进入到实时计算平台,点击左侧的作业开发页面,点击新建按钮,我们可以新建一个job。之后我们可以创建一张source table的表,这个表跟我们之前创建的MySQL数据表的schema 致,之后我们在这个with里面提供一些连接所需要的参数,例如配置使用MySQL cdc连接器,host name是之前我们建立IDS实例的时候提供的ual, usaname 和pasorde是之前设定的实例登陆用户名和密码,为了验证连接是否成功,我们可以直接在这下面编写类似于cgo的语句。Select from sort table,获取source table中实时更新的所有数据。例如在这里我们看到了实时更新的数据,都来自于我们之前创建的源表,这说明我们的连接器生效了,它能够顺利的捕获变更的数据可以直接在这里添加新的数据库里逻辑,例如这段SQL语句实际上就是将原始输入的数据,以每15秒为一个时间窗口进行,并且按照商品ID以及商品在15秒内的销售量进行统计并展示结果可以看到它正确的执行了计算,并且会随着数据的到来实时进行更新。

(2)接入维度表,打宽数据

我们可以看到在原表中我们使用了good ID
图片5.png来表明每件商品的销售记录,但包括商品名称,商品价格之类的信息则保存在另一张单独的表中。我们同样可以使用类似的语句,建立连接维度表的临时表,并且使用left join语法将它之前计算的数据流结果进行连接。我们可以执行一下,看看结果,我们可以看到这里正常的进行了连接,并且计算出了每件商品的利润,

(3)数据写回会表

我们可以使用跟之前类似的方式建立连接汇表的临时表,并且使用insert into cycle语句数据流写回MySQL数据库。在这里,我们使用think table这张临时表来连接我们的数据库里的汇表,并且加入了一个scan auto commit为TRUE这个参数,表明我们希望这个数据库在发生更动时自动commit到数据库。然后我们只需要在上面加入一行insert into Table语句
图片6.png

表示我们希望把下面这行语句的执行结果插入到会表之中,我们来验证一下执行结果是否正确。可以看到,执行结果正确。因为在阿里云实际计算平台中使用测试,调试执行时数据不会写入下一个会表,所以为了验证这一功能是否正常,我们还需要点击上线功能,将作业部署到flink集群上面进行测试。点击确认然后我们可以进入运维页面,可以看到这个任务,现在是已停止的状态,

图片7.png

我们点击启动使用默认配置启动这个作业。在作业成功启动之后,我们可以进入数据库查看汇表中是否有新增的数据。可以看到这里有新增的数据代表我们的数据顺利写入了会表之中。同时,我们可以在作业总览中看到数据的数据流图,

图片8.png

例如1处是我们的原表获取数据,2处是我们进行窗口聚合的操作,3处是我们获取维度表中的数据,4是我们将原表处理后的数据跟维度表中的数据进行了一个聚合,最后我们将得到的结果写入会表5之中。

 

在我们不需要运行作业之后,我们可以点击停止按钮,停止这个作业

 

三、实验收获

学完该章节可以帮助您去掌握这些技术,包括去使用flink 10计算平台创建并且提交作业编写基于flink table APISQL语句并且使用MySQL Connector对数据库进行读写这方面的能力

这就是五分钟上手fling bicycle连接器的实验内容

 

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
9天前
|
SQL 关系型数据库 MySQL
【Go语言专栏】使用Go语言连接MySQL数据库
【4月更文挑战第30天】本文介绍了如何使用Go语言连接和操作MySQL数据库,包括选择`go-sql-driver/mysql`驱动、安装导入、建立连接、执行SQL查询、插入/更新/删除操作、事务处理以及性能优化和最佳实践。通过示例代码,展示了连接数据库、使用连接池、事务管理和性能调优的方法,帮助开发者构建高效、稳定的Web应用。
|
14天前
|
存储 关系型数据库 MySQL
SpringSecurity_连接mysql(初出茅庐)
SpringSecurity_连接mysql(初出茅庐)
20 0
|
8天前
|
SQL 数据可视化 关系型数据库
【MySQL-11】多表查询全解-【多表关系/内外自连接/子查询/多表查询案例链接】(可cv代码&案例演示)
【MySQL-11】多表查询全解-【多表关系/内外自连接/子查询/多表查询案例链接】(可cv代码&案例演示)
|
9天前
|
关系型数据库 MySQL Java
datagrip连接mysql报错: No appropriate protocol (protocol is disabled or cipher suites are inappropriate
datagrip连接mysql报错: No appropriate protocol (protocol is disabled or cipher suites are inappropriate
|
9天前
|
关系型数据库 MySQL PHP
【PHP 开发专栏】PHP 连接 MySQL 数据库的方法
【4月更文挑战第30天】本文介绍了 PHP 连接 MySQL 的两种主要方法:mysqli 和 PDO 扩展,包括连接、查询和处理结果的基本步骤。还讨论了连接参数设置、常见问题及解决方法,如连接失败、权限和字符集问题。此外,提到了高级技巧如使用连接池和缓存连接信息以优化性能。最后,通过实际案例分析了在用户登录系统和数据管理中的应用。
|
9天前
|
SQL 关系型数据库 MySQL
使用Python的pymysql库连接MySQL,执行CRUD操作
使用Python的pymysql库连接MySQL,执行CRUD操作:安装pymysql,然后连接(host='localhost',user='root',password='yourpassword',database='yourdatabase'),创建游标。查询数据示例:`SELECT * FROM yourtable`;插入数据:`INSERT INTO yourtable...`;更新数据:`UPDATE yourtable SET...`;删除数据:`DELETE FROM yourtable WHERE...`。
25 0
|
10天前
|
DataWorks NoSQL 关系型数据库
DataWorks操作报错合集之在使用 DataWorks 进行 MongoDB 同步时遇到了连通性测试失败,实例配置和 MongoDB 白名单配置均正确,且同 VPC 下 MySQL 可以成功连接并同步,但 MongoDB 却无法完成同样的操作如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
27 1
|
10天前
|
弹性计算 关系型数据库 MySQL
检测MySQL 数据库连接数量
【4月更文挑战第29天】
8 0
|
10天前
|
Java 关系型数据库 MySQL
Java基础教程(20)-Java连接mysql数据库CURD
【4月更文挑战第19天】MySQL是流行的关系型数据库管理系统,支持SQL语法。在IDEA中加载jar包到项目类路径:右击项目,选择“Open Module Settings”,添加库文件。使用JDBC连接MySQL,首先下载JDBC驱动,然后通过`Class.forName()`加载驱动,`DriverManager.getConnection()`建立连接。执行CRUD操作,例如创建表、插入数据和查询,使用`Statement`或`PreparedStatement`,并确保正确关闭数据库资源。
|
10天前
|
JSON 前端开发 Java
管理系统总结(前端:Vue-cli, 后端Jdbc连接mysql数据库,项目部署tomcat里)
管理系统总结(前端:Vue-cli, 后端Jdbc连接mysql数据库,项目部署tomcat里)

相关产品

  • 实时计算 Flink版