使用Apache Flink实现MySQL数据读取和写入的完整指南

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 使用Apache Flink实现MySQL数据读取和写入的完整指南

1. 导言:

Apache Flink是一款功能强大的流式处理引擎,可用于实时处理大规模数据。本文将介绍如何使用Flink与MySQL数据库进行交互,以清洗股票数据为例。

2. 环境准备:

首先,确保已安装Apache Flink并配置好MySQL数据库。导入相关依赖包,并创建必要的Table。同时需要提前创建好mysql表,一行source表,一张sink表。

CREATE TABLE `re_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  `rise` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 
package org.east;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;

object TableETL {
    def main(args: Array[String]): Unit = {
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
            .setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(senv)

        // 定义源表
        val source_table =
            """
            CREATE TEMPORARY TABLE t_stock_code_price (
                id BIGINT NOT NULL,
                code STRING NOT NULL,
                -- 其他字段...
                create_time STRING NOT NULL,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3306/mydb',
                'driver' = 'com.mysql.cj.jdbc.Driver',
                'table-name' = 't_stock_code_price',
                'username' = 'root',
                'password' = '12345678'
            )
            """.stripMargin

        // 定义目标表
        val sink_table =
            """
            CREATE TEMPORARY TABLE re_stock_code_price (
                id BIGINT NOT NULL,
                code STRING NOT NULL,
                -- 其他字段...
                create_time STRING NOT NULL,
                rise INT,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3306/mydb',
                'driver' = 'com.mysql.cj.jdbc.Driver',
                'table-name' = 're_stock_code_price',
                'username' = 'root',
                'password' = '12345678'
            )
            """.stripMargin

        tEnv.executeSql(source_table)
        tEnv.executeSql(sink_table)

在这段代码中,我们首先创建了Flink的流式执行环境和StreamTableEnvironment。然后,我们定义了两个临时表,用于存储原始股票数据和清洗后的数据。

3. 数据清洗:

接下来,我们执行数据清洗操作,并将结果写入目标表。

        // 执行清洗操作,并将结果写入目标表
        tEnv.executeSql("INSERT INTO re_stock_code_price " +
            "SELECT *, CASE WHEN change_percent > 0 THEN 1 ELSE 0 END AS rise FROM t_stock_code_price")

在这里,我们计算了股票涨跌情况,并将结果写入到目标表中。在这个例子中,我们假设change_percent字段表示股票价格的变化百分比,rise字段为1表示股票上涨,为0表示股票下跌。

4. 结果展示:

最后,我们查询目标表并打印结果。

5. 完整代码:

下面是完整的代码:

package org.east;
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object TableETL {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 't_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin


    val sink_table =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin


    tEnv.executeSql(source_table)
    tEnv.executeSql(sink_table)
    tEnv.executeSql("insert into re_stock_code_price select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")

    val user_DS = tEnv.executeSql("select * from re_stock_code_price")

    user_DS.print()
  }
}


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
328 33
The Past, Present and Future of Apache Flink
|
25天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
77 16
|
1月前
|
存储 关系型数据库 MySQL
mysql怎么查询longblob类型数据的大小
通过本文的介绍,希望您能深入理解如何查询MySQL中 `LONG BLOB`类型数据的大小,并结合优化技术提升查询性能,以满足实际业务需求。
110 6
|
2月前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
171 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
1月前
|
SQL 关系型数据库 MySQL
mysql分页读取数据重复问题
在服务端开发中,与MySQL数据库进行数据交互时,常因数据量大、网络延迟等因素需分页读取数据。文章介绍了使用`limit`和`offset`参数实现分页的方法,并针对分页过程中可能出现的数据重复问题进行了详细分析,提出了利用时间戳或确保排序规则绝对性等解决方案。
|
2月前
|
关系型数据库 MySQL 数据库
GBase 数据库如何像MYSQL一样存放多行数据
GBase 数据库如何像MYSQL一样存放多行数据
|
2月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
76 14
|
2月前
|
SQL 前端开发 关系型数据库
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
77 9
|
2月前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
2月前
|
SQL 关系型数据库 MySQL
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
65 1

推荐镜像

更多