SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。

SpringBoot 集成 Flink CDC 实时追踪 MySQL 数据变动

一、概述

Flink CDC 是一个基于 Apache Flink 的数据捕获工具,能够实时捕获和处理数据库的变动事件。通过集成 Flink CDC,可以实时追踪 MySQL 数据库中的数据变动,构建高效的数据处理和分析应用。本文将介绍如何在 SpringBoot 项目中集成 Flink CDC,并实现对 MySQL 数据变动的实时追踪。

二、准备工作

1. 环境准备

  • JDK 1.8+
  • Maven 3.6+
  • MySQL 数据库
  • Apache Flink 1.12+
  • SpringBoot 2.5+

2. 创建 MySQL 数据库和表

CREATE DATABASE test_db;

USE test_db;

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
​

三、集成步骤

1. 引入依赖

在 SpringBoot 项目的 pom.xml 中添加必要的依赖:

<dependencies>
    <!-- Spring Boot Dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <!-- Flink Dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <!-- Flink CDC Dependencies -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
​

2. 配置 Flink CDC

在 SpringBoot 项目中创建 Flink CDC 配置类:

import com.ververica.cdc.connectors.mysql.MySQLSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlinkCdcConfig {

    @Bean
    public DataStreamSource<String> mysqlSource(StreamExecutionEnvironment env) {
        MySQLSource<String> source = MySQLSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test_db")
            .tableList("test_db.users")
            .username("root")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();

        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");
    }
}
​

3. 创建 Flink 作业

在 SpringBoot 项目中创建 Flink 作业:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class FlinkJobRunner implements CommandLineRunner {

    private final StreamExecutionEnvironment env;
    private final DataStreamSource<String> mysqlSource;

    public FlinkJobRunner(StreamExecutionEnvironment env, DataStreamSource<String> mysqlSource) {
        this.env = env;
        this.mysqlSource = mysqlSource;
    }

    @Override
    public void run(String... args) throws Exception {
        mysqlSource.print();
        env.execute("Flink CDC Job");
    }
}
​

4. 启动 SpringBoot 应用

运行 SpringBoot 应用,启动后会自动执行 Flink 作业,并打印 MySQL 数据库中 users 表的变动。

四、验证和测试

1. 插入测试数据

向 MySQL 数据库中插入数据:

INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com');
​

2. 验证输出

查看 SpringBoot 应用的控制台输出,确认是否正确捕获并打印了 MySQL 数据库中的变动。

五、总结

通过以上步骤,我们在 SpringBoot 项目中集成了 Flink CDC,实现了对 MySQL 数据变动的实时追踪。这种方法可以用于构建高效的实时数据处理和分析系统,适用于各种需要数据实时同步和处理的场景。

思维导图

- SpringBoot 集成 Flink CDC 实时追踪 MySQL 数据变动
  - 准备工作
    - 环境准备
    - 创建 MySQL 数据库和表
  - 集成步骤
    - 引入依赖
    - 配置 Flink CDC
    - 创建 Flink 作业
    - 启动 SpringBoot 应用
  - 验证和测试
    - 插入测试数据
    - 验证输出
  - 总结
​

通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
8月前
|
人工智能 运维 Java
SpringBoot+MySQL实现动态定时任务
这是一个基于Spring Boot的动态定时任务Demo,利用spring-context模块实现任务调度功能。服务启动时会扫描数据库中的任务表,将任务添加到调度器中,并通过固定频率运行的ScheduleUpdater任务动态更新任务状态和Cron表达式。核心功能包括任务的新增、删除与Cron调整,支持通过ScheduledFuture对象控制任务执行。项目依赖Spring Boot 2.2.10.RELEASE,使用MySQL存储任务信息,包含任务基类ITask及具体实现(如FooTask),便于用户扩展运维界面以增强灵活性。
325 10
|
10月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1121 0
|
7月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
414 0
|
9月前
|
监控 Java 关系型数据库
Spring Boot整合MySQL主从集群同步延迟解决方案
本文针对电商系统在Spring Boot+MyBatis架构下的典型问题(如大促时订单状态延迟、库存超卖误判及用户信息更新延迟)提出解决方案。核心内容包括动态数据源路由(强制读主库)、大事务拆分优化以及延迟感知补偿机制,配合MySQL参数调优和监控集成,有效将主从延迟控制在1秒内。实际测试表明,在10万QPS场景下,订单查询延迟显著降低,超卖误判率下降98%。
433 5
|
11月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
905 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
11月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
786 12
Flink CDC YAML:面向数据集成的 API 设计
|
10月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
527 6
|
10月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
504 5
|
Java 中间件 流计算
Flink 如何分流数据
Flink 如何分流数据,3种分流方式
4392 0

热门文章

最新文章

推荐镜像

更多