一文入门Dataphin实时集成

本文涉及的产品
智能数据建设与治理Dataphin,200数据处理单元
简介: Dataphin实时集成的读取和写入原理是什么?Dataphin实时集成和实时研发的区别是什么?Dataphin实时集成有哪些优势?本文一次讲清


实时集成采集-Flink CDC

概念

Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源库,通过与数据库交互并读取其变更日志(例如MySQL的binlog或Oracle的Redo Log)来获取数据变化事件,然后将这些事件转换成Flink内部可处理的Changelog流格式,后续利用Flink强大的状态管理和流处理能力,对变更数据进行实时分析、聚合或者传输至目标存储系统如Hadoop HDFS、Kafka或其他数据库等。

数据库变更日志和Changelog流

数据库的变更日志(Change Log)和Changelog流都是用于记录数据库状态变化的数据结构

  1. 数据库变更日志:

数据库变更日志是数据库系统内用来记录所有对数据进行修改操作的日志文件或机制,是数据库本身维护的用于保证数据完整性和提供恢复能力的底层技术实现。例如:

  • MySQL的二进制日志(Binary Log):它记录了所有的DDL(数据定义语言)和DML(数据操作语言)操作,如INSERT、UPDATE、DELETE等,这些操作按照事务提交的顺序排列。
  • Oracle的Redo Log:用于保存事务对数据库所做的更改信息,以支持事务的回滚和数据库的恢复。
  • PostgreSQL的WAL(Write-Ahead Log):在事务提交前先将变更写入日志,确保在系统崩溃时能够恢复到一致的状态。
  • Changelog流:

Apache Flink中的Changelog流是一种特殊的事件流,是从变更日志中提取出来,适合于流处理框架使用的、描述数据变更过程的数据流形式。每个事件包含了关于某个数据对象(如数据库表的一行记录)状态变更的具体信息,如添加、更新或删除的动作类型,以及变更后的数据内容。

  1. 举个例子:

假设我们有一个电商数据库,其中包含一个orders表,记录了所有的订单信息。现在发生了以下操作:

  1. 数据库变更日志:
  • 插入操作:当用户A下单后,数据库会执行一条INSERT语句,向orders表中插入一行新数据(例如:订单ID=1001,用户ID=A,商品ID=1,数量=2)。
  • 更新操作:假设用户A修改了订单1001的商品数量,从2个变为3个。数据库会执行一条UPDATE语句,更新orders表中的相应记录。
  • INSERT操作和UPDATE操作会被数据库的变更日志记录下来,如MySQL的二进制日志可能记录为:
INSERT INTO orders (order_id, user_id, product_id, quantity) VALUES (1001, 'A', 1, 2);
UPDATE orders SET quantity = 3 WHERE order_id = 1001;
  • 如果之后用户A取消订单并执行了一条DELETE语句,该操作也会被记录到变更日志中。
  • Changelog流:
  • 当使用Apache Flink CDC等工具捕获上述数据库变更日志时,会将其转换为Changelog流的形式:在这里,Changelog流以事件形式表示了数据库中的变化,每个事件包含了操作类型(INSERT、UPDATE、DELETE)以及对应的数据内容。
+ INSERT: Order(order_id=1001, user_id='A', product_id=1, quantity=2)
+ UPDATE_BEFORE: Order(order_id=1001, user_id='A', product_id=1, quantity=2)
+ UPDATE_AFTER: Order(order_id=1001, user_id='A', product_id=1, quantity=3)
+ DELETE: Order(order_id=1001, user_id='A', product_id=1, quantity=3)

Flink增量读取和全量读取

增量读取:单并发,关系型数据库是单机,多并发无意义

全量读取:多并发,否则同步速率慢,读不过来

Flink实现全量读取数据的方式通常不依赖于数据库的Redo Log或Binlog日志,而是直接从数据源(如数据库、文件系统或其他数据存储)中一次性加载所有符合查询条件的数据,并将其转换为DataStream或Table,然后进一步进行流式或批量处理。

目前实时集成仅支持增量读取,全量读取可通过离线集成实现

实时集成写入

对采集到的Changelog流进行处理,通过对应的connector进行写入

JDBC写入关系型数据库:Changelog流根据关系型数据库的特性,如字段类型、约束条件等,转换处理为合理的数据

写入大数据存储、消息队列:Changelog流转换为Append流写入

通过jdbc connector写入Mysql、Oracle 通过对应的connector写入odps、Hive 通过 kafka connector写入Kafka

结果表没有主键

结果表有主键

Changelog流消息类型

insert

处理为insert

处理为upsert

处理为insert,标记操作类型insert

处理为insert,标记操作类型insert

update_before

直接丢弃

直接丢弃

