介绍
Flink CDC
CDC全称是Change Data Capture,捕获变更数据,是一个比较广泛的概念,只要是能够捕获所有数据的变化,比如数据库捕获完整的变更日志记录增、删、改等,都可以称为CDC。该功能被广泛应用于数据同步、更新缓存、微服务间同步数据等场景,本文主要介绍基于Flink CDC在数据实时同步场景下的应用。
通过以上分析,基于Flink SQL CDC的数据同步有如下优点:
- 业务解耦:无需入侵业务,和业务完全解耦,也就是业务端无感知数据同步的存在。
- 性能消耗:业务数据库性能消耗小,数据同步延迟低。
- 同步易用:使用SQL方式执行CDC同步任务,极大的降低使用维护门槛。
- 数据完整:完整的数据库变更记录,不会丢失任何记录,Flink 自身支持 Exactly Once。
HUDI
Apache Hudi (发音为 “Hoodie”)在 DFS 的数据集上提供以下流原语:
- 插入更新 (如何改变数据集?)
- 增量拉取 (如何获取变更的数据?)
Hudi 维护在数据集上执行的所有操作的时间轴 (timeline),以提供数据集的即时视图。Hudi 将数据集组织到与 Hive 表非常相似的基本路径下的目录结构中。数据集分为多个分区,文件夹包含该分区的文件。每个分区均由相对于基本路径的分区路径唯一标识。
分区记录会被分配到多个文件。每个文件都有一个唯一的文件 ID 和生成该文件的提交 (commit)。如果有更新,则多个文件共享相同的文件 ID,但写入时的提交 (commit) 不同。
Hudi 解决了以下限制:
- HDFS 的可伸缩性限制;
- 需要在 Hadoop 中更快地呈现数据;
- 没有直接支持对现有数据的更新和删除;
- 快速的 ETL 和建模;
- 要检索所有更新的记录,无论这些更新是添加到最近日期分区的新记录还是对旧数据的更新,Hudi 都允许用户使用最后一个检查点时间戳。此过程不用执行扫描整个源表的查询。
Hudi的优势:
- HDFS 中的可伸缩性限制;
- Hadoop 中数据的快速呈现;
- 支持对于现有数据的更新和删除;
- 快速的 ETL 和建模。
Pulsar
Pulsar 是一个用于服务器到服务器的流原生消息系统,具有多租户、高性能等优势。Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。
Pulsar 的关键特性如下:
- Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
- 极低的发布延迟和端到端延迟。
- 可无缝扩展到超过一百万个 topic。
- 简单的客户端 API,支持 Java、Go、Python 和 C++。
- 支持多种 topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。
- 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
- 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
- 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
- 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。
此处使用pulsar的原因:
1: 支持永久存储
2: 异域复制,存储计算分离
3: 能直接集成presto
4:更灵活的订阅模式。
5:多租户
设计思想
MySQL 数据通过 Flink CDC 进入到 Pulsar。之所以数据先入 Pulsar 而不是直接入 Hudi,是为了实现多个实时任务复用 MySQL 过来的数据,避免多个任务通过 Flink CDC 接 MySQL 表以及 Binlog,对 MySQL 库的性能造成影响。
Kafka,HDFS数据同步到Pulsar.这一层作为实时数仓的ODS,用于记录最原始的数据,包含了所有业务的变更过程,以及比较细粒度的数据。此层包含了所有的原始业务数据,日志数据,规则数据等....
到pulsar的数据会同时按照实时数据仓库的链路,从 ODS->DWD->DWS->OLAP 数据库,最后供报表等数据服务使用。实时数仓的每一层结果数据会准实时的落一份到离线数仓,通过这种方式做到程序一次开发、指标口径统一,数据统一。而每一层的转化可以通过Flink/pulsar Functions或者集成presto来实现。
而存储在 Pulsar 的数据虽然可以永久存储,但是pulsar 针对upsert,delete,增量更新拉取等一些场景,仍旧不太完善。再者,如果把大量的历史数据再一次推到 pulsar,走实时计算的链路来修正历史数据,可能会影响当天的实时作业。所以针对重跑历史数据,会通过数据修正这一步来处理。而hudi本身依赖于hdfs,同步至hive非常容易。
总体上说,这个架构属于 Lambda 和 Kappa 混搭的架构。流批一体数据仓库的各个数据链路有数据质量校验的流程。第二天对前一天的数据进行对账,如果前一天实时计算的数据无异常,则不需要修正数据,Kappa 架构已经足够。
使用案例
-
Flink CDC 2.0 也已经正式发布,此次的核心改进和提升包括:
- 提供 MySQL CDC 2.0,核心 feature 包括
- 并发读取,全量数据的读取性能可以水平扩展;
- 全程无锁,不对线上业务产生锁的风险;
- 断点续传,支持全量阶段的 checkpoint。
- 版本搭配:
| flink |
hudi |
pulsar |
flink cdc |
| 1.12.2 | 0.9.0 | 2.7.2 |
1.4 |
| 1.13.1 | 0.10.0 | 2.8.1 |
2.0 |
准备前提:
Mysql 相关:
1:开启binlog
修改/etc/my.cnf文件
[myself@hadoop202 module]$ sudo vim /etc/my.cnf server-id= 1 log-bin=mysql-bin binlog_format=row binlog-do-db=gmall_flink_DW 注意:binlog-do-db根据自己的情况进行修改,指定具体要同步的数据库名字
2:授权
创建mysql用户
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
授予用户所需要的权限
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
注意:scan.incremental.snapshot.enabled启用后不再需要 RELOAD 权限(默认启用)。
最终确定的用户权限
FLUSH PRIVILEGES;
3: 为每个Reader 分配server ID
每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为服务器 id。MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。因此,如果不同的作业共享相同的服务器 id,可能会导致从错误的 binlog 位置读取。因此,建议通过SQL Hints为每个阅读器设置不同的服务器 id ,例如假设源并行度为 4,那么我们可以使用为 4 个源阅读器中的每一个分配唯一的服务器 id。
SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
maven 相关依赖:
<!--cdc 连接器--> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.0</version> <scope>provided</scope> </dependency> <!--pulsar 连接器--> <!-- https://mvnrepository.com/artifact/io.streamnative.connectors/pulsar-flink-connector --> <dependency> <groupId>io.streamnative.connectors</groupId> <artifactId>pulsar-flink-connector_2.11</artifactId> <version>1.13.1.2</version> <!-- <scope>provided</scope>--> </dependency> <!--hudi相关--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.Hudi</groupId> <artifactId>Hudi-Flink-bundle_2.11</artifactId> <version>${Hudi.version}</version> <scope>system</scope> <systemPath>${project.basedir}/libs/Hudi-Flink-bundle_2.11-0.10.0-SNAPSHOT.jar</systemPath> </dependency>
自定义cdc 反序列化器:
package csc; import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.utils.TemporalConversions; import io.debezium.data.Envelope; import io.debezium.time.MicroTimestamp; import io.debezium.time.NanoTimestamp; import io.debezium.time.Timestamp; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.TimestampData; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import java.time.LocalDateTime; import java.time.ZoneId; /** * @description: 自定义cdc反序列化器,解析binlog转换成Maxwell形式的json字符串 * @author: mdz * @date: 2021/8/17 **/ public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> { ZoneId serverTimeZone; //反序列化方法 @Override public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { //获取库名和表名 String topic = sourceRecord.topic(); String[] split = topic.split("\\."); String database = split[1]; String table = split[2]; //获取操作类op,insert,update,delete String operation = Envelope.operationFor(sourceRecord).toString().toLowerCase(); //获取数据 Struct value = (Struct) sourceRecord.value(); Struct after = value.getStruct("after"); Struct before = value.getStruct("before"); //使用fastjson创建JSONObject存放数据 JSONObject beforeJson = new JSONObject(); JSONObject afterJson = new JSONObject(); //如果操作是删除的话,after为空,需要获取before,用于后续同步删除操作 outer: if (operation.equals("delete")) { Schema schema = before.schema(); for (Field field : schema.fields()) { //遍历schema,取出各个字段数据 beforeJson.put(field.name(),before.get(field.name())); } }else if (operation.equals("update")){ Schema afterSchema = after.schema(); for (Field field : afterSchema.fields()) { //遍历schema,取出各个字段数据 afterJson.put(field.name(),after.get(field.name())); } Schema beforeSchema = before.schema(); for (Field field : beforeSchema.fields()) { //遍历schema,取出各个字段数据 beforeJson.put(field.name(),before.get(field.name())); } } else{//如果是插入没有before数据 Schema afterSchema = after.schema(); for (Field field : afterSchema.fields()) { //遍历schema,取出各个字段数据 afterJson.put(field.name(),after.get(field.name())); } } //获取server_id String server_id = value.getStruct("source").get("server_id").toString(); //获取主键 JSONObject keyJson = null; if (sourceRecord.key() != null) { keyJson = new JSONObject(); Struct key = (Struct) sourceRecord.key(); Schema keySchema = key.schema(); for (Field field : keySchema.fields()) { //遍历schema,取出各个字段数据 if (operation.equals("delete")) { keyJson.put(field.name(),before.get(field.name())); }else{ keyJson.put(field.name(),after.get(field.name())); } } } //获取时间戳 String ts_ms = value.get("ts_ms").toString(); //创建JSON用于存放最终结果 JSONObject result = new JSONObject(); result.put("database", database); result.put("table", table); result.put("type", operation); result.put("before", beforeJson); result.put("after", afterJson); result.put("server_id", server_id); result.put("ts_ms", ts_ms); result.put("key", keyJson); collector.collect(result.toJSONString()); } //定义类型方法 @Override public TypeInformation<String> getProducedType() { return TypeInformation.of(String.class); } // 解决cdc 相差八小时时区问题 private TimestampData convertToTimestamp(Object dbzObj, Schema schema) { if (dbzObj instanceof Long) { switch (schema.name()) { case Timestamp.SCHEMA_NAME: return TimestampData.fromEpochMillis((Long) dbzObj); case MicroTimestamp.SCHEMA_NAME: long micro = (long) dbzObj; return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000)); case NanoTimestamp.SCHEMA_NAME: long nano = (long) dbzObj; return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000)); } } LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); return TimestampData.fromLocalDateTime(localDateTime); } }
mysql 数据cdc 到pulsar:
import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic; import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper; import org.apache.flink.table.api.DataTypes; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import java.util.Optional; import java.util.Properties; /** * Created with IntelliJ IDEA. * * @Author: mdz * @Param: $ * @Date: 2021/10/16/14:08 * @Description: * @version: **/ public class MysqlCDC2Pulsar { public static void main(String[] args) throws Exception { //读取配置文件 ParameterTool parameters = null; ParameterTool parameters_tool = ParameterTool.fromArgs(args); //指定参数名:local_path String local_path = parameters_tool.get("configfile", null); if (StringUtils.isBlank(local_path)) { parameters = parameters_tool; } else { parameters = ParameterTool.fromPropertiesFile(local_path); } Properties debeziumProperties = new Properties(); debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock // debeziumProperties.put("scan.incremental.snapshot.enabled", "true"); // debeziumProperties.put("scan.incremental.snapshot.chunk.size", 8096); // debeziumProperties.put("scan.snapshot.fetch.size", 1024); /** * datastream API MySqlSource 只支持单并发,连接就一个,binaryLog Client也是一个。 */ DebeziumSourceFunction<String> mySqlSource = MySqlSource.<String>builder() .hostname("hostName") .port(port) .databaseList("yourDatabase") // set captured database .tableList("yourDatabase.table1,yourDatabase.table2...") // set captured table .username("userName") .password("password") .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .debeziumProperties(debeziumProperties) .startupOptions(StartupOptions.initial()) .serverId(100035) .serverTimeZone("UTC") .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(3000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 设置任务关闭时候保留最后一次checkpoint 的数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 指定ck 的自动重启策略 env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/cdc2.0-test/ck")); // 设置hdfs 的访问用户名 System.setProperty("HADOOP_USER_NAME","hdfs"); DataStreamSource<String> BinlogDS = env.addSource(mySqlSource); // 定义pulsar sink的一些参数 String topic = parameters.get("pulsardcTopic"); String adminUrl = parameters.get("pulsarAdminUrl"); Properties props = new Properties(); props.setProperty(PulsarOptions.TOPIC_SINGLE_OPTION_KEY, topic); props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000"); props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "false"); props.setProperty(PulsarOptions.ENABLE_KEY_HASH_RANGE_KEY, "true"); props.setProperty("pulsar.producer.blockIfQueueFull", "true"); props.setProperty("partition.discovery.interval-millis", "5000"); // 分区发现,定期检查topic分区情况 props.setProperty("pulsar.reader.receiverQueueSize", "100000"); props.setProperty("format", "debezium-json"); //pulsar的连接及认证 ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl(parameters.get("pulsarServiceUrl")); conf.setAuthPluginClassName("org.apache.pulsar.client.impl.auth.AuthenticationToken"); conf.setAuthParams(parameters.get("pulsarToken")); // 创建pulsar sink FlinkPulsarSink<String> pulsarSink = new FlinkPulsarSink<String>( adminUrl, Optional.of(topic), conf, props, new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema()).useAtomicMode(DataTypes.STRING()).build() , PulsarSinkSemantic.EXACTLY_ONCE ); String jobName = parameters.get("jobName"); //sink 到pulsar BinlogDS.addSink(pulsarSink).name("ORGA_BI_ORGBASEINFO job"); env.execute(jobName); env.execute("Print MySQL Snapshot + Binlog"); } }
pulsar2Hudi
....
常见错误及总结
snapshot.mode的各种参数,以下是测试效果 properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector properties.setProperty("snapshot.mode", "initial");每次重启都会读全量 properties.setProperty("snapshot.mode", "initial_only");//读不到数据 properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似 properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到 properties.setProperty("snapshot.mode", "schema_only_recovery");//Could not find existing binlog information while attempting schema only recovery snapshot
MySQL CDC源等待超时
在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很大,则建议添加以下Flink配置,以避免由于超时检查点而导致故障转移:
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 214748364
Flink Connector mysql CDC sql-client模式下全库扫描,直到capturing到目标表,如果实例下库表比较多,会影响任务启动时效
可以通过缩小数据库权限避免扫描全库 -- 授予REPLICATION Slave权限 GRANT RELOAD,REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'username'; -- 授予目标库的访问权限 GRANT SELECT ON `your_single_database`.* TO 'username'@'%'; -- 刷新权限 FLUSH PRIVILEGES;
扫描全表阶段慢,在 Web UI 出现如下现象:
- 原因:扫描全表阶段慢不一定是 cdc source 的问题,可能是下游节点处理太慢反压了。
- 解决方法:通过 Web UI 的反压工具排查发现,瓶颈主要在聚合节点上。通过在 sql-client-defaults.yaml 文件配上 MiniBatch 相关参数和开启 distinct 优化(我们的聚合中有 count distinct),作业的 scan 效率得到了很大的提升,从原先的 10 小时,提升到了 1 小时。关于性能调优的参数可以参阅:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/tuning/streaming_aggregation_optimization.html。
configuration: table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 2s table.exec.mini-batch.size: 5000 table.optimizer.distinct-agg.split.enabled: true
pulsar:
Caused by: org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Message size is bigger than 5242880 bytes
在bookkeeper.conf中配置: nettyMaxFrameSizeBytes=10M Netty 传输过程中,单条消息的最大size。 默认5242880 broker.conf中配置 maxMessageSize=10M 单条消息的最大size。默认5242880
pulsar Producer send queue is full
程序中配置: pulsar.producer.blockIfQueueFull=true
Hudi
ClassNotFoundException
flink lib包下添加: ./lib/hadoop-mapreduce-client-core-2.7.3.jar hive auxlib下添加: Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar


