SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。

SpringBoot 集成 Flink CDC 实时追踪 MySQL 数据变动

一、概述

Flink CDC 是一个基于 Apache Flink 的数据捕获工具,能够实时捕获和处理数据库的变动事件。通过集成 Flink CDC,可以实时追踪 MySQL 数据库中的数据变动,构建高效的数据处理和分析应用。本文将介绍如何在 SpringBoot 项目中集成 Flink CDC,并实现对 MySQL 数据变动的实时追踪。

二、准备工作

1. 环境准备

  • JDK 1.8+
  • Maven 3.6+
  • MySQL 数据库
  • Apache Flink 1.12+
  • SpringBoot 2.5+

2. 创建 MySQL 数据库和表

CREATE DATABASE test_db;

USE test_db;

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
​

三、集成步骤

1. 引入依赖

在 SpringBoot 项目的 pom.xml 中添加必要的依赖:

<dependencies>
    <!-- Spring Boot Dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <!-- Flink Dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <!-- Flink CDC Dependencies -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
​

2. 配置 Flink CDC

在 SpringBoot 项目中创建 Flink CDC 配置类:

import com.ververica.cdc.connectors.mysql.MySQLSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlinkCdcConfig {

    @Bean
    public DataStreamSource<String> mysqlSource(StreamExecutionEnvironment env) {
        MySQLSource<String> source = MySQLSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test_db")
            .tableList("test_db.users")
            .username("root")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();

        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");
    }
}
​

3. 创建 Flink 作业

在 SpringBoot 项目中创建 Flink 作业:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class FlinkJobRunner implements CommandLineRunner {

    private final StreamExecutionEnvironment env;
    private final DataStreamSource<String> mysqlSource;

    public FlinkJobRunner(StreamExecutionEnvironment env, DataStreamSource<String> mysqlSource) {
        this.env = env;
        this.mysqlSource = mysqlSource;
    }

    @Override
    public void run(String... args) throws Exception {
        mysqlSource.print();
        env.execute("Flink CDC Job");
    }
}
​

4. 启动 SpringBoot 应用

运行 SpringBoot 应用,启动后会自动执行 Flink 作业,并打印 MySQL 数据库中 users 表的变动。

四、验证和测试

1. 插入测试数据

向 MySQL 数据库中插入数据:

INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com');
​

2. 验证输出

查看 SpringBoot 应用的控制台输出,确认是否正确捕获并打印了 MySQL 数据库中的变动。

五、总结

通过以上步骤,我们在 SpringBoot 项目中集成了 Flink CDC,实现了对 MySQL 数据变动的实时追踪。这种方法可以用于构建高效的实时数据处理和分析系统,适用于各种需要数据实时同步和处理的场景。

思维导图

- SpringBoot 集成 Flink CDC 实时追踪 MySQL 数据变动
  - 准备工作
    - 环境准备
    - 创建 MySQL 数据库和表
  - 集成步骤
    - 引入依赖
    - 配置 Flink CDC
    - 创建 Flink 作业
    - 启动 SpringBoot 应用
  - 验证和测试
    - 插入测试数据
    - 验证输出
  - 总结
​

通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
7月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
5月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
288 0
|
4月前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
364 10
|
5月前
|
SQL 存储 缓存
MySQL 如何高效可靠处理持久化数据
本文详细解析了 MySQL 的 SQL 执行流程、crash-safe 机制及性能优化策略。内容涵盖连接器、分析器、优化器、执行器与存储引擎的工作原理,深入探讨 redolog 与 binlog 的两阶段提交机制,并分析日志策略、组提交、脏页刷盘等关键性能优化手段,帮助提升数据库稳定性与执行效率。
148 0
|
Java 关系型数据库 MySQL
Spring Boot入门(2)使用MySQL数据库
介绍   本文将介绍如何在Spring项目中连接、处理MySQL数据库。   该项目使用Spring Data JPA和Hibernate来连接、处理MySQL数据库,当然,这仅仅是其中一种方式,你也可以使用Spring JDBC或者MyBatis.   Spring Data JPA是Spring Data的一个子项目,主要用于简化数据访问层的实现,使用Spring Data JPA可以轻松实现增删改查、分页、排序等。
2680 0
|
5月前
|
Java Spring 容器
SpringBoot自动配置的原理是什么?
Spring Boot自动配置核心在于@EnableAutoConfiguration注解,它通过@Import导入配置选择器,加载META-INF/spring.factories中定义的自动配置类。这些类根据@Conditional系列注解判断是否生效。但Spring Boot 3.0后已弃用spring.factories,改用新格式的.imports文件进行配置。
930 0
|
6月前
|
人工智能 Java 测试技术
Spring Boot 集成 JUnit 单元测试
本文介绍了在Spring Boot中使用JUnit 5进行单元测试的常用方法与技巧,包括添加依赖、编写测试类、使用@SpringBootTest参数、自动装配测试模块(如JSON、MVC、WebFlux、JDBC等),以及@MockBean和@SpyBean的应用。内容实用,适合Java开发者参考学习。
675 0
|
2月前
|
JavaScript Java Maven
【SpringBoot(二)】带你认识Yaml配置文件类型、SpringMVC的资源访问路径 和 静态资源配置的原理!
SpringBoot专栏第二章,从本章开始正式进入SpringBoot的WEB阶段开发,本章先带你认识yaml配置文件和资源的路径配置原理,以方便在后面的文章中打下基础
318 3
|
2月前
|
Java 测试技术 数据库连接
【SpringBoot(四)】还不懂文件上传?JUnit使用?本文带你了解SpringBoot的文件上传、异常处理、组件注入等知识!并且带你领悟JUnit单元测试的使用!
Spring专栏第四章,本文带你上手 SpringBoot 的文件上传、异常处理、组件注入等功能 并且为你演示Junit5的基础上手体验
867 2
|
9月前
|
前端开发 Java 数据库
微服务——SpringBoot使用归纳——Spring Boot集成Thymeleaf模板引擎——Thymeleaf 介绍
本课介绍Spring Boot集成Thymeleaf模板引擎。Thymeleaf是一款现代服务器端Java模板引擎,支持Web和独立环境,可实现自然模板开发,便于团队协作。与传统JSP不同,Thymeleaf模板可以直接在浏览器中打开,方便前端人员查看静态原型。通过在HTML标签中添加扩展属性(如`th:text`),Thymeleaf能够在服务运行时动态替换内容,展示数据库中的数据,同时兼容静态页面展示,为开发带来灵活性和便利性。
404 0

推荐镜像

更多