Flink CDC 中还少了啥参数吗?启动命令flink run-application --target yarn-application -m yarn-cluster -Dyarn.provided.lib.dirs=hdfs://master1:8020/flink/runtime/1.14.6-2.11/lib -Dyarn.application.name='CDCWriteDoris' -Dtaskmanager.numberOfTaskSlots=1 -Djobmanager.memory.process.size=1G -Dtaskmanager.memory.process.size=1G -Dyarn.application.queue=default -Dparallelism.default=1 -Dtaskmanager.memory.managed.size=0 -c org.sunway.inceptor.doris.CDCWriteDoris /home/hdfs/flink-inceptor-1.0-SNAPSHOT.jar -s hdfs:///flink/flink-checkpoints/00000000000000000000000000000000/chk-46
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在您提供的Flink CDC启动命令中,已经包含了一些常见的配置参数,例如YARN集群相关的参数、任务并行度、内存管理等。然而,根据知识库中的参考资料,还有一些可能需要补充的参数或优化建议,具体如下:
Flink CDC连接器需要一些特定的参数来确保数据同步的正确性和性能。以下是一些可能缺失的关键参数:
scan.startup.mode
指定Flink CDC的启动模式,例如initial(全量+增量)、latest-offset(仅增量)等。如果未指定,默认为initial。
scan.startup.timestamp-millis
如果使用timestamp模式启动,需要指定时间戳(单位为毫秒)。此参数与scan.startup.mode=timestamp配合使用。
server-time-zone
数据库会话时区,用于正确解析MySQL中的TIMESTAMP类型。如果未指定,默认使用Flink作业运行时的环境时区。
debezium.* 参数优化
Debezium是Flink CDC底层的核心组件,可以通过以下参数优化Binlog解析性能:
debezium.max.queue.size: 增加阻塞队列容量以提高吞吐量,默认值为8192。debezium.max.batch.size: 每次处理的事件条数,默认值为2048。debezium.poll.interval.ms: 请求新变更事件的间隔时间,默认值为1000毫秒。如果您使用的是Postgres CDC连接器,可以考虑以下参数: - heartbeat.interval.ms
发送心跳包的时间间隔,用于推进Slot偏移量。默认值为30秒(30000毫秒)。
scan.incremental.snapshot.chunk.key-column
指定某一列作为快照阶段切分分片的切分列。默认从主键中选择第一列。
scan.incremental.close-idle-reader.enabled
是否在快照结束后关闭空闲的Reader。需要配合execution.checkpointing.checkpoints-after-tasks-finish.enabled=true使用。
Flink CDC依赖于Checkpoint机制来保证Exactly-Once语义。以下是一些可能需要补充的Checkpoint相关参数:
execution.checkpointing.interval
Checkpoint的时间间隔,推荐设置为10秒左右。
execution.checkpointing.min-pause
两次Checkpoint之间的最小暂停时间,避免频繁触发Checkpoint导致性能下降。
state.backend 和 state.checkpoints.dir
指定状态后端和Checkpoint存储路径。例如,使用rocksdb作为状态后端,并将Checkpoint存储到HDFS上。
如果目标是写入Doris或其他存储系统,还需要配置Sink相关参数。以下是一些可能需要补充的参数:
sink.parallelism
设置Sink的并发度,以提高写入性能。
sink.file-cached.memory 和 sink.file-cached.memory.segment-size
如果启用了文件缓存模式(sink.file-cached.enable=true),需要配置写入文件的最大堆外内存大小和Buffer大小。
upsert.writer.buffer-size 和 upsert.writer.bucket.buffer-size
配置Upsert Writer的缓存大小,以优化写入效率。建议根据分区数量调整这些参数。
您已经在命令中设置了taskmanager.memory.process.size和jobmanager.memory.process.size,但还可以进一步优化资源分配:
taskmanager.memory.managed.size
您当前设置为0,建议根据实际需求调整托管内存大小,以支持更高效的State操作。
yarn.provided.lib.dirs
确保HDFS路径中包含了所有必要的依赖JAR包,例如flink-sql-connector-mysql-cdc-${version}.jar或flink-sql-connector-postgres-cdc-${version}.jar。
Schema Evolution 支持
如果需要支持Schema变更(如新增字段、修改字段类型等),请确保使用VVR 4.0.11及以上版本,并启用Schema Evolution功能。
整库同步
如果需要同步整个数据库,可以使用CREATE DATABASE AS语句,简化配置流程。
熵注入(Entropy Injection)
如果目标存储是OSS-HDFS,建议开启熵注入功能以提高写入效率。通过oss.entropy.key和oss.entropy.length配置随机字符串替换规则。
以下是补充了部分关键参数后的启动命令示例:
flink run-application \
--target yarn-application \
-m yarn-cluster \
-Dyarn.provided.lib.dirs=hdfs://master1:8020/flink/runtime/1.14.6-2.11/lib \
-Dyarn.application.name='CDCWriteDoris' \
-Dtaskmanager.numberOfTaskSlots=1 \
-Djobmanager.memory.process.size=1G \
-Dtaskmanager.memory.process.size=1G \
-Dyarn.application.queue=default \
-Dparallelism.default=1 \
-Dtaskmanager.memory.managed.size=256m \
-Dexecution.checkpointing.interval=10s \
-Dexecution.checkpointing.min-pause=5s \
-Dstate.backend=rocksdb \
-Dstate.checkpoints.dir=hdfs:///flink/checkpoints \
-Dsink.parallelism=2 \
-Dsink.file-cached.enable=true \
-Dsink.file-cached.memory=128m \
-Dsink.file-cached.memory.segment-size=256k \
-c org.sunway.inceptor.doris.CDCWriteDoris \
/home/hdfs/flink-inceptor-1.0-SNAPSHOT.jar \
-s hdfs:///flink/flink-checkpoints/00000000000000000000000000000000/chk-46
以上补充的参数涵盖了CDC Source、Checkpoint、Sink配置以及资源管理等方面。根据实际业务需求,您可以选择性地添加这些参数以优化Flink CDC作业的性能和稳定性。