直接丢弃

直接丢弃

update_after

处理为insert

处理为upsert

处理为insert,标记操作类型update

处理为insert,标记操作类型update

delete

直接丢弃

处理为delete

处理为insert,标记操作类型delete

处理为insert,标记操作类型delete

目标表

需要映射目标表

写入同一个topic中,数据会带有表名字段

DDL处理

正常处理/忽略/报错

全部写入,无法忽略/报错

用户可以自行处理得到的结果表,按主键聚合,按更新时间进行排序,根据最新的操作类型执行


实时集成与实时研发对比

实时研发

实时集成

配置方法

写flink sql/flink datastream代码

配置化操作,无需手写代码

实时采集支持的数据源

多种,实现方式包含Flink CDC(MySQL)、定时查表(SAP)、直接读取(Kafka)等

目前仅支持Flink CDC采集

同步表量

一个任务同步的表量由代码决定,每张表需要单独的ddl定义(开源的connector仅支持单表处理,flink datastream支持,代码编写难度大)

一个任务可支持整库同步,可圈选或排除某些表

(实时集成是在开源的connector基础上进行改造)

是否有改造引擎?

数据处理

数据处理能力较灵活,可以自行编写数据处理和计算代码

数据处理能力较弱

场景

同步表数量较少,需要进行数据处理

同步表数量较多且不需要进行数据处理

实时集成工具对比

实时数据采集工具

厂家

特点

OGG

Oracle原厂的Oracle采集和同步工具

价格昂贵

性能高

只支持Oracle实时数据同步

Dataphin实时集成

Dataphin自研的 CDC Connector

配置化方式,无需写代码

支持整库或多表同步

支持多种数据源

性能优于开源Flink CDC

开源Flink CDC(可搭配Dataphin实时研发)

开源免费的Flink CDC

需要写Flink SQL代码

只支持单表数据同步

总结-性能/价格:OGG > Dataphin实时集成 >> 开源Flink CDC





相关文章
|
2月前
|
安全 Java 数据库连接
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
315 6
|
2月前
|
测试技术 数据库 开发者
Django自动化测试入门:单元测试与集成测试
【4月更文挑战第15天】本文介绍了Django的自动化测试,包括单元测试和集成测试。单元测试专注于单个视图、模型等组件的正确性,而集成测试则测试组件间的交互。Django测试框架提供`TestCase`和`Client`进行单元和集成测试。通过编写测试,开发者能确保代码质量、稳定性和应用的正确协同工作。运行测试使用`python manage.py test`命令,建议将其纳入日常开发流程。
|
2月前
|
监控 Oracle 关系型数据库
Dataphin实时集成Oracle CDC相关问题排查
本文档提供了Dataphin平台Oracle CDC实时集成相关问题排查指南,覆盖了权限等常见问题,旨在帮助快速定位和解决Oracle数据库变更数据捕获(CDC)集成过程中所可能遇到的技术难题,确保数据的实时、准确同步。
|
2月前
|
存储 数据处理
Dataphin集成任务支持自定义FTP标记完成文件内容(V3.14)
在文件传输的场景中,标记完成文件(有时也被称为标档文件)作为一种重要的确认机制被广泛应用。这一机制通过创建特定的“传输完成标识文件”,用于明确指示数据文件已成功完成全量传输,并达到可以进行下一步业务处理的状态,从而有效防止了基于不完整数据流的错误操作。
|
2月前
|
监控 安全 大数据
Dataphin V3.10升级速览丨集成能力提升、15个应用场景、数据治理能力优化……
Dataphin V3.10升级速览丨集成能力提升、15个应用场景、数据治理能力优化……
102 0
|
7月前
|
SQL 关系型数据库 MySQL
数据集成框架FlinkX(纯钧)入门
数据集成框架FlinkX(纯钧)入门
248 0
|
4天前
|
Java 关系型数据库 MySQL
如何实现Springboot+camunda+mysql的集成
【7月更文挑战第2天】集成Spring Boot、Camunda和MySQL的简要步骤: 1. 初始化Spring Boot项目,添加Camunda和MySQL驱动依赖。 2. 配置`application.properties`,包括数据库URL、用户名和密码。 3. 设置Camunda引擎属性,指定数据源。 4. 引入流程定义文件(如`.bpmn`)。 5. 创建服务处理流程操作,创建控制器接收请求。 6. Camunda自动在数据库创建表结构。 7. 启动应用,测试流程启动,如通过服务和控制器开始流程实例。 示例代码包括服务类启动流程实例及控制器接口。实际集成需按业务需求调整。
|
17天前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
49 1
|
23天前
|
消息中间件 Java Kafka
springboot集成kafka
springboot集成kafka
36 2
|
11天前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成