问题一:一样的代码,把Flink cdc从2.2升级到2.4.2同步不到pgsql的数据是什么原因?
一样的代码,把Flink cdc从2.2升级到2.4.2同步不到pgsql的数据是什么原因?也没有任何异常
参考答案:
idea可以把日志配置文件配置好,可能有报错没打印出来
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/587170
问题二:cdc 在flink SQL 的情况下,某个字段的旧数据 是1 变成了新数据 2 ?
cdc 在flink SQL 的情况下,某个字段的旧数据 是1 变成了新数据 2 ,我就要把这个数据写出在jdbc中,这种场景SQL能完成吗?
参考答案:
是的,Flink SQL 可以获取到旧数据。在 Flink CDC 中,你可以使用 BEFORE
和 AFTER
关键字来定义源表和目标表之间的变化。当某个字段的值发生变化时,Flink CDC 会将变化的数据写入目标表中。
以下是一个示例:
CREATE TABLE source_table ( id INT, field1 INT, field2 STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', ... ); CREATE TABLE target_table ( id INT, field1 INT, field2 STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydb', 'table-name' = 'target_table', ... ); INSERT INTO target_table SELECT id, field1, field2 FROM source_table WHERE field1 = 1;
在这个示例中,我们首先创建了一个名为 source_table
的源表和一个名为 target_table
的目标表。然后,我们使用 INSERT INTO
语句将 source_table
中 field1
值为 1 的数据写入 target_table
。这样,当 field1
的值从 1 变为 2 时,Flink CDC 会自动将变化的数据写入目标表中。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/587169
问题三:flinkcdc 能读到数据,写不到mysql里 ,有遇到过吗?
flinkcdc mongo to mysql JdbcSink 能读到数据,写不到mysql里 ,有大佬遇到过吗?用 RichSinkFunction 自定义的sink就可以写入mysql。sql 也能成功,就jdbcsink不行,是不支持吗?
参考答案:
尝试下强制测试一条就刷出去或者1秒一条就刷出去
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/587215
问题四:Flink CDC现在哪个版本比较稳 定?我用的还是2.3的,新搭是换2.4好呢,还是3.0?
Flink CDC现在哪个版本比较稳 定?我用的还是2.3的,新搭是换2.4好呢,还是3.0?
参考答案:
Flink CDC 在不断发展和改进中,每个版本都有其独特的特性和优势。根据资料,Flink CDC 2.1 版本在稳定性和功能上都有所提升。而最新的稳定版本是3.0,这个版本对Flink CDC具有里程碑式的意义,它从捕获数据变更的Flink数据源正式迈向为以Flink为基础的端到端流式ELT数据集成框架。
考虑到您目前使用的是2.3版本,升级到新版本可能会带来更好的性能和稳定性。但是,具体选择哪个版本还需要根据您的实际需求和应用场景来决定。如果您的项目没有特定的兼容性需求,建议您升级到最新的稳定版本,以便获得Flink CDC的最新功能和改进。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/587214
问题五:为什么Flink CDCupdate会先删除后插入 -u +u?
为什么Flink CDCupdate会先删除后插入 -u +u?一个upsert应该就行了
参考答案:
你可以在flink中打开微批处理,就会拿一个批次中的-u +u合并为一条下发。直接过滤掉也可以,没有影响,主要看你下游使用
Configuration configuration = this.tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.size", "200000000");
configuration.setString("table.exec.mini-batch.allow-latency", "500ms");
打开这个参数,也只能保证一个批次的数据会合并,所以还是自己过滤好点
关于本问题的更多回答可点击进行查看: