大数据-业务数据采集-FlinkCDC

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 大数据-业务数据采集-FlinkCDC

CDC

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:

基于查询的 CDC 基于 Binlog 的 CDC
开源产品 Sqoop、Kafka JDBC Source Canal、Maxwell、Debezium
执行模式 Batch Streaming
是否可以捕获所有数据变化
延迟性 高延迟 低延迟
是否增加数据库压力

FlinkCDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取【全量数据】和【增量变更数据】的 source 组件。而不需要使用类似 Kafka 之类的中间件中转数据

目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors

Connector Database Driver
mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.1
mysql-cdc MySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
JDBC Driver: 8.0.27
oceanbase-cdc OceanBase CE: 3.1.x
OceanBase EE (MySQL mode): 2.x, 3.x
JDBC Driver: 5.1.4x
oracle-cdc Oracle: 11, 12, 19 Oracle Driver: 19.3.0.0
postgres-cdc PostgreSQL: 9.6, 10, 11, 12 JDBC Driver: 42.2.12
sqlserver-cdc Sqlserver: 2012, 2014, 2016, 2017, 2019 JDBC Driver: 7.2.2.jre8
tidb-cdc TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 JDBC Driver: 8.0.27
db2-cdc Db2: 11.5 DB2 Driver: 11.5.0.0

DataStream:

  • 优点: 多库多表
  • 缺点: 需要自定义反序列化器(但灵活)
    FlinkSQL:
  • 优点: 不需要自定义反序列化器
  • 缺点: 单表

Demo

注意开启 binlog_format=ROW

my.ini

log-bin=mysql-bin
#binlog_format="STATEMENT"
binlog_format="ROW"
#binlog_format="MIXED"
#service-id=1

POM

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

基于 DataStream

CustomerDeserialization.java

package com.vipsoft;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
    /**
     * 封装的数据格式
     * {
     * "database":"",
     * "tableName":"",
     * "before":{"id":"","tm_name":""....},
     * "after":{"id":"","tm_name":""....},
     * "type":"c u d",
     * //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1.创建JSON对象用于存储最终数据
        JSONObject result = new JSONObject();
        //2.获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }
        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        //5.获取操作类型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        //6.将字段写入JSON对象
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);
        //7.输出数据
        collector.collect(result.toJSONString());
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

FlinkCDC.java

package com.vipsoft;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .serverTimeZone("GMT+8")  //时区报错增加这个设置
                .port(3306)
                .username("root")
                .password("110")
                .databaseList("springboot")
                .tableList("springboot.sys_user")   //如果不添加该参数,则消费指定数据库中所有表的数据.如果指定,指定方式为db.table
                //.deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new CustomerDeserialization()) //使用自定义反序列化器
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        //3.打印数据
        streamSource.print();
        //4.启动任务
        env.execute("FlinkCDC");
    }
}

运行效果

  • 默认 StringDebeziumDeserializationSchema
  • 自定义反序列化器

FlinkSQL

package com.vipsoft;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkCDCWithSQL {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //2.DDL方式建表
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " username STRING, " +
                " nick_name STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'localhost', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '110', " +
                " 'database-name' = 'springboot', " +
                " 'table-name' = 'sys_user' " +
                ")");
        //3.查询数据
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");
        //4.将动态表转换为流
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();
        //5.启动任务
        env.execute("FlinkCDCWithSQL");
    }
}

运行效果

对比

通过对比,FlinkCDC 最舒服

FlinkCDC Maxwell Canal
断点续传 CK MySQL 本地磁盘
SQL -> 数据 一对一(炸开处理)
初始化功能 有(多库多表) 有(单表) 无(单独查询历史数据)
封装格式 自定义 JSON JSON(c/s自定义)
高可用 运行集群高可用 集群(ZK)

插入对比

插入两条数据

INSER INTO z_user_info VALUES(30,'zhang3','13800000000'),(31,'li4','13999999999')

FlinkCDC 每条变化都会产生一条 json

Maxwell 每条变化都会产生一条 json

Canal 一次性执行的SQL,会产生一条JSON(两条数据组合在一起)【不方便,需要炸开解析】

更新对比

UPDATE z_user_info SET user_name='wang5' WHERE id IN(30,31)

FlinkCDC 包括了修改前的 before 数据

Maxwell 不包括修改前的数据

Canal 仍然是一条json

删除对比

DELETE FROM z_user_info WHERE id IN(30,31)

FlinkCDC 两条删除的 json 数据

Maxwell

Canal

【尚硅谷】Flink数据仓库视频教程

大数据-数据仓库-实时数仓架构分析

大数据-业务数据采集-FlinkCDC

大数据 - DWD&DIM 行为数据

大数据 - DWD&DIM 业务数据

大数据 DWM层 业务实现

目录
相关文章
|
6月前
|
数据采集 传感器 人工智能
大数据关键技术之电商API接口接入数据采集发展趋势
本文从数据采集场景、数据采集系统、数据采集技术方面阐述数据采集的发展趋势。 01 数据采集场景的发展趋势 作为大数据和人工智能工程的源头,数据采集的场景伴随着应用场景的发展而变化,以下是数据采集场景的发展趋势。
|
数据采集 存储 监控
大数据的数据来源 - 数据采集的方式(数据接入的方式)
大数据处理关键技术一般包括:大数据采集、大数据预处理、大数据存储及管理、大数据分析及挖掘、大数据展现和应用(大数据检索、大数据可视化、大数据应用、大数据安全等)。下面主要介绍下大数据采集
4407 0
|
25天前
|
数据采集 传感器 大数据
大数据中数据采集 (Data Collection)
【10月更文挑战第17天】
54 2
|
3月前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
56 1
|
3月前
|
数据采集 关系型数据库 MySQL
大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format
大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format
41 1
|
3月前
|
数据采集 大数据
大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property
大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property
36 1
|
6月前
|
资源调度 分布式计算 Oracle
助力工业物联网,工业大数据项目之数据采集【四】
助力工业物联网,工业大数据项目之数据采集【四】
74 0
|
数据采集 消息中间件 分布式计算
大数据数据采集的数据采集(收集/聚合)的Logstash之数据采集流程的output
在大数据领域,数据采集是非常重要的一环。而Logstash作为一个开源的数据收集引擎,可以帮助我们轻松地实现数据的采集、聚合和传输等功能。本文将会对Logstash之数据采集流程的Output进行详细介绍。
124 1
|
SQL 数据采集 分布式计算
大数据数据采集的数据迁移(同步/传输)的Sqoop之基本命令和使用的导入/导出数据
在大数据领域,数据迁移(同步/传输)也是非常重要的一环。Sqoop作为一个开源的数据迁移工具,可以帮助我们轻松地实现关系型数据库与Hadoop之间的数据迁移。本文将会对Sqoop的基本命令和使用进行详细介绍。
386 1
|
数据采集 网络协议 大数据
大数据数据采集的数据采集(收集/聚合)的Logstash之安装部署
随着大数据技术的不断发展,越来越多的企业开始重视数据采集的工作。而在数据采集过程中,Logstash是一款非常优秀的开源工具,能够高效、稳定地完成各种数据来源的数据采集工作。本文将介绍如何安装和部署Logstash,希望能够为大家提供一些参考和帮助。
110 1