Flink CDC中postgresql 想要获取到 更新前的数据,是需要开启什么配置,找了半天没找到,只有after的数据?
在 Flink CDC 中,要获取到 PostgreSQL 更新前的数据,你需要配置 Flink CDC 引用的 Debezium Postgres Connector。
通过 Debezium Postgres Connector,你可以为每个表的每个事件(包括插入、更新和删除)获取到完整的事件消息,包括更新之前和之后的数据。
为了配置获取更新前的数据,你需要在 Debezium Postgres Connector 配置中设置以下属性:
设置 snapshot.mode
为 initial
:这将确保在 CDC 启动时,所有的数据快照都会被抓取,包括更新前的数据。
设置 include.query
为 true
:这将捕获并发送所有 UPDATE 语句的更新前的数据。
下面是一个示例配置文件的片段,展示了如何设置获取更新前的数据:
"connector.properties": "
...
snapshot.mode=initial
include.query=true
..."
请注意,在配置时需要与 CDC 应用程序使用的 Flink 版本和 Connector 版本相匹配。配置项可能会因版本而异,因此在参考相关文档进行配置时请查看适用于你所使用版本的文档。
使用上述配置,在 Flink CDC 中,你将能够获取到 PostgreSQL 更新前的数据。
在Flink CDC中,要获取PostgreSQL的更新前的数据,需要开启update-mode
为upsert
的配置。具体配置如下:
{
"source": {
"type": "postgresql",
"endpoint": "localhost:5432",
"database": "testdb",
"username": "user",
"password": "password",
"table": "mytable",
"debezium.internal.offset.storage": "org.apache.flink.connector.debezium.offset.postgresql.PostgresOffsetStorage",
"debezium.sqlserver.include.schema.changes": "false",
"debezium.sqlserver.exclude.schema.changes": "false",
"debezium.sqlserver.skip.ddl": "false",
"debezium.sqlserver.snapshot.mode": "initial",
"debezium.sqlserver.snapshot.timeout": "60000",
"debezium.sqlserver.heartbeat.interval": "1000",
"debezium.sqlserver.heartbeat.timeout": "60000",
"debezium.sqlserver.max.queued.messages": "1000",
"debezium.sqlserver.parallelism": "1",
"update-mode": "upsert" // 开启更新前的数据获取
}
}
其中,update-mode
设置为upsert
后,Flink CDC会将源表中的每一行数据都视为一个事件,当发生更新操作时,会同时生成一个DELETE事件和一个INSERT事件。这样,在Flink任务中就可以通过处理这两个事件来获取到更新前的数据了。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。