问题一:Flink cdc3.0.1,oracle11,在源库首次插入数据的时候报错怎么办?
Flink cdc3.0.1,oracle11,在源库首次插入数据的时候报错怎么办?
参考回答:
debug看源码,改源码编译最好。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/609838
问题二:Flink CDC里这个报错一般是什么原因?
Flink CDC里这个报错一般是什么原因?
参考回答:
看一下权限。这显示是读取不到log文件。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/609835
问题三:Flink这个问题怎么解决?
Flink这个问题怎么解决?
参考回答:
看报错是nio包的,channel应该有属性可以设置的,建议看看原生的nio包API写的demo,找找灵感
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/609767
问题四:使用flinkcdc同步mysql至mysql的数据,只会同步一次,修改源表后目标表没有变化
使用flinkcdc同步mysql至mysql的数据,只会同步一次,修改源表后目标表没有变化
参考回答:
这个问题可能是由于Flink CDC的快照模式导致的。在Flink CDC中,有两照模式:initial_and_latest
和only_snapshot
。默认情况下,Flink CDC会使用initial_and_latest
模式,这意味着它会在启动时获取源表的初始快照,并在后续时刻获取最新的快照。
要解决这个问题,你可以尝试将Flink CDC的快照模式更改为only_snapshot
。这样,Flink CDC只会在启动时获取源表的初始快照,而不会在后续时刻获取最新的快照。你可以通过以下代码设置快照模式:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; impor org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.mysql.MySqlCatalog; import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.table.descriptors.FormatDescriptor; import org.apache.flink.table.descriptors.SchemaDescriptor; import org.apache.flink.table.descriptors.SourceDescriptor; import org.apache.flink.table.descriptors.TableDescriptor; import org.apache.flink.table.descriptors.WatermarkDescriptor; import org.apache.flink.table.descriptors.XyzDescriptor; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.table.utils.TableTestBase; import org.apache.flink.types.Row; public class FlinkCDCSync { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 创建源表描述符 SourceDescriptor sourceDescriptor = new SourceDescriptor("mysql", "cdc", "source"); sourceDescriptor.property("hostname", "localhost"); sourceDescriptor.property("port", "3306"); sourceDescriptor.property("username", "root"); sourceDescriptor.property("password", "123456"); sourceDescriptor.property("database-name", "test"); sourceDescriptor.property("table-name", "source_table"); sourceDescriptor.property("scan.startup.mode", "initial_and_latest"); // 修改为 only_snapshot // 注册源表 tableEnv.connect(sourceDescriptor).withSchema().inAppendMode().registerTableSource("source_table"); // 创建目标表描述符 TableDescriptor targetDescriptor = TableDescriptor.forConnector("jdbc") .schema(new Schema() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT())) .option("connector", "jdbc") .option("url", "jdbc:mysql://localhost:3306/test") .option("table-name", "target_table") .option("username", "root") .option("password", "123456") .build(); // 注册目标表 tableEnv.connect(targetDescriptor).withSchema().inAppendMode().registerTableSource("target_table"); // 同步数据 Table result = tableEnv.sqlQuery("SELECT * FROM source_table"); TableResult tableResult = tableEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table"); } }
将scan.startup.mode
属性设置为only_snapshot
后,Flink CDC将只获取源表的初始快照,而不会在后续时刻获取最新的快照。这样,当源表发生变更时,目标表也会相应地更新。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/609008
问题五:这个时候,作业2 如果点击 无状态 启动,会全量同步mysql数据过来吗?
这个时候,作业2 如果点击 无状态 启动,会全量同步mysql数据过来吗?
参考回答:
如果作业2无状态启动,Flink不会自动进行全量同步。
Flink CDC通常提供了参数来控制数据同步的行为。例如,通过设置scan.startup.mode
为"initial"
,可以指示Flink在作业启动时执行一次全量数据同步。然而,如果这个参数没有被正确设置,或者没有采取其他必要的配置措施,Flink作业在无状态启动时可能只会从K费新增的数据,而不是执行全量同步。
在你的场景中,作业1已经使用CDAS将MySQL的数据
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/608126