Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink 1.11 最重要的 Feature —— Hive Streaming 之前已经和大家分享过了,今天就和大家来聊一聊另一个特别重要的功能 —— CDC。

Flink 1.11 最重要的 Feature —— Hive Streaming 之前已经和大家分享过了,今天就和大家来聊一聊另一个特别重要的功能 —— CDC。

CDC概述

何为CDC?Change Data Capture,将数据库中的’增’、’改’、’删’操作记录下来。在很早之前是通过触发器来完成记录,现在通过 binlog+同步中间件来实现。常用的 binlog 同步中间件有很多,比如 Alibaba 开源的 canal[1],Red Hat 开源的debezium[2],Zendesk 开源的 Maxwell[3] 等等。

这些中间件会负责 binlog 的解析,并同步到消息中间件中,我们只需要消费对应的 Topic 即可。

回到 Flink 上,CDC 似乎和我们没有太大的关联?其实不然,让我们更加抽象地来看这个世界。

当我们用 Flink 去消费数据比如 Kafka 时,我们就仿佛在读一张表,什么表?一张不断有记录被插入的表,我们将每一条被插入的数据取出来,完成我们的逻辑。

6401  .png

当插入的每条数据都没有问题时,一切都很美好。关联、聚合、输出。

但当我们发现,某条已经被计算过的数据有问题时,麻烦大了。我们直接改最后的输出值其实是没有用的,这次改了,当再来数据触发计算时,结果还是会被错误的数据覆盖,因为中间计算结果没有被修改,它仍然是一个错误的值。怎么办?撤回流似乎能解决这个问题,这也确实是解决这个问题的手段,但是问题来了,撤回流怎么确定读取的数据是要被撤回的?另外,怎么去触发一次撤回?

CDC 解决了这些:将消息中间件的数据反序列化后,根据 Type 来识别数据是 Insert 还是 Delete;另外,如果大家看过 Flink 源码,会发现反序列化后的数据类型变了,从 Row 升级为 RowData,RowData 能够将数据标记为撤回还是插入,这就意味着每个算子能够判断出数据到底是需要下发还是撤回。

CDC 的重要性就先说这么多,之后有机会的话,出一篇实时 DQC 的视频,告诉大家 CDC 的出现,对于实时 DQC 的帮助有多大。下面让我们回到正题。

既然有那么多 CDC 同步中间件,那么一定会有各种各样的格式存放在消息中间件中,我们必然需要去解析它们。于是 Flink 1.11 提供了 canal-json 和 debezium-json,但我们用的是 Maxwell 怎么办?只能等官方出或者说是等有人向社区贡献吗?那如果我们用的是自研的同步中间件怎么办?

所以就有了今天的分享:如何去自定义实现一个 Maxwell format。大家也可以基于此文的思路去实现其他 CDC format,比如 OGG, 或是自研 CDC 工具产生的数据格式。

如何实现

当我们提交任务之后,Flink 会通过 SPI 机制将 classpath 下注册的所有工厂类加载进来,包括 DynamicTableFactory、DeserializationFormatFactory 等等。而对于 Format 来说,到底使用哪个 DeserializationFormatFactory,是根据 DDL 语句中的 Format 来决定的。通过将 Format 的值与工厂类的 factoryIdentifier() 方法的返回值进行匹配 来确定。

再通过 DeserializationFormatFactory 中的 createDecodingFormat(...) 方法,将反序列化对象提供给 DynamicTableSource。

通过图来了解整个过程(仅从反序列化数据并消费的角度来看):

6402.png

想要实现 CDC Format 去解析某种 CDC 工具产生的数据其实很简单,核心组件其实就三个:

  • 工厂类(DeserializationFormatFactory):负责编译时根据 ‘format’ = ‘maxwell-json’创建对应的反序列化器。即 MaxwellJsonFormatFactory。
  • 反序列化类(DeserializationSchema):负责运行时的解析,根据固定格式将 CDC 数据转换成 Flink 系统能认识的 INSERT/DELETE/UPDATE 消息,如 RowData。即 MaxwellJsonDeserializationSchema。
  • Service 注册文件:需要添加 Service 文件 META-INF/services/org.apache.flink.table.factories.Factory ,并在其中增加一行我们实现的 MaxwellJsonFormatFactory 类路径。

