Flink CDC

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Flink CDC

1、Flink CDC

1.1、什么是 Flink CDC

       CDC(Change Data Capture)也就是变更数据采集的意思,我们之前学的 Maxwell 就是一种 CDC 工具。今天要学的 Flink CDC 则是 Flink 自己开发的一款与 Flink 自身无缝集成的 CDC 工具,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。

       目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。而Flink 正是集成了 Debezium 作为捕获数据变更的引擎,所以它可以充分发挥 Debezium 的能力。Flink 和 Debezium 最大的不同之处在于,Flink 并不强制依赖于 Kafka。

1.2、为什么要使用 Flink CDC?

       之前我们监听 MySQL 的业务数据变更是通过 Maxwell 或者可以通过 Cannal ,它们都是通过监听 binlog 日志来实现数据同步的。

       在实时数仓中,如果我们要使用上面的 Maxwell 或 Cannal 做业务数据同步,那必须通过下面这几步:

  • mysql 开启 binlog
  • Maxwell/Canal 同步 binlog 数据写入到 Kafka
  • Flink 读取 Kakfa 中的 binlog 数据进行相关的业务处理

整体来看步骤很长,而且用到的组件也比较多,那能不能直接 Flink 一步到位,直接自己通过 binlog 来实现日志同步呢?所以 Flink CDC 这就出现了。

Flink CDC 数据格式

{
    "before": {
        "id": "PF1784570096901248",
        "pay_order_no": null,
        "out_no": "J1784570080435328",
        "title": "充值办卡",
        "from_user_id": "PG11111",
        "from_account_id": "1286009802396288",
        "user_id": "BO1707796995184000",
        "account_id": "1707895210106496",
        "amount": 13400,
        "profit_state": 1,
        "profit_time": 1686758315000,
        "refund_state": 0,
        "refund_time": null,
        "add_time": 1686758315000,
        "remark": "充值办卡",
        "acct_circle": "PG11111",
        "user_type": 92,
        "from_user_type": 90,
        "company_id": "PG11111",
        "profit_mode": 1,
        "type": 2,
        "parent_id": null,
        "oc_profit_id": "1784570096901248",
        "keep_account_from_user_id": null,
        "keep_account_from_bm_user_id": null,
        "keep_account_user_id": null,
        "keep_account_bm_user_id": null,
        "biz_company_id": "PG11111"
    },
    "after": {
        "id": "PF1784570096901248",
        "pay_order_no": null,
        "out_no": "J1784570080435328",
        "title": "充值办卡",
        "from_user_id": "PG11111",
        "from_account_id": "1286009802396288",
        "user_id": "BO1707796995184000",
        "account_id": "1707895210106496",
        "amount": 13400,
        "profit_state": 1,
        "profit_time": 1686758315000,
        "refund_state": 0,
        "refund_time": null,
        "add_time": 1686758315000,
        "remark": "充值办卡1",
        "acct_circle": "PG11111",
        "user_type": 92,
        "from_user_type": 90,
        "company_id": "PG11111",
        "profit_mode": 1,
        "type": 2,
        "parent_id": null,
        "oc_profit_id": "1784570096901248",
        "keep_account_from_user_id": null,
        "keep_account_from_bm_user_id": null,
        "keep_account_user_id": null,
        "keep_account_bm_user_id": null,
        "biz_company_id": "PG11111"
    },
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1686734882000,
        "snapshot": "false",
        "db": "cloud_test",
        "sequence": null,
        "table": "acct_profit",
        "server_id": 1,
        "gtid": null,
        "file": "mysql-bin.000514",
        "pos": 650576218,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "u",
    "ts_ms": 1686734882689,
    "transaction": null
}

1.3、使用 Flink CDC

MySQL 首先得开启 binlog 功能,这里需要特别注意 MySQL 时区的设置(我们需要保证 Flink 的时间和数据库时间保持一致):

show variables like '%time_zone%';
# 如果失去是UTC,则需要时间校正
set time_zone='+8:00';

1.3.1、变更数据处理逻辑

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
public class CustomSink extends RichSinkFunction<String> {
 
    // 自定义向下游写入
    @Override
    public void invoke(String json, Context context) throws Exception {
        // 每次产生变更操作后对数据进行什么操作,比如写入到ES、Redis、MongoDB等
        System.out.println(">>>" + json);
    }
 
    // 写入其它数据源需要创建连接
    @Override
    public void open(Configuration parameters) throws Exception {
 
    }
    // 关闭外部数据库连接
    @Override
    public void close() throws Exception {
 
    }
}

1.3.2、构建 Flink CDC 监听程序

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.builder()
                .hostname("hadoop102")
                .port(3306)
                .databaseList("数据库名")
                .tableList("表名")    // 多个表之间用逗号切割
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())  // 反序列化器
                .includeSchemaChanges(true) // 是否监听表结构的变化
                .build();
 
        // 启动本地 WEB-UI
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT,8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
 
        // 检查点时间间隔
        env.enableCheckpointing(5000);
        DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(new CustomSink());
 
        env.execute();
    }
}


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何将MySQL的CDC实时数据写入到Hudi
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用合集之cdc对于源库数据的抽取是否存在有些元数据的改变无法处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
8天前
|
SQL 监控 关系型数据库
实时计算 Flink版产品使用问题之使用mysql cdc配置StartupOptions.initial()全量之后就不增量了,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 关系型数据库 MySQL
Apache Flink CDC 3.1.0 发布公告
Apache Flink 社区很高兴地宣布发布 Flink CDC 3.1.0!
521 1
Apache Flink CDC 3.1.0 发布公告
|
1月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如果源表中删除了2023年之前的数据, flink cdc 里面也会会删除吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之CDC任务在异常后整个record sent从0初始化开始,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
365 0
|
6天前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
311 0
|
6天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之下载了mysql的cdc的demo,在本地调试时,报错:找不到这个包,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之使用 MySQL CDC 进行数据同步时,设置 server_id 参数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。