Flink CDC里版本flink-sql-connector-postgres-cdc-2.4.0.jar的postgresql-cdc使用增量快照就会报错slot already exits,改变成全量就好了,但是全量会对生产有些影响,该怎么解决?postgresql如果全量快照的话,不会阻止其他表数据写入,是不是不会对生产造成影响。
Flink CDC 3.0中,PostgreSQL的增量快照功能确实存在一些问题,导致在增量快照时出现"slot already exists"的错误。为了解决这个问题,你可以尝试以下方法:
升级到Flink CDC 3.1或更高版本,因为新版本可能已经修复了这个问题。你可以访问Flink CDC官方网站下载最新版本:https://flink-cdc.apache.org/downloads.html
如果升级后问题仍然存在,你可以考虑使用全量快照替代增量快照。虽然全量快照可能会对生产环境造成一定影响,但它可以确保数据的一致性。你可以在创建表时设置enable.incremental.snapshots=false
来禁用增量快照功能。例如:
CREATE TABLE my_table (
id INT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgresql-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'myuser',
'password' = 'mypassword',
'database-name' = 'mydb',
'table-name' = 'my_table',
'scan.startup-timestamp-millis' = '1627552800000', -- 设置扫描开始时间戳
'scan.stop-timestamp-millis' = '9999999999999', -- 设置扫描结束时间戳
'scan.interval' = '1000', -- 设置扫描间隔
'scan.max-rate' = '100', -- 设置扫描速率
'scan.startup-timeout-sec' = '60', -- 设置扫描启动超时时间
'scan.stop-timeout-sec' = '60', -- 设置扫描停止超时时间
'scan.parallelism' = '1', -- 设置并行度
'scan.batch-size' = '1000', -- 设置批处理大小
'scan.max-num-records' = '10000', -- 设置最大记录数
'scan.max-num-files' = '100', -- 设置最大文件数
'scan.filter-file-extension' = 'csv', -- 设置过滤文件扩展名
'scan.filter-path' = '/path/to/filter', -- 设置过滤路径
'scan.filter-type' = 'regex', -- 设置过滤类型
'scan.filter-pattern' = '.*\\.csv$', -- 设置过滤模式
'scan.filter-fail-on-missing' = 'true', -- 设置过滤失败时是否抛出异常
'scan.filter-missing-file-action' = 'ignore', -- 设置过滤缺失文件时的操作
'scan.filter-missing-file-prefix' = 'missing_', -- 设置过滤缺失文件的前缀
'scan.filter-missing-file-suffix' = '.csv', -- 设置过滤缺失文件的后缀
'scan.filter-missing-file-count' = '10', -- 设置过滤缺失文件的数量
'scan.filter-missing-file-sleep-ms' = '1000', -- 设置过滤缺失文件的休眠时间(毫秒)
'enable.incremental.snapshots' = 'false' -- 禁用增量快照功能
);
这样,你的表将使用全量快照进行数据同步,但请注意这可能会对生产环境造成一定影响。在实际应用中,你需要根据业务需求和资源限制来权衡增量快照和全量快照的使用。
slot中name 是唯一的。mysql-cdc现在默认是增量快照来读取历史数据的, 如果你用pg这边也是增量快照读取历史数据也是无锁,建议测试用3.0。此回答来自钉群Flink CDC 社区。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。