再通过代码,来看看反序列化中的细节:

public void deserialize(byte[] message, Collectorout) throws IOException {
       try {
           RowData row = jsonDeserializer.deserialize(message);
           String type = row.getString(2).toString(); // "type" field
           if (OP_INSERT.equals(type)) {
               RowData insert = row.getRow(0, fieldCount);
               insert.setRowKind(RowKind.INSERT);
               out.collect(insert);
           } else if (OP_UPDATE.equals(type)) {
               GenericRowData after = (GenericRowData) row.getRow(0, fieldCount); // "data" field
               GenericRowData before = (GenericRowData) row.getRow(1, fieldCount); // "old" field
               for (int f = 0; f < fieldCount; f++) {
                   if (before.isNullAt(f)) {
                       before.setField(f, after.getField(f));
                   }
               }
               before.setRowKind(RowKind.UPDATE_BEFORE);
               after.setRowKind(RowKind.UPDATE_AFTER);
               out.collect(before);
               out.collect(after);
           } else if (OP_DELETE.equals(type)) {
               RowData delete = row.getRow(0, fieldCount);
               delete.setRowKind(RowKind.DELETE);
               out.collect(delete);
           } else {
               if (!ignoreParseErrors) {
                   throw new IOException(format(
                       "Unknown \"type\" value \"%s\". The Maxwell JSON message is '%s'", type, new String(message)));
               }
           }
       } catch (Throwable t) {
           if (!ignoreParseErrors) {
               throw new IOException(format(
                   "Corrupt Maxwell JSON message '%s'.", new String(message)), t);
           }
       }
   }

其实并不复杂:先通过 jsonDeserializer 将字节数组根据 [data: ROW, old: ROW, type: String] 的 schema 反序列化成 RowData,然后根据 “type” 列的值来判断数据是什么类型:增、改、删;再根据数据类型取出 “data” 或者 “old” 区的数据,来组装成 Flink 认识的 INSERT/DELETE/UPDATE 数据并下发。

对象 jsonDeserializer 即 JSON 格式的反序列化器,它可以通过指定的 RowType 类型,读取 JSON 的字节数组中指定的字段并反序列化成 RowData。在我们的场景中,我们需要去读取如下 Maxwell 数据的 “data”, “old” 和 “type” 部分的数据。

{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1}}

因此 MaxwellJsonDeserializationSchema 中定义的 JSON 的 RowType 如下所示。

private RowType createJsonRowType(DataType databaseSchema) {
       // Maxwell JSON contains other information, e.g. "database", "ts"
       // but we don't need them
       return (RowType) DataTypes.ROW(
           DataTypes.FIELD("data", databaseSchema),
           DataTypes.FIELD("old", databaseSchema),
           DataTypes.FIELD("type", DataTypes.STRING())).getLogicalType();
   }

databaseSchema 是用户通过 DDL 定义的 schema 信息,也对应着数据库中表的 schema。结合上面的 JSON 和代码,我们能够得知 jsonDeserializer 只会取走 byte[] 中 data、old、type 这三个字段对应的值,其中 data 和old 还是个嵌套JSON,它们的 schema 信息和 databaseSchema 一致。由于 Maxwell 在同步数据时,“old”区不包含未被更新的字段,所以 jsonDeserializer 返回后,我们会通过 “data” 区的 RowData 将 old 区的缺失字段补齐。

得到 RowData 之后,会取出 type 字段,然后根据对应的值,会有三种分支:

  • insert:取出 data 中的值,也就是我们通过DDL定义的字段对应的值,再将其标记为 RowKind.INSERT 类型数据,最后下发。
  • update:分别取出 data 和 old 的值,然后循环 old 中每个字段,字段值如果为空说明是未修改的字段,那就用 data 中对应位置字段的值替代;之后将 old 标记为 RowKind.UPDATE_BEFORE 也就意味着 Flink 引擎需要将之前对应的值撤回,data 标记为 RowKind.UPDATE_AFTER 正常下发。
  • delete:取出 data 中的值,标记为 RowKind.DELETE,代表需要撤回。

