问题一:Flink CDC编译build都正常,在idea中执行的时候提示这个错误?
Flink CDC编译build都正常,在idea中执行的时候提示这个错误?
参考回答:
缺少jar包,flink-java-api
关于本问题的更多回答可点击原文查看:
https://developer.aliyun.com/ask/584427
问题二:Flink CDC有人遇到过这个问题吗?
Flink CDC有人遇到过这个问题吗?Caused by: org.apache.flink.util.FlinkRuntimeException: The assigner is not ready to offer finished split information, this should not be called
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getFinishedSplitInfos(MySqlSnapshotSplitAssigner.java:355)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getFinishedSplitInfos(MySqlHybridSplitAssigner.java:139)
at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.sendBinlogMeta(MySqlSourceEnumerator.java:251)
at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSourceEvent(MySqlSourceEnumerator.java:145)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:167)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:329)
... 8 more
参考回答:
看着像是主键分割有异常
关于本问题的更多回答可点击原文查看:
https://developer.aliyun.com/ask/584426
问题三:Flink CDC有没有遇到Oracle表没有获取日志,也没报错?cdc 2.2 oracle19c
Flink CDC有没有遇到Oracle表没有获取日志,也没报错?cdc 2.2 oracle19c
参考回答:
首先确认19的部署模式是不是有pdb库,归档日志是否正常开启
关于本问题的更多回答可点击原文查看:
https://developer.aliyun.com/ask/584420
问题四:flink-cdc2.4.0监听Oracle数据库,监听表发生表结构变更报错挂掉,有解决方案吗?
flink-cdc2.4.0监听Oracle数据库,监听表发生表结构变更报错挂掉, mysql在这一块倒是支持表结构变更不受影响继续监听,oracle是还没支持还是有解决方法?用的是java集成cdc,构造数据源是使用DataStream Source。现在3.0.0会支持吗?
参考回答:
Flink CDC在处理Oracle数据库的变更数据捕获时,对于表结构的变更可能没有像MySQL那样提供无缝的支持。当Oracle数据库中的表结构发生变更时,可能会导致Flink CDC任务失败或者挂起。
目前,Oracle的CDC支持在Flink中主要是通过Debezium Oracle Connector实现的。Debezium在处理Oracle的表结构变更时,可能会遇到一些挑战,因为Oracle的DDL语句和MySQL有所不同,且Oracle的元数据访问也更为复杂。
以下是一些可能的解决方法或注意事项:
- 手动干预:
- 在进行表结构变更之前,可以先暂停Flink CDC任务,完成变更后再恢复任务。
- 或者在变更后,重新配置和启动Flink CDC任务以适应新的表结构。
- 使用Debezium的事件筛选功能:
- 可能可以通过配置Debezium Oracle Connector来忽略或者特殊处理DDL事件,但这需要对Debezium的配置和工作原理有深入理解。
关于本问题的更多回答可点击原文查看:
https://developer.aliyun.com/ask/584177
问题五:Flink1.18.0版本连接外部kafka本地执行测试代码报错
版本:Flink1.18.0
代码如下:```
public static void main(String[] args) {
//1、创建TableEnvironment EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); TableEnvironment tEnv = TableEnvironment.create(settings); //2、创建source table Schema schema = Schema.newBuilder() .column("user", DataTypes.STRING()) .column("url",DataTypes.STRING()) .column("cTime", DataTypes.STRING()) .build(); tEnv.createTemporaryTable( "sourceTable", TableDescriptor.forConnector("kafka") .schema(schema) .option("topic","spring_test_kafka_topic") .option("properties.bootstrap.servers","xxxxx:9092") .option("properties.group.id","clicklog") .option("scan.startup.mode","earliest-offset") .option("format","json") .build() ); //3、创建sink table tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print").schema(schema).build()); //4、执行sql查询 //Table resultTable = tEnv.sqlQuery(" select ,userurl,cTime from sourceTable "); Table resultTable = tEnv.from("sourceTable") .select($("user"), $("url"),$("cTime")); //5、输出 resultTable.executeInsert("sinkTable"); }
```
操作流程:运行代码报错:
请问是什么原因呢,请大佬指点
参考回答:
是引入的依赖包造成的,刚开始参考官方文档引入kafka依赖时,项目的pom文件引入的是flink-connector-kafka,而不是flink-sql-connector-kafka,修改pom文件如下即可,同时要将对应的jar包上传到flink集群各节点的lib包下并重启集群
此外,kafka上的主题名称不要使用下划线,否则会报类似如下的错误:
关于本问题的更多回答可点击原文查看: