flink-cdc抓取postgres有demo嘛,通过官方的文档发现跑不起来?
update数据后抓取不到
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用Flink CDC抓取PostgreSQL数据时,如果遇到update
数据后无法抓取的问题,可能是由于配置或环境设置不正确导致的。以下是详细的解决方案和示例代码,帮助您排查问题并成功运行Flink CDC抓取PostgreSQL的Demo。
Flink CDC依赖PostgreSQL的逻辑复制功能来捕获变更数据(如INSERT
、UPDATE
、DELETE
)。以下配置是必须的:
wal_level
参数wal_level
参数设置为logical
。
ALTER SYSTEM SET wal_level = logical;
SELECT pg_reload_conf();
SHOW wal_level;
REPLICA IDENTITY
REPLICA IDENTITY
设置为FULL
,以支持UPDATE
和DELETE
操作的完整旧值记录。
ALTER TABLE your_table REPLICA IDENTITY FULL;
max_wal_senders
和max_replication_slots
max_wal_senders
和max_replication_slots
的值足够大,以支持Flink作业所需的Slot数量。
SHOW max_wal_senders;
SHOW max_replication_slots;
SUPERUSER
权限,或者同时拥有LOGIN
和REPLICATION
权限,并对目标表具有SELECT
权限。以下是一个完整的Flink SQL示例,展示如何使用Flink CDC抓取PostgreSQL的数据:
CREATE TABLE postgrescdc_source (
id INT,
name STRING,
description STRING,
weight DECIMAL(10, 3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>',
'decoding.plugin.name' = 'pgoutput', -- 推荐使用pgoutput插件
'slot.name' = 'flink_slot', -- 自定义逻辑解码槽名称
'scan.startup.mode' = 'initial' -- 初始模式:先读全量数据,再读增量数据
);
SELECT * FROM postgrescdc_source;
UPDATE
数据抓取不到REPLICA IDENTITY
未设置为FULL
,导致UPDATE
操作的旧值无法被逻辑解码插件捕获。
REPLICA IDENTITY
设置为FULL
:ALTER TABLE your_table REPLICA IDENTITY FULL;
execution.checkpointing.interval: 60000 # 每60秒触发一次Checkpoint
execution.checkpointing.tolerable-failed-checkpoints: 3 # 容忍失败的Checkpoint次数
SELECT * FROM pg_replication_slots;
SELECT pg_drop_replication_slot('slot_name');
pgoutput
插件,它是PostgreSQL 10及以上版本的官方内置插件,性能更优且无需额外安装。'debezium.event.deserialization.failure.handling.mode' = 'warn'
准备PostgreSQL环境:
wal_level
、REPLICA IDENTITY
、max_wal_senders
等参数。创建Flink SQL作业:
启动Flink作业:
INSERT
、UPDATE
和DELETE
操作。通过以上步骤,您可以成功运行Flink CDC抓取PostgreSQL数据的Demo,并解决UPDATE
数据抓取不到的问题。如果仍有疑问,请检查PostgreSQL的日志和Flink作业的运行日志,定位具体错误原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。