Flink CDC

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: Flink CDC

1、Flink CDC

1.1、什么是 Flink CDC

       CDC(Change Data Capture)也就是变更数据采集的意思,我们之前学的 Maxwell 就是一种 CDC 工具。今天要学的 Flink CDC 则是 Flink 自己开发的一款与 Flink 自身无缝集成的 CDC 工具,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。

       目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。而Flink 正是集成了 Debezium 作为捕获数据变更的引擎,所以它可以充分发挥 Debezium 的能力。Flink 和 Debezium 最大的不同之处在于,Flink 并不强制依赖于 Kafka。

1.2、为什么要使用 Flink CDC?

       之前我们监听 MySQL 的业务数据变更是通过 Maxwell 或者可以通过 Cannal ,它们都是通过监听 binlog 日志来实现数据同步的。

       在实时数仓中,如果我们要使用上面的 Maxwell 或 Cannal 做业务数据同步,那必须通过下面这几步:

  • mysql 开启 binlog
  • Maxwell/Canal 同步 binlog 数据写入到 Kafka
  • Flink 读取 Kakfa 中的 binlog 数据进行相关的业务处理

整体来看步骤很长,而且用到的组件也比较多,那能不能直接 Flink 一步到位,直接自己通过 binlog 来实现日志同步呢?所以 Flink CDC 这就出现了。

Flink CDC 数据格式

{
    "before": {
        "id": "PF1784570096901248",
        "pay_order_no": null,
        "out_no": "J1784570080435328",
        "title": "充值办卡",
        "from_user_id": "PG11111",
        "from_account_id": "1286009802396288",
        "user_id": "BO1707796995184000",
        "account_id": "1707895210106496",
        "amount": 13400,
        "profit_state": 1,
        "profit_time": 1686758315000,
        "refund_state": 0,
        "refund_time": null,
        "add_time": 1686758315000,
        "remark": "充值办卡",
        "acct_circle": "PG11111",
        "user_type": 92,
        "from_user_type": 90,
        "company_id": "PG11111",
        "profit_mode": 1,
        "type": 2,
        "parent_id": null,
        "oc_profit_id": "1784570096901248",
        "keep_account_from_user_id": null,
        "keep_account_from_bm_user_id": null,
        "keep_account_user_id": null,
        "keep_account_bm_user_id": null,
        "biz_company_id": "PG11111"
    },
    "after": {
        "id": "PF1784570096901248",
        "pay_order_no": null,
        "out_no": "J1784570080435328",
        "title": "充值办卡",
        "from_user_id": "PG11111",
        "from_account_id": "1286009802396288",
        "user_id": "BO1707796995184000",
        "account_id": "1707895210106496",
        "amount": 13400,
        "profit_state": 1,
        "profit_time": 1686758315000,
        "refund_state": 0,
        "refund_time": null,
        "add_time": 1686758315000,
        "remark": "充值办卡1",
        "acct_circle": "PG11111",
        "user_type": 92,
        "from_user_type": 90,
        "company_id": "PG11111",
        "profit_mode": 1,
        "type": 2,
        "parent_id": null,
        "oc_profit_id": "1784570096901248",
        "keep_account_from_user_id": null,
        "keep_account_from_bm_user_id": null,
        "keep_account_user_id": null,
        "keep_account_bm_user_id": null,
        "biz_company_id": "PG11111"
    },
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1686734882000,
        "snapshot": "false",
        "db": "cloud_test",
        "sequence": null,
        "table": "acct_profit",
        "server_id": 1,
        "gtid": null,
        "file": "mysql-bin.000514",
        "pos": 650576218,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "u",
    "ts_ms": 1686734882689,
    "transaction": null
}

1.3、使用 Flink CDC

MySQL 首先得开启 binlog 功能,这里需要特别注意 MySQL 时区的设置(我们需要保证 Flink 的时间和数据库时间保持一致):

show variables like '%time_zone%';
# 如果失去是UTC,则需要时间校正
set time_zone='+8:00';

1.3.1、变更数据处理逻辑

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
public class CustomSink extends RichSinkFunction<String> {
 
    // 自定义向下游写入
    @Override
    public void invoke(String json, Context context) throws Exception {
        // 每次产生变更操作后对数据进行什么操作,比如写入到ES、Redis、MongoDB等
        System.out.println(">>>" + json);
    }
 
    // 写入其它数据源需要创建连接
    @Override
    public void open(Configuration parameters) throws Exception {
 
    }
    // 关闭外部数据库连接
    @Override
    public void close() throws Exception {
 
    }
}

1.3.2、构建 Flink CDC 监听程序

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.builder()
                .hostname("hadoop102")
                .port(3306)
                .databaseList("数据库名")
                .tableList("表名")    // 多个表之间用逗号切割
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())  // 反序列化器
                .includeSchemaChanges(true) // 是否监听表结构的变化
                .build();
 
        // 启动本地 WEB-UI
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT,8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
 
        // 检查点时间间隔
        env.enableCheckpointing(5000);
        DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(new CustomSink());
 
        env.execute();
    }
}


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
19天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
46 9
|
2月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
615 1
Flink CDC:新一代实时数据集成框架
|
2月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
557 14
Flink CDC 在货拉拉的落地与实践
|
3月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
453 13
Flink CDC 在新能源制造业的实践
|
3月前
|
SQL 数据库 流计算
Flink CDC数据读取问题之一致性如何解决
Flink CDC 使用Change Data Capture (CDC)技术从数据库捕获变更事件,并利用Flink的流处理能力确保数据读取一致性。相较于传统工具,它具备全增量一体化数据集成能力,满足实时性需求。在实践中解决了高效数据同步、稳定同步大量表数据等问题。应用场景包括实时数据同步、实时数据集成等。快速上手需学习基本概念与实践操作。未来发展方向包括提升效率与稳定性,并依据用户需求持续优化。
129 1
|
4月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
18266 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
4月前
|
SQL JSON 缓存
玳数科技集成 Flink CDC 3.0 的实践
本文投稿自玳数科技工程师杨槐老师,介绍了 Flink CDC 3.0 与 ChunJun 框架在玳数科技的集成实践。
597 7
玳数科技集成 Flink CDC 3.0 的实践
|
3月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到iava.lang.NoClassDefFoundError: ververica/cdc/common/utils/StrinaUtils错误,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
312 2
|
3月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
356 1

热门文章

最新文章

下一篇
无影云桌面