群里大佬有谁用过flink-cdc连接pg数据库的,了解插槽的?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用 Flink CDC 连接 PostgreSQL 数据库时,插槽(Replication Slot)是一个非常重要的概念。以下是关于插槽的详细说明和相关配置指南:
Replication Slot 是 PostgreSQL 中用于逻辑复制的核心机制之一。它确保了数据库的 WAL(Write-Ahead Logging)日志不会被过早清理,从而保证 Flink CDC 能够可靠地读取变更数据。
作用:
重要性:
在使用 Flink CDC 连接 PostgreSQL 时,需要满足以下配置要求: 1. WAL 配置: - 设置 wal_level = logical
,以支持逻辑解码。 - 确保 max_wal_senders
和 max_replication_slots
的值足够大,能够容纳当前数据库的复制槽数量以及 Flink 作业所需的插槽数量。
表级设置:
REPLICA IDENTITY
设置为 FULL
,以便在发生更新或删除操作时,能够捕获完整的旧值信息。权限要求:
SUPERUSER
权限,或者同时拥有 LOGIN
和 REPLICATION
权限。SELECT
权限。在 Flink SQL 中,可以通过以下参数配置 Replication Slot:
CREATE TEMPORARY TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
order_time TIMESTAMP,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'polardbo-cdc',
'hostname' = '<yourHostname>',
'port' = '<yourPort>',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = 'flink_source',
'schema-name' = 'public',
'table-name' = 'shipments',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'flink'
);
'slot.name'
:指定 Replication Slot 的名称,例如 flink
。'decoding.plugin.name'
:推荐使用 pgoutput
插件进行逻辑解码。如果出现错误提示 Replication slot "xxxx" is active
,说明插槽已被其他作业占用。
SELECT pg_drop_replication_slot('rep_slot');
'debezium.slot.drop.on.stop' = 'true'
如果插槽未被正确消费,可能会导致 WAL 文件堆积,占用大量磁盘空间。
SELECT * FROM pg_replication_slots;
如果 max_replication_slots
参数值不足,可能会导致无法创建新的插槽。
postgresql.conf
,增加以下参数:max_replication_slots = 10
max_wal_senders = 10
插槽命名规范:
flink_<job_name>
,以便于管理和排查问题。自动清理机制:
'debezium.slot.drop.on.stop' = 'true'
参数,确保作业停止时自动清理插槽,避免资源浪费。监控与告警:
通过以上配置和优化,您可以高效地使用 Flink CDC 连接 PostgreSQL 数据库,并充分利用 Replication Slot 的功能来实现可靠的数据同步。