现在我的配置文件如下:
source:
type: mysql
hostname: "127.0.0.1" # 源库地址
port: "3306"
username: "root"
password: "123456"
database-name: "qfwk" # 源数据库名
tables: qfwk..* # 正则匹配所有表
server-id: 5400-5500 # 唯一 Server ID 范围
scan.startup.mode: initial # 全量+增量同步:cite[2]:cite[8]
sink:
type: jdbc
driver: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test # 目标库地址
username: root
password: 123456
table-name: ${table_name} # 动态表名(与源表一致)
primary-key: "" # 关键:必须显式声明主键字段(空字符串表示无主键)
sink.buffer-flush.max-rows: 1000 # 批量写入优化
sink.buffer-flush.interval: 2s # 刷新间隔:cite[6]:cite[8]
pipeline:
name: MySQL-to-MySQL-Sync
parallelism: 4 # 并行度,根据资源调整
execution.checkpointing.interval: 30s # 开启 Checkpoint 保障一致性:cite[2]:cite[8]
可是报Exception in thread "main" java.lang.RuntimeException: Cannot find factory with identifier "jdbc" in the classpath.
Available factory classes are:
org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory
at org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils.getFactoryByIdentifier(FactoryDiscoveryUtils.java:65)
at org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator.createDataSink(DataSinkTranslator.java:65)
at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.translate(FlinkPipelineComposer.java:132)
at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:99)
at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:74)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。