处理的过程中,如果抛出异常,会根据 DDL 中maxwell-json.ignore-parse-errors的值来确定是忽视这条数据继续处理下一条数据,还是让任务报错。

笔者在 maxwell-json 反序列化功能的基础之上,还实现了序列化的功能,即能将 Flink 产生的 changelog 以 Maxwell 的 JSON 格式输出到外部系统中。其实现思路与反序列化器的思路正好相反,更多细节可以参考 Pull Request 中的实现。

PR 实现详情链接:
https://github.com/apache/flink/pull/13090

功能演示

给大家演示一下从 Kafka 中读取 Maxwell 推送来的 maxwell json 格式数据,并将聚合后的数据再次写入 Kafka 后,重新读出来验证数据是否正确。

Kafka 数据源表

CREATE TABLE topic_products (
 -- schema is totally the same to the MySQL "products" table
 id BIGINT,
 name STRING,
 description STRING,
 weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json');

Kafka 数据结果表&数据源表

CREATE TABLE topic_sink (
 name STRING,
 sum_weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell-sink',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json'
);

MySQL 表

-- 注意,这部分 SQL 在 MySQL 中执行,不是 Flink 中的表
CREATE TABLE product (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
description VARCHAR(512),
weight FLOAT
);
truncate product ;
ALTER TABLE product AUTO_INCREMENT = 101;
INSERT INTO product
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
      (default,"car battery","12V car battery",8.1),
      (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
      (default,"hammer","12oz carpenter's hammer",0.75),
      (default,"hammer","14oz carpenter's hammer",0.875),
      (default,"hammer","16oz carpenter's hammer",1.0),
      (default,"rocks","box of assorted rocks",5.3),
      (default,"jacket","water resistent black wind breaker",0.1),
      (default,"spare tire","24 inch spare tire",22.2);
UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
UPDATE product SET weight='5.1' WHERE id=107;
INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
UPDATE product SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
UPDATE product SET weight='5.17' WHERE id=111;
DELETE FROM product WHERE id=111;
UPDATE product SET weight='5.17' WHERE id=102 or id = 101;
DELETE FROM product WHERE id=102 or id = 103;

先看看能不能正常读取 Kafka 中的 maxwell json 数据。

select * from topic_products;

6403.png

可以看到,所有字段值都变成了 Update 之后的值,同时,被 Delete 的数据也没有出现。

接着让我们再将聚合数据写入 Kafka。

insert into topic_sink select name,sum(weight) as sum_weight from topic_products group by name;

在 Flink 集群的 Web 页面也能够看到任务正确提交,接下来再让我们把聚合数据查出来。

select * from topic_sink

6404.png

最后,让我们查询一下 MySQL 中的表,来验证数据是否一致;因为在 Flink 中,我们将 weight 字段定义成 Decimal(10,2),所以我们在查询 MySQL 的时候,需要将 weight 字段进行类型转换。

6405.png

没有问题,我们的 maxwell json 解析很成功。

写在最后

根据笔者实现 maxwell-json format 的经验,Flink 对于接口的定义、对于模块职责的划分还是很清晰的,所以实现一个自定义 CDC format 非常简单(核心代码只有200多行)。因此,如果你是用的 OGG,或是自研的同步中间件,可以通过本文的思路快速实现一个 CDC format,一起解放你的 CDC 数据!

参考链接:

[1]https://github.com/alibaba/canal
[2]https://debezium.io/
[3]https://maxwells-daemon.io/

更多 Flink 技术交流可加 Flink 社区钉钉交流群~

最新钉群二维码.jpeg

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
602 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
2月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
194 1
京东零售基于Flink的推荐系统智能数据体系
|
6月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
7月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1681 45
|
6月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
107 1
|
SQL 消息中间件 canal
Flink cdc自定义format格式数据源
变更数据捕获 (CDC) 已成为一种流行的模式,本文介绍如何通过自定义format来获取不同格式cdc数据源
Flink cdc自定义format格式数据源
|
12月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
10月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3156 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
10月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
410 56

相关产品

  • 实时计算 Flink版